You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/04/06 21:50:01 UTC

[jira] [Commented] (KAFKA-6576) Configurable Quota Management (KIP-257)

    [ https://issues.apache.org/jira/browse/KAFKA-6576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16429058#comment-16429058 ] 

ASF GitHub Bot commented on KAFKA-6576:
---------------------------------------

rajinisivaram closed pull request #4699: KAFKA-6576: Configurable Quota Management (KIP-257)
URL: https://github.com/apache/kafka/pull/4699
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
new file mode 100644
index 00000000000..210e9f45840
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.util.Map;
+
+/**
+ * Quota callback interface for brokers that enables customization of client quota computation.
+ */
+public interface ClientQuotaCallback extends Configurable {
+
+    /**
+     * Quota callback invoked to determine the quota metric tags to be applied for a request.
+     * Quota limits are associated with quota metrics and all clients which use the same
+     * metric tags share the quota limit.
+     *
+     * @param quotaType Type of quota requested
+     * @param principal The user principal of the connection for which quota is requested
+     * @param clientId  The client id associated with the request
+     * @return quota metric tags that indicate which other clients share this quota
+     */
+    Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId);
+
+    /**
+     * Returns the quota limit associated with the provided metric tags. These tags were returned from
+     * a previous call to {@link #quotaMetricTags(ClientQuotaType, KafkaPrincipal, String)}. This method is
+     * invoked by quota managers to obtain the current quota limit applied to a metric when the first request
+     * using these tags is processed. It is also invoked after a quota update or cluster metadata change.
+     * If the tags are no longer in use after the update, (e.g. this is a {user, client-id} quota metric
+     * and the quota now in use is a {user} quota), null is returned.
+     *
+     * @param quotaType  Type of quota requested
+     * @param metricTags Metric tags for a quota metric of type `quotaType`
+     * @return the quota limit for the provided metric tags or null if the metric tags are no longer in use
+     */
+    Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags);
+
+    /**
+     * Quota configuration update callback that is invoked when quota configuration for an entity is
+     * updated in ZooKeeper. This is useful to track configured quotas if built-in quota configuration
+     * tools are used for quota management.
+     *
+     * @param quotaType   Type of quota being updated
+     * @param quotaEntity The quota entity for which quota is being updated
+     * @param newValue    The new quota value
+     */
+    void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue);
+
+    /**
+     * Quota configuration removal callback that is invoked when quota configuration for an entity is
+     * removed in ZooKeeper. This is useful to track configured quotas if built-in quota configuration
+     * tools are used for quota management.
+     *
+     * @param quotaType   Type of quota being updated
+     * @param quotaEntity The quota entity for which quota is being updated
+     */
+    void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity);
+
+    /**
+     * Returns true if any of the existing quota configs may have been updated since the last call
+     * to this method for the provided quota type. Quota updates as a result of calls to
+     * {@link #updateClusterMetadata(Cluster)}, {@link #updateQuota(ClientQuotaType, ClientQuotaEntity, double)}
+     * and {@link #removeQuota(ClientQuotaType, ClientQuotaEntity)} are automatically processed.
+     * So callbacks that rely only on built-in quota configuration tools always return false. Quota callbacks
+     * with external quota configuration or custom reconfigurable quota configs that affect quota limits must
+     * return true if existing metric configs may need to be updated. This method is invoked on every request
+     * and hence is expected to be handled by callbacks as a simple flag that is updated when quotas change.
+     *
+     * @param quotaType Type of quota
+     */
+    boolean quotaResetRequired(ClientQuotaType quotaType);
+
+    /**
+     * Metadata update callback that is invoked whenever UpdateMetadata request is received from
+     * the controller. This is useful if quota computation takes partitions into account.
+     * Topics that are being deleted will not be included in `cluster`.
+     *
+     * @param cluster Cluster metadata including partitions and their leaders if known
+     * @return true if quotas have changed and metric configs may need to be updated
+     */
+    boolean updateClusterMetadata(Cluster cluster);
+
+    /**
+     * Closes this instance.
+     */
+    void close();
+}
+
diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
new file mode 100644
index 00000000000..a5ff082dfef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import java.util.List;
+
+/**
+ * The metadata for an entity for which quota is configured. Quotas may be defined at
+ * different levels and `configEntities` gives the list of config entities that define
+ * the level of this quota entity.
+ */
+public interface ClientQuotaEntity {
+
+    /**
+     * Entity type of a {@link ConfigEntity}
+     */
+    public enum ConfigEntityType {
+        USER,
+        CLIENT_ID,
+        DEFAULT_USER,
+        DEFAULT_CLIENT_ID
+    }
+
+    /**
+     * Interface representing a quota configuration entity. Quota may be
+     * configured at levels that include one or more configuration entities.
+     * For example, {user, client-id} quota is represented using two
+     * instances of ConfigEntity with entity types USER and CLIENT_ID.
+     */
+    public interface ConfigEntity {
+        /**
+         * Returns the name of this entity. For default quotas, an empty string is returned.
+         */
+        String name();
+
+        /**
+         * Returns the type of this entity.
+         */
+        ConfigEntityType entityType();
+    }
+
+    /**
+     * Returns the list of configuration entities that this quota entity is comprised of.
+     * For {user} or {clientId} quota, this is a single entity and for {user, clientId}
+     * quota, this is a list of two entities.
+     */
+    List<ConfigEntity> configEntities();
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java
new file mode 100644
index 00000000000..4dd67d3125d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+/**
+ * Types of quotas that may be configured on brokers for client requests.
+ */
+public enum ClientQuotaType {
+    PRODUCE,
+    FETCH,
+    REQUEST
+}
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 8ec27a3a4c0..0f8690fd8b9 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,24 +16,29 @@
  */
 package kafka.server
 
+import java.{lang, util}
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import kafka.network.RequestChannel.Session
+import kafka.server.ClientQuotaManager._
 import kafka.utils.{Logging, ShutdownableThread}
-import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.{Cluster, MetricName}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
 
 import scala.collection.JavaConverters._
 
 /**
  * Represents the sensors aggregated per client
- * @param quotaEntity Quota entity representing <client-id>, <user> or <user, client-id>
+ * @param metricTags Quota metric tags for the client
  * @param quotaSensor @Sensor that tracks the quota
  * @param throttleTimeSensor @Sensor that tracks the throttle time
  */
-case class ClientSensors(quotaEntity: QuotaEntity, quotaSensor: Sensor, throttleTimeSensor: Sensor)
+case class ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, throttleTimeSensor: Sensor)
 
 /**
  * Configuration settings for quota management
@@ -61,9 +66,6 @@ object ClientQuotaManagerConfig {
   val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
 
   val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
-  val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-  val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None, None)
-  val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
 }
 
 object QuotaTypes {
@@ -71,11 +73,60 @@ object QuotaTypes {
   val ClientIdQuotaEnabled = 1
   val UserQuotaEnabled = 2
   val UserClientIdQuotaEnabled = 4
+  val CustomQuotas = 8 // No metric update optimizations are used with custom quotas
 }
 
-case class QuotaId(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String])
+object ClientQuotaManager {
+  val DefaultClientIdQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
+  val DefaultUserQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
+  val DefaultUserClientIdQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
 
-case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String, sanitizedClientId: String, quota: Quota)
+  case class UserEntity(sanitizedUser: String) extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.USER
+    override def name: String = Sanitizer.desanitize(sanitizedUser)
+    override def toString: String = s"user $sanitizedUser"
+  }
+
+  case class ClientIdEntity(clientId: String) extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.CLIENT_ID
+    override def name: String = clientId
+    override def toString: String = s"client-id $clientId"
+  }
+
+  case object DefaultUserEntity extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
+    override def name: String = ConfigEntityName.Default
+    override def toString: String = "default user"
+  }
+
+  case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
+    override def name: String = ConfigEntityName.Default
+    override def toString: String = "default client-id"
+  }
+
+  case class KafkaQuotaEntity(userEntity: Option[ClientQuotaEntity.ConfigEntity],
+                              clientIdEntity: Option[ClientQuotaEntity.ConfigEntity]) extends ClientQuotaEntity {
+    override def configEntities: util.List[ClientQuotaEntity.ConfigEntity] =
+      (userEntity.toList ++ clientIdEntity.toList).asJava
+    def sanitizedUser: String = userEntity.map {
+      case entity: UserEntity => entity.sanitizedUser
+      case DefaultUserEntity => ConfigEntityName.Default
+    }.getOrElse("")
+    def clientId: String = clientIdEntity.map(_.name).getOrElse("")
+
+    override def toString: String = {
+      val user = userEntity.map(_.toString).getOrElse("")
+      val clientId = clientIdEntity.map(_.toString).getOrElse("")
+      s"$user $clientId".trim
+    }
+  }
+
+  object DefaultTags {
+    val User = "user"
+    val ClientId = "client-id"
+  }
+}
 
 /**
  * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
@@ -107,21 +158,26 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val quotaType: QuotaType,
                          private val time: Time,
-                         threadNamePrefix: String) extends Logging {
-  private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
+                         threadNamePrefix: String,
+                         clientQuotaCallback: Option[ClientQuotaCallback] = None) extends Logging {
   private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
-  @volatile private var quotaTypesEnabled =
-    if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas
-    else QuotaTypes.ClientIdQuotaEnabled
+  private val clientQuotaType = quotaTypeToClientQuotaType(quotaType)
+  @volatile private var quotaTypesEnabled = clientQuotaCallback match {
+    case Some(_) => QuotaTypes.CustomQuotas
+    case None =>
+      if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas
+      else QuotaTypes.ClientIdQuotaEnabled
+  }
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess(lock, metrics)
   private[server] val throttledRequestReaper = new ThrottledRequestReaper(delayQueue, threadNamePrefix)
+  private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback)
 
   private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
   delayQueueSensor.add(metrics.metricName("queue-size",
-                                      quotaType.toString,
-                                      "Tracks the size of the delay queue"), new Total())
+    quotaType.toString,
+    "Tracks the size of the delay queue"), new Total())
   start() // Use start method to keep findbugs happy
   private def start() {
     throttledRequestReaper.start()
@@ -132,7 +188,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * @param delayQueue DelayQueue to dequeue from
    */
   class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse], prefix: String) extends ShutdownableThread(
-    s"${prefix}ThrottledRequestReaper-${quotaType}", false) {
+    s"${prefix}ThrottledRequestReaper-$quotaType", false) {
 
     override def doWork(): Unit = {
       val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
@@ -158,17 +214,18 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
    * If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
    * Throttle time calculation may be overridden by sub-classes.
-   * @param sanitizedUser user principal of client
+   *
+   * @param session  the session associated with this request
    * @param clientId clientId that produced/fetched the data
-   * @param value amount of data in bytes or request processing time as a percentage
+   * @param value    amount of data in bytes or request processing time as a percentage
    * @param callback Callback function. This will be triggered immediately if quota is not violated.
    *                 If there is a quota violation, this callback will be triggered after a delay
    * @return Number of milliseconds to delay the response in case of Quota violation.
    *         Zero otherwise
    */
-  def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
+  def maybeRecordAndThrottle(session: Session, clientId: String, value: Double, callback: Int => Unit): Int = {
     if (quotasEnabled) {
-      val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
+      val clientSensors = getOrCreateQuotaSensors(session, clientId)
       recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
     } else {
       // Don't record any metrics if quotas are not enabled at any level
@@ -187,9 +244,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     } catch {
       case _: QuotaViolationException =>
         // Compute the delay
-        val clientQuotaEntity = clientSensors.quotaEntity
-        val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
-        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
+        val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
+        throttleTimeMs = throttleTime(clientMetric).toInt
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -209,126 +265,27 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
-   * Determines the quota-id for the client with the specified user principal
-   * and client-id and returns the quota entity that encapsulates the quota-id
-   * and the associated quota override or default quota.
+   * Returns the quota for the client with the specified (non-encoded) user principal and client-id.
    *
+   * Note: this method is expensive, it is meant to be used by tests only
    */
-  private def quotaEntity(sanitizedUser: String, clientId: String, sanitizedClientId: String) : QuotaEntity = {
-    quotaTypesEnabled match {
-      case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
-        val quotaId = QuotaId(None, Some(clientId), Some(sanitizedClientId))
-        var quota = overriddenQuota.get(quotaId)
-        if (quota == null) {
-          quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId)
-          if (quota == null)
-            quota = staticConfigClientIdQuota
-        }
-        QuotaEntity(quotaId, "", clientId, sanitizedClientId, quota)
-      case QuotaTypes.UserQuotaEnabled =>
-        val quotaId = QuotaId(Some(sanitizedUser), None, None)
-        var quota = overriddenQuota.get(quotaId)
-        if (quota == null) {
-          quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId)
-          if (quota == null)
-            quota = ClientQuotaManagerConfig.UnlimitedQuota
-        }
-        QuotaEntity(quotaId, sanitizedUser, "", "", quota)
-      case QuotaTypes.UserClientIdQuotaEnabled =>
-        val quotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizedClientId))
-        var quota = overriddenQuota.get(quotaId)
-        if (quota == null) {
-          quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)))
-          if (quota == null) {
-            quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizedClientId)))
-            if (quota == null) {
-              quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId)
-              if (quota == null)
-                quota = ClientQuotaManagerConfig.UnlimitedQuota
-            }
-          }
-        }
-        QuotaEntity(quotaId, sanitizedUser, clientId, sanitizedClientId, quota)
-      case _ =>
-        quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId, sanitizedClientId)
-    }
-  }
-
-  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, clientId: String, sanitizerClientId: String) : QuotaEntity = {
-    val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizerClientId))
-
-    val userQuotaId = QuotaId(Some(sanitizedUser), None, None)
-    val clientQuotaId = QuotaId(None, Some(clientId), Some(sanitizerClientId))
-    var quotaId = userClientQuotaId
-    var quotaConfigId = userClientQuotaId
-    // 1) /config/users/<user>/clients/<client-id>
-    var quota = overriddenQuota.get(quotaConfigId)
-    if (quota == null) {
-      // 2) /config/users/<user>/clients/<default>
-      quotaId = userClientQuotaId
-      quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-      quota = overriddenQuota.get(quotaConfigId)
-
-      if (quota == null) {
-        // 3) /config/users/<user>
-        quotaId = userQuotaId
-        quotaConfigId = quotaId
-        quota = overriddenQuota.get(quotaConfigId)
-
-        if (quota == null) {
-          // 4) /config/users/<default>/clients/<client-id>
-          quotaId = userClientQuotaId
-          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizerClientId))
-          quota = overriddenQuota.get(quotaConfigId)
-
-          if (quota == null) {
-            // 5) /config/users/<default>/clients/<default>
-            quotaId = userClientQuotaId
-            quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-            quota = overriddenQuota.get(quotaConfigId)
-
-            if (quota == null) {
-              // 6) /config/users/<default>
-              quotaId = userQuotaId
-              quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None, None)
-              quota = overriddenQuota.get(quotaConfigId)
-
-              if (quota == null) {
-                // 7) /config/clients/<client-id>
-                quotaId = clientQuotaId
-                quotaConfigId = QuotaId(None, Some(clientId), Some(sanitizerClientId))
-                quota = overriddenQuota.get(quotaConfigId)
-
-                if (quota == null) {
-                  // 8) /config/clients/<default>
-                  quotaId = clientQuotaId
-                  quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-                  quota = overriddenQuota.get(quotaConfigId)
-
-                  if (quota == null) {
-                    quotaId = clientQuotaId
-                    quotaConfigId = null
-                    quota = staticConfigClientIdQuota
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-    val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser
-    val quotaClientId = if (quotaId == userQuotaId) "" else clientId
-    QuotaEntity(quotaId, quotaUser, quotaClientId, sanitizerClientId, quota)
+  def quota(user: String, clientId: String): Quota = {
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+    quota(userPrincipal, clientId)
   }
 
   /**
-   * Returns the quota for the client with the specified (non-encoded) user principal and client-id.
-   * 
+   * Returns the quota for the client with the specified user principal and client-id.
+   *
    * Note: this method is expensive, it is meant to be used by tests only
    */
-  def quota(user: String, clientId: String) = {
-    quotaEntity(Sanitizer.sanitize(user), clientId, Sanitizer.sanitize(clientId)).quota
+  def quota(userPrincipal: KafkaPrincipal, clientId: String): Quota = {
+    val metricTags = quotaCallback.quotaMetricTags(clientQuotaType, userPrincipal, clientId)
+    Quota.upperBound(quotaLimit(metricTags))
+  }
+
+  private def quotaLimit(metricTags: util.Map[String, String]): Double = {
+    Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).map(_.toDouble)getOrElse(Long.MaxValue)
   }
 
   /*
@@ -339,10 +296,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * we need to add a delay of X to W such that O * W / (W + X) = T.
    * Solving for X, we get X = (O - T)/T * W.
    */
-  protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
+  protected def throttleTime(clientMetric: KafkaMetric): Long = {
+    val config = clientMetric.config
     val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
     val quota = config.quota()
-    val difference = clientMetric.value() - quota.bound
+    val difference = clientMetric.metricValue.asInstanceOf[Double] - quota.bound
     // Use the precise window used by the rate calculation
     val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
     throttleTimeMs.round
@@ -360,56 +318,72 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * This function either returns the sensors for a given client id or creates them if they don't exist
    * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
    */
-  def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = {
-    val sanitizedClientId = Sanitizer.sanitize(clientId)
-    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId, sanitizedClientId)
+  def getOrCreateQuotaSensors(session: Session, clientId: String): ClientSensors = {
+    // Use cached sanitized principal if using default callback
+    val metricTags = quotaCallback match {
+      case callback: DefaultQuotaCallback => callback.quotaMetricTags(session.sanitizedUser, clientId)
+      case _ => quotaCallback.quotaMetricTags(clientQuotaType, session.principal, clientId).asScala.toMap
+    }
     // Names of the sensors to access
-    ClientSensors(
-      clientQuotaEntity,
+    val sensors = ClientSensors(
+      metricTags,
       sensorAccessor.getOrCreate(
-        getQuotaSensorName(clientQuotaEntity.quotaId),
+        getQuotaSensorName(metricTags),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
-        Some(getQuotaMetricConfig(clientQuotaEntity.quota)),
+        clientRateMetricName(metricTags),
+        Some(getQuotaMetricConfig(metricTags)),
         new Rate
       ),
-      sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
+      sensorAccessor.getOrCreate(getThrottleTimeSensorName(metricTags),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        throttleMetricName(clientQuotaEntity),
+        throttleMetricName(metricTags),
         None,
         new Avg
       )
     )
+    if (quotaCallback.quotaResetRequired(clientQuotaType))
+      updateQuotaMetricConfigs()
+    sensors
   }
 
-  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def metricTagsToSensorSuffix(metricTags: Map[String, String]): String =
+    metricTags.values.mkString(":")
 
-  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def getThrottleTimeSensorName(metricTags: Map[String, String]): String =
+    s"${quotaType}ThrottleTime-${metricTagsToSensorSuffix(metricTags)}"
 
-  protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+  private def getQuotaSensorName(metricTags: Map[String, String]): String =
+    s"$quotaType-${metricTagsToSensorSuffix(metricTags)}"
+
+  private def getQuotaMetricConfig(metricTags: Map[String, String]): MetricConfig = {
+    getQuotaMetricConfig(quotaLimit(metricTags.asJava))
+  }
+
+  private def getQuotaMetricConfig(quotaLimit: Double): MetricConfig = {
     new MetricConfig()
-            .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
-            .samples(config.numQuotaSamples)
-            .quota(quota)
+      .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(new Quota(quotaLimit, true))
   }
 
   protected def getOrCreateSensor(sensorName: String, metricName: MetricName): Sensor = {
     sensorAccessor.getOrCreate(
-        sensorName,
-        ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        metricName,
-        None,
-        new Rate
-      )
+      sensorName,
+      ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
+      metricName,
+      None,
+      new Rate
+    )
   }
 
   /**
    * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
    * for any of these levels.
-   * @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
-   * @param clientId client to override if quota applies to <client-id> or <user, client-id>
+   *
+   * @param sanitizedUser     user to override if quota applies to <user> or <user, client-id>
+   * @param clientId          client to override if quota applies to <client-id> or <user, client-id>
    * @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
-   * @param quota custom quota to apply or None if quota override is being removed
+   * @param quota             custom quota to apply or None if quota override is being removed
    */
   def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]) {
     /*
@@ -421,86 +395,233 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
      */
     lock.writeLock().lock()
     try {
-      val quotaId = QuotaId(sanitizedUser, clientId, sanitizedClientId)
-      val userInfo = sanitizedUser match {
-        case Some(ConfigEntityName.Default) => "default user "
-        case Some(user) => "user " + user + " "
-        case None => ""
+      val userEntity = sanitizedUser.map {
+        case ConfigEntityName.Default => DefaultUserEntity
+        case user => UserEntity(user)
       }
-      val clientIdInfo = clientId match {
-        case Some(ConfigEntityName.Default) => "default client-id"
-        case Some(id) => "client-id " + id
-        case None => ""
+      val clientIdEntity = sanitizedClientId.map {
+        case ConfigEntityName.Default => DefaultClientIdEntity
+        case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
       }
+      val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)
+
+      if (userEntity.nonEmpty) {
+        if (quotaEntity.clientIdEntity.nonEmpty)
+          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+        else
+          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+      } else if (clientIdEntity.nonEmpty)
+        quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+
       quota match {
-        case Some(newQuota) =>
-          info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}")
-          overriddenQuota.put(quotaId, newQuota)
-          (sanitizedUser, clientId) match {
-            case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-            case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-            case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-            case (None, None) =>
-          }
-        case None =>
-          info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
-          overriddenQuota.remove(quotaId)
+        case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound)
+        case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
       }
+      val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientIdEntity.contains(DefaultClientIdEntity))
+        None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics
+      else
+        Some(quotaEntity)
+      updateQuotaMetricConfigs(updatedEntity)
 
-      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), clientId.getOrElse(""))
-      val allMetrics = metrics.metrics()
+    } finally {
+      lock.writeLock().unlock()
+    }
+  }
 
-      // If multiple-levels of quotas are defined or if this is a default quota update, traverse metrics
-      // to find all affected values. Otherwise, update just the single matching one.
-      val singleUpdate = quotaTypesEnabled match {
-        case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
-          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !clientId.filter(_ == ConfigEntityName.Default).isDefined
-        case _ => false
+  /**
+   * Updates metrics configs. This is invoked when quota configs are updated in ZooKeeper
+   * or when partitions leaders change and custom callbacks that implement partition-based quotas
+   * have updated quotas.
+   * @param updatedQuotaEntity If set to one entity and quotas have only been enabled at one
+   *    level, then an optimized update is performed with a single metric update. If None is provided,
+   *    or if custom callbacks are used or if multi-level quotas have been enabled, all metric configs
+   *    are checked and updated if required.
+   */
+  def updateQuotaMetricConfigs(updatedQuotaEntity: Option[KafkaQuotaEntity] = None): Unit = {
+    val allMetrics = metrics.metrics()
+
+    // If using custom quota callbacks or if multiple-levels of quotas are defined or
+    // if this is a default quota update, traverse metrics to find all affected values.
+    // Otherwise, update just the single matching one.
+    val singleUpdate = quotaTypesEnabled match {
+      case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
+        updatedQuotaEntity.nonEmpty
+      case _ => false
+    }
+    if (singleUpdate) {
+      val quotaEntity = updatedQuotaEntity.getOrElse(throw new IllegalStateException("Quota entity not specified"))
+      val user = quotaEntity.sanitizedUser
+      val clientId = quotaEntity.clientId
+      val metricTags = Map(DefaultTags.User -> user, DefaultTags.ClientId -> clientId)
+
+      val quotaMetricName = clientRateMetricName(metricTags)
+      // Change the underlying metric config if the sensor has been created
+      val metric = allMetrics.get(quotaMetricName)
+      if (metric != null) {
+        Option(quotaCallback.quotaLimit(clientQuotaType, metricTags.asJava)).foreach { newQuota =>
+          info(s"Sensor for $quotaEntity already exists. Changing quota to $newQuota in MetricConfig")
+          metric.config(getQuotaMetricConfig(newQuota))
+        }
       }
-      if (singleUpdate) {
-          // Change the underlying metric config if the sensor has been created
-          val metric = allMetrics.get(quotaMetricName)
-          if (metric != null) {
-            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""), sanitizedClientId.getOrElse(""))
-            val newQuota = metricConfigEntity.quota
-            info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
-            metric.config(getQuotaMetricConfig(newQuota))
-          }
-      } else {
-          allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
-            case (metricName, metric) =>
-              val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else ""
-              val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else ""
-              val metricConfigEntity = quotaEntity(userTag, clientIdTag, Sanitizer.sanitize(clientIdTag))
-              if (metricConfigEntity.quota != metric.config.quota) {
-                val newQuota = metricConfigEntity.quota
-                info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
-                metric.config(getQuotaMetricConfig(newQuota))
-              }
+    } else {
+      val quotaMetricName = clientRateMetricName(Map.empty)
+      allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
+        case (metricName, metric) =>
+          val metricTags = metricName.tags
+          Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).foreach { quota =>
+            val newQuota = quota.asInstanceOf[Double]
+            if (newQuota != metric.config.quota.bound) {
+              info(s"Sensor for quota-id $metricTags already exists. Setting quota to $newQuota in MetricConfig")
+              metric.config(getQuotaMetricConfig(newQuota))
+            }
           }
       }
-
-    } finally {
-      lock.writeLock().unlock()
     }
   }
 
-  protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+  protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
     metrics.metricName("byte-rate", quotaType.toString,
-                   "Tracking byte-rate per user/client-id",
-                   "user", sanitizedUser,
-                   "client-id", clientId)
+      "Tracking byte-rate per user/client-id",
+      quotaMetricTags.asJava)
   }
 
-  private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
+  private def throttleMetricName(quotaMetricTags: Map[String, String]): MetricName = {
     metrics.metricName("throttle-time",
-                       quotaType.toString,
-                       "Tracking average throttle-time per user/client-id",
-                       "user", quotaEntity.sanitizedUser,
-                       "client-id", quotaEntity.clientId)
+      quotaType.toString,
+      "Tracking average throttle-time per user/client-id",
+      quotaMetricTags.asJava)
   }
 
-  def shutdown() = {
+  private def quotaTypeToClientQuotaType(quotaType: QuotaType): ClientQuotaType = {
+    quotaType match {
+      case QuotaType.Fetch => ClientQuotaType.FETCH
+      case QuotaType.Produce => ClientQuotaType.PRODUCE
+      case QuotaType.Request => ClientQuotaType.REQUEST
+      case _ => throw new IllegalArgumentException(s"Not a client quota type: $quotaType")
+    }
+  }
+
+  def shutdown(): Unit = {
     throttledRequestReaper.shutdown()
   }
+
+  class DefaultQuotaCallback extends ClientQuotaCallback {
+    private val overriddenQuotas = new ConcurrentHashMap[ClientQuotaEntity, Quota]()
+
+    override def configure(configs: util.Map[String, _]): Unit = {}
+
+    override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = {
+      quotaMetricTags(Sanitizer.sanitize(principal.getName), clientId).asJava
+    }
+
+    override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = {
+      val sanitizedUser = metricTags.get(DefaultTags.User)
+      val clientId = metricTags.get(DefaultTags.ClientId)
+      var quota: Quota = null
+
+      if (sanitizedUser != null && clientId != null) {
+        val userEntity = Some(UserEntity(sanitizedUser))
+        val clientIdEntity = Some(ClientIdEntity(clientId))
+        if (!sanitizedUser.isEmpty && !clientId.isEmpty) {
+          // /config/users/<user>/clients/<client-id>
+          quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, clientIdEntity))
+          if (quota == null) {
+            // /config/users/<user>/clients/<default>
+            quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, Some(DefaultClientIdEntity)))
+          }
+          if (quota == null) {
+            // /config/users/<default>/clients/<client-id>
+            quota = overriddenQuotas.get(KafkaQuotaEntity(Some(DefaultUserEntity), clientIdEntity))
+          }
+          if (quota == null) {
+            // /config/users/<default>/clients/<default>
+            quota = overriddenQuotas.get(DefaultUserClientIdQuotaEntity)
+          }
+        } else if (!sanitizedUser.isEmpty) {
+          // /config/users/<user>
+          quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, None))
+          if (quota == null) {
+            // /config/users/<default>
+            quota = overriddenQuotas.get(DefaultUserQuotaEntity)
+          }
+        } else if (!clientId.isEmpty) {
+          // /config/clients/<client-id>
+          quota = overriddenQuotas.get(KafkaQuotaEntity(None, clientIdEntity))
+          if (quota == null) {
+            // /config/clients/<default>
+            quota = overriddenQuotas.get(DefaultClientIdQuotaEntity)
+          }
+          if (quota == null)
+            quota = staticConfigClientIdQuota
+        }
+      }
+      if (quota == null) null else quota.bound
+    }
+
+    override def updateClusterMetadata(cluster: Cluster): Boolean = {
+      // Default quota callback does not use any cluster metadata
+      false
+    }
+
+    override def updateQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity, newValue: Double): Unit = {
+      val quotaEntity = entity.asInstanceOf[KafkaQuotaEntity]
+      info(s"Changing $quotaType quota for $quotaEntity to $newValue")
+      overriddenQuotas.put(quotaEntity, new Quota(newValue, true))
+    }
+
+    override def removeQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity): Unit = {
+      val quotaEntity = entity.asInstanceOf[KafkaQuotaEntity]
+      info(s"Removing $quotaType quota for $quotaEntity")
+      overriddenQuotas.remove(quotaEntity)
+    }
+
+    override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = false
+
+    def quotaMetricTags(sanitizedUser: String, clientId: String) : Map[String, String] = {
+      val (userTag, clientIdTag) = quotaTypesEnabled match {
+        case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
+          ("", clientId)
+        case QuotaTypes.UserQuotaEnabled =>
+          (sanitizedUser, "")
+        case QuotaTypes.UserClientIdQuotaEnabled =>
+          (sanitizedUser, clientId)
+        case _ =>
+          val userEntity = Some(UserEntity(sanitizedUser))
+          val clientIdEntity = Some(ClientIdEntity(clientId))
+
+          var metricTags = (sanitizedUser, clientId)
+          // 1) /config/users/<user>/clients/<client-id>
+          if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, clientIdEntity))) {
+            // 2) /config/users/<user>/clients/<default>
+            metricTags = (sanitizedUser, clientId)
+            if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, Some(DefaultClientIdEntity)))) {
+              // 3) /config/users/<user>
+              metricTags = (sanitizedUser, "")
+              if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, None))) {
+                // 4) /config/users/<default>/clients/<client-id>
+                metricTags = (sanitizedUser, clientId)
+                if (!overriddenQuotas.containsKey(KafkaQuotaEntity(Some(DefaultUserEntity), clientIdEntity))) {
+                  // 5) /config/users/<default>/clients/<default>
+                  metricTags = (sanitizedUser, clientId)
+                  if (!overriddenQuotas.containsKey(DefaultUserClientIdQuotaEntity)) {
+                    // 6) /config/users/<default>
+                    metricTags = (sanitizedUser, "")
+                    if (!overriddenQuotas.containsKey(DefaultUserQuotaEntity)) {
+                      // 7) /config/clients/<client-id>
+                      // 8) /config/clients/<default>
+                      // 9) static client-id quota
+                      metricTags = ("", clientId)
+                    }
+                  }
+                }
+              }
+            }
+          }
+          metricTags
+      }
+      Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag)
+    }
+
+    override def close(): Unit = {}
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 59fa4218acb..3078a62175e 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -22,13 +22,17 @@ import kafka.network.RequestChannel
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.quota.ClientQuotaCallback
+
+import scala.collection.JavaConverters._
 
 
 class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
                                 private val metrics: Metrics,
                                 private val time: Time,
-                                threadNamePrefix: String)
-                                extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix) {
+                                threadNamePrefix: String,
+                                quotaCallback: Option[ClientQuotaCallback])
+                                extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix, quotaCallback) {
   val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
   def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
 
@@ -43,7 +47,7 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
     }
 
     if (quotasEnabled) {
-      val quotaSensors = getOrCreateQuotaSensors(request.session.sanitizedUser, request.header.clientId)
+      val quotaSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
       request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos)))
 
       recordAndThrottleOnQuotaViolation(
@@ -62,15 +66,14 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
 
-  override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
-    math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
+  override protected def throttleTime(clientMetric: KafkaMetric): Long = {
+    math.min(super.throttleTime(clientMetric), maxThrottleTimeMs)
   }
 
-  override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+  override protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
     metrics.metricName("request-time", QuotaType.Request.toString,
-                   "Tracking request-time per user/client-id",
-                   "user", sanitizedUser,
-                   "client-id", clientId)
+      "Tracking request-time per user/client-id",
+      quotaMetricTags.asJava)
   }
 
   private def exemptMetricName: MetricName = {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 766907a7de2..1839768ecd4 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -75,13 +75,12 @@ object DynamicBrokerConfig {
 
   private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
 
-  val AllDynamicConfigs = mutable.Set[String]()
-  AllDynamicConfigs ++= DynamicSecurityConfigs
-  AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
-  AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
-  AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs
-  AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp)
-  AllDynamicConfigs ++= DynamicListenerConfig.ReconfigurableConfigs
+  val AllDynamicConfigs = DynamicSecurityConfigs ++
+    LogCleaner.ReconfigurableConfigs ++
+    DynamicLogConfig.ReconfigurableConfigs ++
+    DynamicThreadPool.ReconfigurableConfigs ++
+    Set(KafkaConfig.MetricReporterClassesProp) ++
+    DynamicListenerConfig.ReconfigurableConfigs
 
   private val PerBrokerConfigs = DynamicSecurityConfigs  ++
     DynamicListenerConfig.ReconfigurableConfigs
@@ -159,16 +158,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaConfig))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
-    require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains))
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
     reconfigurables += reconfigurable
   }
 
   def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
-    require(reconfigurable.reconfigurableConfigs.forall(AllDynamicConfigs.contains))
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
     brokerReconfigurables += reconfigurable
   }
 
@@ -176,6 +176,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     reconfigurables -= reconfigurable
   }
 
+  private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
+    val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains)
+    require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic")
+  }
+
   // Visibility for testing
   private[server] def currentKafkaConfig: KafkaConfig = CoreUtils.inReadLock(lock) {
     currentConfig
@@ -705,6 +710,36 @@ object DynamicListenerConfig {
   )
 }
 
+class DynamicClientQuotaCallback(brokerId: Int, config: KafkaConfig) extends Reconfigurable {
+
+  override def configure(configs: util.Map[String, _]): Unit = {}
+
+  override def reconfigurableConfigs(): util.Set[String] = {
+    val configs = new util.HashSet[String]()
+    config.quotaCallback.foreach {
+      case callback: Reconfigurable => configs.addAll(callback.reconfigurableConfigs)
+      case _ =>
+    }
+    configs
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+    config.quotaCallback.foreach {
+      case callback: Reconfigurable => callback.validateReconfiguration(configs)
+      case _ =>
+    }
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    config.quotaCallback.foreach {
+      case callback: Reconfigurable =>
+        config.dynamicConfig.maybeReconfigure(callback, config.dynamicConfig.currentKafkaConfig, configs)
+        true
+      case _ => false
+    }
+  }
+}
+
 class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable with Logging {
 
   override def reconfigurableConfigs: Set[String] = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e79afa2a5b..f43f8a5b4b6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -231,6 +231,13 @@ class KafkaApis(val requestChannel: RequestChannel,
           adminManager.tryCompleteDelayedTopicOperations(topic)
         }
       }
+      config.quotaCallback.foreach { callback =>
+        if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
+          quotas.fetch.updateQuotaMetricConfigs()
+          quotas.produce.updateQuotaMetricConfigs()
+          quotas.request.updateQuotaMetricConfigs()
+        }
+      }
       sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
     } else {
       sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
@@ -445,7 +452,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
       quotas.produce.maybeRecordAndThrottle(
-        request.session.sanitizedUser,
+        request.session,
         request.header.clientId,
         numBytesAppended,
         produceResponseCallback)
@@ -610,7 +617,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // This may be slightly different from the actual response size. But since down conversions
         // result in data being loaded into memory, it is better to do this after throttling to avoid OOM.
         val responseStruct = unconvertedFetchResponse.toStruct(versionId)
-        quotas.fetch.maybeRecordAndThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
+        quotas.fetch.maybeRecordAndThrottle(request.session, clientId, responseStruct.sizeOf,
           fetchResponseCallback)
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5a1dca395bb..096b918d5d9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.quota.ClientQuotaCallback
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -390,6 +391,7 @@ object KafkaConfig {
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
   val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
   val AlterLogDirsReplicationQuotaWindowSizeSecondsProp = "alter.log.dirs.replication.quota.window.size.seconds"
+  val ClientQuotaCallbackClassProp = "client.quota.callback.class"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
@@ -672,6 +674,10 @@ object KafkaConfig {
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
   val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
   val AlterLogDirsReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for alter log dirs replication quotas"
+  val ClientQuotaCallbackClassDoc = "The fully qualified name of a class that implements the ClientQuotaCallback interface, " +
+    "which is used to determine quota limits applied to client requests. By default, <user, client-id>, <user> or <client-id> " +
+    "quotas stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal " +
+    "of the session and the client-id of the request is applied."
   /** ********* Transaction Configuration ***********/
   val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
     "transaction coordinator. Note that this also influences producer id expiration: Producer ids are guaranteed to expire " +
@@ -913,6 +919,7 @@ object KafkaConfig {
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
       .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
       .define(AlterLogDirsReplicationQuotaWindowSizeSecondsProp, INT, Defaults.AlterLogDirsReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, AlterLogDirsReplicationQuotaWindowSizeSecondsDoc)
+      .define(ClientQuotaCallbackClassProp, CLASS, null, LOW, ClientQuotaCallbackClassDoc)
 
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)
@@ -1204,6 +1211,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
   val numAlterLogDirsReplicationQuotaSamples = getInt(KafkaConfig.NumAlterLogDirsReplicationQuotaSamplesProp)
   val alterLogDirsReplicationQuotaWindowSizeSeconds = getInt(KafkaConfig.AlterLogDirsReplicationQuotaWindowSizeSecondsProp)
+  val quotaCallback = Option(getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp, classOf[ClientQuotaCallback]))
 
   /** ********* Transaction Configuration **************/
   val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d7ca65658f6..437912569ff 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -595,6 +595,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         if (quotaManagers != null)
           CoreUtils.swallow(quotaManagers.shutdown(), this)
+        config.quotaCallback.foreach(_.close())
+
         // Even though socket server is stopped much earlier, controller can generate
         // response for controlled shutdown request. Shutdown server at the end to
         // avoid any failures (e.g. when metrics are recorded)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 7cdb8f1a1a3..43fe35287d3 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util.Collections
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.{Seq, Set, mutable}
@@ -28,7 +29,7 @@ import kafka.controller.StateChangeLogger
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
@@ -127,6 +128,14 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def getAllPartitions(): Map[TopicPartition, UpdateMetadataRequest.PartitionState] = {
+    inReadLock(partitionMetadataLock) {
+      cache.flatMap { case (topic, partitionStates) =>
+        partitionStates.map { case (partition, state ) => (new TopicPartition(topic, partition), state) }
+      }.toMap
+    }
+  }
+
   def getNonExistingTopics(topics: Set[String]): Set[String] = {
     inReadLock(partitionMetadataLock) {
       topics -- cache.keySet
@@ -180,6 +189,27 @@ class MetadataCache(brokerId: Int) extends Logging {
 
   def getControllerId: Option[Int] = controllerId
 
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
+    inReadLock(partitionMetadataLock) {
+      val nodes = aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
+      def node(id: Integer): Node = nodes.get(id).orNull
+      val partitions = getAllPartitions()
+        .filter { case (_, state) => state.basePartitionState.leader != LeaderAndIsr.LeaderDuringDelete }
+        .map { case (tp, state) =>
+          new PartitionInfo(tp.topic, tp.partition, node(state.basePartitionState.leader),
+            state.basePartitionState.replicas.asScala.map(node).toArray,
+            state.basePartitionState.isr.asScala.map(node).toArray,
+            state.offlineReplicas.asScala.map(node).toArray)
+        }
+      val unauthorizedTopics = Collections.emptySet[String]
+      val internalTopics = getAllTopics().filter(Topic.isInternal).asJava
+      new Cluster(clusterId, nodes.values.filter(_ != null).toList.asJava,
+        partitions.toList.asJava,
+        unauthorizedTopics, internalTopics,
+        getControllerId.map(id => node(id)).orNull)
+    }
+  }
+
   // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
   def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 01441b57323..c758b5a345d 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -54,9 +54,9 @@ object QuotaFactory extends Logging {
 
   def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, threadNamePrefix: String): QuotaManagers = {
     QuotaManagers(
-      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix),
-      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix),
-      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix),
+      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, cfg.quotaCallback),
+      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, cfg.quotaCallback),
+      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, cfg.quotaCallback),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
       new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 9b1c2aad2f9..b265182af23 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -16,31 +16,29 @@ package kafka.api
 
 import java.util.{Collections, HashMap, Properties}
 
-import kafka.server.{ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaId, QuotaType}
+import kafka.api.QuotaTestClients._
+import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.{MetricName, TopicPartition}
+import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
-abstract class BaseQuotaTest extends IntegrationTestHarness {
+import scala.collection.JavaConverters._
 
-  def userPrincipal : String
-  def producerQuotaId : QuotaId
-  def consumerQuotaId : QuotaId
-  def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
-  def removeQuotaOverrides()
+abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   override val serverCount = 2
   val producerCount = 1
   val consumerCount = 1
 
-  private val producerBufferSize = 300000
   protected def producerClientId = "QuotasTestProducer-1"
   protected def consumerClientId = "QuotasTestConsumer-1"
+  protected def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients
 
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
@@ -49,7 +47,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
   this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0")
-  this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
+  this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "300000")
   this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -63,9 +61,10 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   val defaultConsumerQuota = 2500
   val defaultRequestQuota = Int.MaxValue
 
-  var leaderNode: KafkaServer = null
-  var followerNode: KafkaServer = null
-  private val topic1 = "topic-1"
+  val topic1 = "topic-1"
+  var leaderNode: KafkaServer = _
+  var followerNode: KafkaServer = _
+  var quotaTestClients: QuotaTestClients = _
 
   @Before
   override def setUp() {
@@ -75,22 +74,19 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     val leaders = createTopic(topic1, numPartitions, serverCount)
     leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
     followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
+    quotaTestClients = createQuotaTestClients(topic1, leaderNode)
   }
 
   @Test
   def testThrottledProducerConsumer() {
 
     val numRecords = 1000
-    val producer = producers.head
-    val produced = produceUntilThrottled(producer, numRecords)
-    assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
-    verifyProducerThrottleTimeMetric(producer)
+    val produced = quotaTestClients.produceUntilThrottled(numRecords)
+    quotaTestClients.verifyProduceThrottle(expectThrottle = true)
 
     // Consumer should read in a bursty manner and get throttled immediately
-    val consumer = consumers.head
-    consumeUntilThrottled(consumer, produced)
-    assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
-    verifyConsumerThrottleTimeMetric(consumer)
+    quotaTestClients.consumeUntilThrottled(produced)
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
   @Test
@@ -100,154 +96,187 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, Long.MaxValue.toString)
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, Long.MaxValue.toString)
 
-    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
-    assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, producerThrottleMetric.value, 0.0)
+    assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords))
+    quotaTestClients.verifyProduceThrottle(expectThrottle = false)
 
     // The "client" consumer does not get throttled.
-    assertEquals(numRecords, consumeUntilThrottled(consumers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, consumerThrottleMetric.value, 0.0)
+    assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords))
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = false)
   }
 
   @Test
   def testQuotaOverrideDelete() {
     // Override producer and consumer quotas to unlimited
-    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
-    assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, producerThrottleMetric.value, 0.0)
-    assertEquals(numRecords, consumeUntilThrottled(consumers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, consumerThrottleMetric.value, 0.0)
+    assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords))
+    quotaTestClients.verifyProduceThrottle(expectThrottle = false)
+    assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords))
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = false)
 
     // Delete producer and consumer quota overrides. Consumer and producer should now be
     // throttled since broker defaults are very small
-    removeQuotaOverrides()
-    val produced = produceUntilThrottled(producers.head, numRecords)
-    assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
+    quotaTestClients.removeQuotaOverrides()
+    val produced = quotaTestClients.produceUntilThrottled(numRecords)
+    quotaTestClients.verifyProduceThrottle(expectThrottle = true)
 
     // Since producer may have been throttled after producing a couple of records,
     // consume from beginning till throttled
     consumers.head.seekToBeginning(Collections.singleton(new TopicPartition(topic1, 0)))
-    consumeUntilThrottled(consumers.head, numRecords + produced)
-    assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
+    quotaTestClients.consumeUntilThrottled(numRecords + produced)
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
   @Test
   def testThrottledRequest() {
 
-    overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
+    quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
+    quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
 
     val consumer = consumers.head
     consumer.subscribe(Collections.singleton(topic1))
     val endTimeMs = System.currentTimeMillis + 10000
     var throttled = false
-    while ((!throttled || exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
+    while ((!throttled || quotaTestClients.exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
       consumer.poll(100)
-      val throttleMetric = consumerRequestThrottleMetric
-      throttled = throttleMetric != null && throttleMetric.value > 0
+      val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, consumerClientId)
+      throttled = throttleMetric != null && metricValue(throttleMetric) > 0
     }
 
     assertTrue("Should have been throttled", throttled)
-    verifyConsumerThrottleTimeMetric(consumer, Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0))
+    quotaTestClients.verifyConsumerClientThrottleTimeMetric(expectThrottle = true,
+      Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0))
+
+    val exemptMetric = quotaTestClients.exemptRequestMetric
+    assertNotNull("Exempt requests not recorded", exemptMetric)
+    assertTrue("Exempt requests not recorded", metricValue(exemptMetric) > 0)
+  }
+}
+
+object QuotaTestClients {
+  def metricValue(metric: Metric): Double = metric.metricValue().asInstanceOf[Double]
+}
+
+abstract class QuotaTestClients(topic: String,
+                                leaderNode: KafkaServer,
+                                producerClientId: String,
+                                consumerClientId: String,
+                                producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+
+  def userPrincipal : KafkaPrincipal
+  def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
+  def removeQuotaOverrides()
+
+  def quotaMetricTags(clientId: String): Map[String, String]
 
-    assertNotNull("Exempt requests not recorded", exemptRequestMetric)
-    assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
+  def quota(quotaManager: ClientQuotaManager, userPrincipal: KafkaPrincipal, clientId: String): Quota = {
+    quotaManager.quota(userPrincipal, clientId)
   }
 
-  def produceUntilThrottled(p: KafkaProducer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
+  def produceUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean = true): Int = {
     var numProduced = 0
     var throttled = false
     do {
       val payload = numProduced.toString.getBytes
-      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
-             new ErrorLoggingCallback(topic1, null, null, true)).get()
+      val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, null, null, payload),
+        new ErrorLoggingCallback(topic, null, null, true))
       numProduced += 1
-      val throttleMetric = producerThrottleMetric
-      throttled = throttleMetric != null && throttleMetric.value > 0
+      do {
+        val metric = throttleMetric(QuotaType.Produce, producerClientId)
+        throttled = metric != null && metricValue(metric) > 0
+      } while (!future.isDone && (!throttled || waitForRequestCompletion))
     } while (numProduced < maxRecords && !throttled)
     numProduced
   }
 
-  def consumeUntilThrottled(consumer: KafkaConsumer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
-    consumer.subscribe(Collections.singleton(topic1))
+  def consumeUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean = true): Int = {
+    consumer.subscribe(Collections.singleton(topic))
     var numConsumed = 0
     var throttled = false
     do {
       numConsumed += consumer.poll(100).count
-      val throttleMetric = consumerThrottleMetric
-      throttled = throttleMetric != null && throttleMetric.value > 0
+      val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
+      throttled = metric != null && metricValue(metric) > 0
     }  while (numConsumed < maxRecords && !throttled)
 
     // If throttled, wait for the records from the last fetch to be received
-    if (throttled && numConsumed < maxRecords) {
+    if (throttled && numConsumed < maxRecords && waitForRequestCompletion) {
       val minRecords = numConsumed + 1
       while (numConsumed < minRecords)
-          numConsumed += consumer.poll(100).count
+        numConsumed += consumer.poll(100).count
     }
     numConsumed
   }
 
-  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    TestUtils.retry(10000) {
-      val quotaManagers = leaderNode.apis.quotas
-      val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId)
-      val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId)
-      val overrideProducerRequestQuota = quotaManagers.request.quota(userPrincipal, producerClientId)
-      val overrideConsumerRequestQuota = quotaManagers.request.quota(userPrincipal, consumerClientId)
+  def verifyProduceThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true): Unit = {
+    verifyThrottleTimeMetric(QuotaType.Produce, producerClientId, expectThrottle)
+    if (verifyClientMetric)
+      verifyProducerClientThrottleTimeMetric(expectThrottle)
+  }
 
-      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
-      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
-      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
-      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
+  def verifyConsumeThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true): Unit = {
+    verifyThrottleTimeMetric(QuotaType.Fetch, consumerClientId, expectThrottle)
+    if (verifyClientMetric)
+      verifyConsumerClientThrottleTimeMetric(expectThrottle)
+  }
+
+  def verifyThrottleTimeMetric(quotaType: QuotaType, clientId: String, expectThrottle: Boolean): Unit = {
+    val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId))
+    if (expectThrottle) {
+      assertTrue("Should have been throttled", throttleMetricValue > 0)
+    } else {
+      assertEquals("Should not have been throttled", 0.0, throttleMetricValue, 0.0)
     }
   }
 
-  private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
+  def throttleMetricName(quotaType: QuotaType, clientId: String): MetricName = {
+    leaderNode.metrics.metricName("throttle-time",
+      quotaType.toString,
+      quotaMetricTags(clientId).asJava)
+  }
+
+  def throttleMetric(quotaType: QuotaType, clientId: String): KafkaMetric = {
+    leaderNode.metrics.metrics.get(throttleMetricName(quotaType, clientId))
+  }
+
+  def exemptRequestMetric: KafkaMetric = {
+    val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
+    leaderNode.metrics.metrics.get(metricName)
+  }
+
+  def verifyProducerClientThrottleTimeMetric(expectThrottle: Boolean) {
     val tags = new HashMap[String, String]
     tags.put("client-id", producerClientId)
     val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
     val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
 
-    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
-        s"Producer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
+    if (expectThrottle) {
+      TestUtils.waitUntilTrue(() => metricValue(avgMetric) > 0.0 && metricValue(maxMetric) > 0.0,
+        s"Producer throttle metric not updated: avg=${metricValue(avgMetric)} max=${metricValue(maxMetric)}")
+    } else
+      assertEquals("Should not have been throttled", 0.0, metricValue(maxMetric), 0.0)
   }
 
-  private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) {
+  def verifyConsumerClientThrottleTimeMetric(expectThrottle: Boolean, maxThrottleTime: Option[Double] = None) {
     val tags = new HashMap[String, String]
     tags.put("client-id", consumerClientId)
     val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags))
     val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags))
 
-    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
-        s"Consumer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
-    maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${maxMetric.value}", maxMetric.value <= max))
-  }
-
-  private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
-    leaderNode.metrics.metricName("throttle-time",
-                                  quotaType.toString,
-                                  "Tracking throttle-time per user/client-id",
-                                  "user", quotaId.sanitizedUser.getOrElse(""),
-                                  "client-id", quotaId.clientId.getOrElse(""))
-  }
-
-  def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {
-    leaderNode.metrics.metrics.get(throttleMetricName(quotaType, quotaId))
-  }
-
-  private def producerThrottleMetric = throttleMetric(QuotaType.Produce, producerQuotaId)
-  private def consumerThrottleMetric = throttleMetric(QuotaType.Fetch, consumerQuotaId)
-  private def consumerRequestThrottleMetric = throttleMetric(QuotaType.Request, consumerQuotaId)
-
-  private def exemptRequestMetric: KafkaMetric = {
-    val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
-    leaderNode.metrics.metrics.get(metricName)
+    if (expectThrottle) {
+      TestUtils.waitUntilTrue(() => metricValue(avgMetric) > 0.0 && metricValue(maxMetric) > 0.0,
+        s"Consumer throttle metric not updated: avg=${metricValue(avgMetric)} max=${metricValue(maxMetric)}")
+      maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${metricValue(maxMetric)}",
+        metricValue(maxMetric) <= max))
+    } else
+      assertEquals("Should not have been throttled", 0.0, metricValue(maxMetric), 0.0)
   }
 
   def quotaProperties(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Properties = {
@@ -257,4 +286,19 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     props.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     props
   }
+
+  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) {
+    TestUtils.retry(10000) {
+      val quotaManagers = server.apis.quotas
+      val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
+      val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId)
+      val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId)
+      val overrideConsumerRequestQuota = quota(quotaManagers.request, userPrincipal, consumerClientId)
+
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
+    }
+  }
 }
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 3e0832790b5..b084b3ca5ea 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,18 +16,15 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
+import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Sanitizer
 import org.junit.Before
 
 class ClientIdQuotaTest extends BaseQuotaTest {
 
-  override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName
   override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
-  override val producerQuotaId = QuotaId(None, Some(producerClientId), Some(Sanitizer.sanitize(producerClientId)))
-  override val consumerQuotaId = QuotaId(None, Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {
@@ -35,24 +32,35 @@ class ClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
     super.setUp()
   }
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    val producerProps = new Properties()
-    producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
-    producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(producerClientId, producerProps)
-
-    val consumerProps = new Properties()
-    consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
-    consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(consumerClientId, consumerProps)
-  }
-  override def removeQuotaOverrides() {
-    val emptyProps = new Properties
-    updateQuotaOverride(producerClientId, emptyProps)
-    updateQuotaOverride(consumerClientId, emptyProps)
-  }
 
-  private def updateQuotaOverride(clientId: String, properties: Properties) {
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+      override def userPrincipal: KafkaPrincipal = KafkaPrincipal.ANONYMOUS
+      override def quotaMetricTags(clientId: String): Map[String, String] = {
+        Map("user" -> "", "client-id" -> clientId)
+      }
+
+      override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+        val producerProps = new Properties()
+        producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+        producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(producerClientId, producerProps)
+
+        val consumerProps = new Properties()
+        consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+        consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(consumerClientId, consumerProps)
+      }
+
+      override def removeQuotaOverrides() {
+        val emptyProps = new Properties
+        updateQuotaOverride(producerClientId, emptyProps)
+        updateQuotaOverride(consumerClientId, emptyProps)
+      }
+
+      private def updateQuotaOverride(clientId: String, properties: Properties) {
+        adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
new file mode 100644
index 00000000000..886d6962a6c
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -0,0 +1,453 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.io.File
+import java.{lang, util}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.{Collections, Properties}
+
+import kafka.api.GroupedUserPrincipalBuilder._
+import kafka.api.GroupedUserQuotaCallback._
+import kafka.server._
+import kafka.utils.JaasTestUtils.ScramLoginModule
+import kafka.utils.{JaasTestUtils, Logging, TestUtils}
+import kafka.zk.ConfigEntityChangeNotificationZNode
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.{Cluster, Reconfigurable}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.SaslAuthenticationException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth._
+import org.apache.kafka.common.security.scram.ScramCredential
+import org.apache.kafka.server.quota._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
+
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected def listenerName = new ListenerName("CLIENT")
+  override protected def interBrokerListenerName: ListenerName = new ListenerName("BROKER")
+
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+  override val consumerCount: Int = 0
+  override val producerCount: Int = 0
+  override val serverCount: Int = 2
+
+  private val kafkaServerSaslMechanisms = Seq("SCRAM-SHA-256")
+  private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  private val adminClients = new ArrayBuffer[AdminClient]()
+  private var producerWithoutQuota: KafkaProducer[Array[Byte], Array[Byte]] = _
+
+  val defaultRequestQuota = 1000
+  val defaultProduceQuota = 2000 * 1000 * 1000
+  val defaultConsumeQuota = 1000 * 1000 * 1000
+
+  @Before
+  override def setUp() {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
+    this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
+    this.serverConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp, classOf[GroupedUserQuotaCallback].getName)
+    this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}",
+      classOf[GroupedUserPrincipalBuilder].getName)
+    this.serverConfig.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+    super.setUp()
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
+
+    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
+      ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)
+    producerWithoutQuota = createNewProducer
+    producers += producerWithoutQuota
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    // Close producers and consumers without waiting for requests to complete
+    // to avoid waiting for throttled responses
+    producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    producers.clear()
+    consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    consumers.clear()
+    super.tearDown()
+  }
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+  }
+
+  @Test
+  def testCustomQuotaCallback() {
+    // Large quota override, should not throttle
+    var brokerId = 0
+    var user = createGroupWithOneUser("group0_user1", brokerId)
+    user.configureAndWaitForQuota(1000000, 2000000)
+    quotaLimitCalls.values.foreach(_.set(0))
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // ClientQuotaCallback#quotaLimit is invoked by each quota manager once for each new client
+    assertEquals(1, quotaLimitCalls(ClientQuotaType.PRODUCE).get)
+    assertEquals(1, quotaLimitCalls(ClientQuotaType.FETCH).get)
+    assertTrue(s"Too many quotaLimit calls $quotaLimitCalls", quotaLimitCalls(ClientQuotaType.REQUEST).get <= serverCount)
+    // Large quota updated to small quota, should throttle
+    user.configureAndWaitForQuota(9000, 3000)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Quota override deletion - verify default quota applied (large quota, no throttling)
+    user = addUser("group0_user2", brokerId)
+    user.removeQuotaOverrides()
+    user.waitForQuotaUpdate(defaultProduceQuota, defaultConsumeQuota, defaultRequestQuota)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Make default quota smaller, should throttle
+    user.configureAndWaitForQuota(8000, 2500, divisor = 1, group = None)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Configure large quota override, should not throttle
+    user = addUser("group0_user3", brokerId)
+    user.configureAndWaitForQuota(2000000, 2000000)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Quota large enough for one partition, should not throttle
+    brokerId = 1
+    user = createGroupWithOneUser("group1_user1", brokerId)
+    user.configureAndWaitForQuota(8000 * 100, 2500 * 100)
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Create large number of partitions on another broker, should result in throttling on first partition
+    val largeTopic = "group1_largeTopic"
+    createTopic(largeTopic, numPartitions = 99, leader = 0)
+    user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Remove quota override and test default quota applied with scaling based on partitions
+    user = addUser("group1_user2", brokerId)
+    user.waitForQuotaUpdate(defaultProduceQuota / 100, defaultConsumeQuota / 100, defaultRequestQuota)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+    user.configureAndWaitForQuota(8000 * 100, 2500 * 100, divisor=100, group = None)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Remove the second topic with large number of partitions, verify no longer throttled
+    adminZkClient.deleteTopic(largeTopic)
+    user = addUser("group1_user3", brokerId)
+    user.waitForQuotaUpdate(8000 * 100, 2500 * 100, defaultRequestQuota)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Alter configs of custom callback dynamically
+    val adminClient = createAdminClient()
+    val newProps = new Properties
+    newProps.put(GroupedUserQuotaCallback.DefaultProduceQuotaProp, "8000")
+    newProps.put(GroupedUserQuotaCallback.DefaultFetchQuotaProp, "2500")
+    TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig = false)
+    user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+  }
+
+  /**
+   * Creates a group with one user and one topic with one partition.
+   * @param firstUser First user to create in the group
+   * @param brokerId The broker id to use as leader of the partition
+   */
+  private def createGroupWithOneUser(firstUser: String, brokerId: Int): GroupedUser = {
+    val user = addUser(firstUser, brokerId)
+    createTopic(user.topic, numPartitions = 1, brokerId)
+    user.configureAndWaitForQuota(defaultProduceQuota, defaultConsumeQuota, divisor = 1, group = None)
+    user
+  }
+
+  private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
+    val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+  }
+
+  private def createAdminClient(): AdminClient = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+      TestUtils.bootstrapServers(servers, new ListenerName("BROKER")))
+    clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].asScala.foreach { case (key, value) =>
+      config.put(key.toString, value)
+    }
+    config.put(SaslConfigs.SASL_JAAS_CONFIG,
+      ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)
+    val adminClient = AdminClient.create(config)
+    adminClients += adminClient
+    adminClient
+  }
+
+  private def produceWithoutThrottle(topic: String, numRecords: Int): Unit = {
+    (0 until numRecords).foreach { i =>
+      val payload = i.toString.getBytes
+      producerWithoutQuota.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, null, null, payload))
+    }
+  }
+
+  private def addUser(user: String, leader: Int): GroupedUser = {
+
+    val password = s"$user:secret"
+    createScramCredentials(zkConnect, user, password)
+    servers.foreach { server =>
+      val cache = server.credentialProvider.credentialCache.cache(kafkaClientSaslMechanism, classOf[ScramCredential])
+      TestUtils.waitUntilTrue(() => cache.get(user) != null, "SCRAM credentials not created")
+    }
+
+    val userGroup = group(user)
+    val topic = s"${userGroup}_topic"
+    val producerClientId = s"$user:producer-client-id"
+    val consumerClientId = s"$user:producer-client-id"
+
+    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
+    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
+    val producer = createNewProducer
+    producers += producer
+
+    consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group")
+    consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
+    val consumer = createNewConsumer
+    consumers += consumer
+
+    GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId, producer, consumer)
+  }
+
+  case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaServer,
+                         producerClientId: String, consumerClientId: String,
+                         producer: KafkaProducer[Array[Byte], Array[Byte]],
+                         consumer: KafkaConsumer[Array[Byte], Array[Byte]]) extends
+    QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer) {
+
+    override def userPrincipal: KafkaPrincipal = GroupedUserPrincipal(user, userGroup)
+
+    override def quotaMetricTags(clientId: String): Map[String, String] = {
+      Map(GroupedUserQuotaCallback.QuotaGroupTag -> userGroup)
+    }
+
+    override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Unit = {
+      configureQuota(userGroup, producerQuota, consumerQuota, requestQuota)
+    }
+
+    override def removeQuotaOverrides(): Unit = {
+      adminZkClient.changeUserOrUserClientIdConfig(quotaEntityName(userGroup), new Properties)
+    }
+
+    def configureQuota(userGroup: String, producerQuota: Long, consumerQuota: Long, requestQuota: Double): Unit = {
+      val quotaProps = quotaProperties(producerQuota, consumerQuota, requestQuota)
+      adminZkClient.changeUserOrUserClientIdConfig(quotaEntityName(userGroup), quotaProps)
+    }
+
+    def configureAndWaitForQuota(produceQuota: Long, fetchQuota: Long, divisor: Int = 1,
+                                 group: Option[String] = Some(userGroup)): Unit = {
+      configureQuota(group.getOrElse(""), produceQuota, fetchQuota, defaultRequestQuota)
+      waitForQuotaUpdate(produceQuota / divisor, fetchQuota / divisor, defaultRequestQuota)
+    }
+
+    def produceConsume(expectProduceThrottle: Boolean, expectConsumeThrottle: Boolean): Unit = {
+      val numRecords = 1000
+      val produced = produceUntilThrottled(numRecords, waitForRequestCompletion = false)
+      verifyProduceThrottle(expectProduceThrottle, verifyClientMetric = false)
+      // make sure there are enough records on the topic to test consumer throttling
+      produceWithoutThrottle(topic, numRecords - produced)
+      consumeUntilThrottled(numRecords, waitForRequestCompletion = false)
+      verifyConsumeThrottle(expectConsumeThrottle, verifyClientMetric = false)
+    }
+
+    def removeThrottleMetrics(): Unit = {
+      def removeSensors(quotaType: QuotaType, clientId: String): Unit = {
+        val sensorSuffix = quotaMetricTags(clientId).values.mkString(":")
+        leaderNode.metrics.removeSensor(s"${quotaType}ThrottleTime-$sensorSuffix")
+        leaderNode.metrics.removeSensor(s"$quotaType-$sensorSuffix")
+      }
+      removeSensors(QuotaType.Produce, producerClientId)
+      removeSensors(QuotaType.Fetch, consumerClientId)
+      removeSensors(QuotaType.Request, producerClientId)
+      removeSensors(QuotaType.Request, consumerClientId)
+    }
+
+    private def quotaEntityName(userGroup: String): String = s"${userGroup}_"
+  }
+}
+
+object GroupedUserPrincipalBuilder {
+  def group(str: String): String = {
+    if (str.indexOf("_") <= 0)
+      ""
+    else
+      str.substring(0, str.indexOf("_"))
+  }
+}
+
+class GroupedUserPrincipalBuilder extends KafkaPrincipalBuilder {
+  override def build(context: AuthenticationContext): KafkaPrincipal = {
+    val securityProtocol = context.securityProtocol
+    if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
+      val user = context.asInstanceOf[SaslAuthenticationContext].server().getAuthorizationID
+      val userGroup = group(user)
+      if (userGroup.isEmpty)
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+      else
+        GroupedUserPrincipal(user, userGroup)
+    } else
+      throw new IllegalStateException(s"Unexpected security protocol $securityProtocol")
+  }
+}
+
+case class GroupedUserPrincipal(user: String, userGroup: String) extends KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+
+object GroupedUserQuotaCallback {
+  val QuotaGroupTag = "group"
+  val DefaultProduceQuotaProp = "default.produce.quota"
+  val DefaultFetchQuotaProp = "default.fetch.quota"
+  val UnlimitedQuotaMetricTags = Collections.emptyMap[String, String]
+  val quotaLimitCalls = Map(
+    ClientQuotaType.PRODUCE -> new AtomicInteger,
+    ClientQuotaType.FETCH -> new AtomicInteger,
+    ClientQuotaType.REQUEST -> new AtomicInteger
+  )
+}
+
+/**
+ * Quota callback for a grouped user. Both user principals and topics of each group
+ * are prefixed with the group name followed by '_'. This callback defines quotas of different
+ * types at the group level. Group quotas are configured in ZooKeeper as user quotas with
+ * the entity name "${group}_". Default group quotas are configured in ZooKeeper as user quotas
+ * with the entity name "_".
+ *
+ * Default group quotas may also be configured using the configuration options
+ * "default.produce.quota" and "default.fetch.quota" which can be reconfigured dynamically
+ * without restarting the broker. This tests custom reconfigurable options for quota callbacks,
+ */
+class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable with Logging {
+
+  var brokerId: Int = -1
+  val customQuotasUpdated = ClientQuotaType.values.toList
+    .map(quotaType =>(quotaType -> new AtomicBoolean)).toMap
+  val quotas = ClientQuotaType.values.toList
+    .map(quotaType => (quotaType -> new ConcurrentHashMap[String, Double])).toMap
+
+  val partitionRatio = new ConcurrentHashMap[String, Double]()
+
+  override def configure(configs: util.Map[String, _]): Unit = {
+    brokerId = configs.get(KafkaConfig.BrokerIdProp).toString.toInt
+  }
+
+  override def reconfigurableConfigs: util.Set[String] = {
+    Set(DefaultProduceQuotaProp, DefaultFetchQuotaProp).asJava
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+    reconfigurableConfigs.asScala.foreach(configValue(configs, _))
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    configValue(configs, DefaultProduceQuotaProp).foreach(value => quotas(ClientQuotaType.PRODUCE).put("", value))
+    configValue(configs, DefaultFetchQuotaProp).foreach(value => quotas(ClientQuotaType.FETCH).put("", value))
+    customQuotasUpdated.values.foreach(_.set(true))
+  }
+
+  private def configValue(configs: util.Map[String, _], key: String): Option[Long] = {
+    val value = configs.get(key)
+    if (value != null) Some(value.toString.toLong) else None
+  }
+
+  override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = {
+    principal match {
+      case groupPrincipal: GroupedUserPrincipal =>
+        val userGroup = groupPrincipal.userGroup
+        val quotaLimit = quotaOrDefault(userGroup, quotaType)
+        if (quotaLimit != null)
+          Map(QuotaGroupTag -> userGroup).asJava
+        else
+          UnlimitedQuotaMetricTags
+      case _ =>
+        UnlimitedQuotaMetricTags
+    }
+  }
+
+  override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = {
+    quotaLimitCalls(quotaType).incrementAndGet
+    val group = metricTags.get(QuotaGroupTag)
+    if (group != null) quotaOrDefault(group, quotaType) else null
+  }
+
+  override def updateClusterMetadata(cluster: Cluster): Boolean = {
+    val topicsByGroup = cluster.topics.asScala.groupBy(group)
+
+    !topicsByGroup.forall { case (group, groupTopics) =>
+      val groupPartitions = groupTopics.flatMap(topic => cluster.partitionsForTopic(topic).asScala)
+      val totalPartitions = groupPartitions.size
+      val partitionsOnThisBroker = groupPartitions.count { p => p.leader != null && p.leader.id == brokerId }
+      val multiplier = if (totalPartitions == 0)
+        1
+      else if (partitionsOnThisBroker == 0)
+        1.0 / totalPartitions
+      else
+        partitionsOnThisBroker.toDouble / totalPartitions
+      partitionRatio.put(group, multiplier) != multiplier
+    }
+  }
+
+  override def updateQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity, newValue: Double): Unit = {
+    quotas(quotaType).put(userGroup(quotaEntity), newValue)
+  }
+
+  override def removeQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity): Unit = {
+    quotas(quotaType).remove(userGroup(quotaEntity))
+  }
+
+  override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = customQuotasUpdated(quotaType).getAndSet(false)
+
+  def close(): Unit = {}
+
+  private def userGroup(quotaEntity: ClientQuotaEntity): String = {
+    val configEntity = quotaEntity.configEntities.get(0)
+    if (configEntity.entityType == ClientQuotaEntity.ConfigEntityType.USER)
+      group(configEntity.name)
+    else
+      throw new IllegalArgumentException(s"Config entity type ${configEntity.entityType} is not supported")
+  }
+
+  private def quotaOrDefault(group: String, quotaType: ClientQuotaType): lang.Double = {
+    val quotaMap = quotas(quotaType)
+    var quotaLimit: Any = quotaMap.get(group)
+    if (quotaLimit == null)
+      quotaLimit = quotaMap.get("")
+    if (quotaLimit != null) scaledQuota(quotaType, group, quotaLimit.asInstanceOf[Double]) else null
+  }
+
+  private def scaledQuota(quotaType: ClientQuotaType, group: String, configuredQuota: Double): Double = {
+    if (quotaType == ClientQuotaType.REQUEST)
+      configuredQuota
+    else {
+      val multiplier = partitionRatio.get(group)
+      if (multiplier <= 0.0) configuredQuota else configuredQuota * multiplier
+    }
+  }
+}
+
+
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 453ac910269..47c8f5fa1da 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -18,7 +18,7 @@ import java.io.File
 import java.util.Properties
 
 import kafka.server._
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
 import org.junit.Before
 
@@ -27,11 +27,8 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
   override protected def securityProtocol = SecurityProtocol.SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
-  override val userPrincipal = "O=A client,CN=localhost"
   override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
-  override def producerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(producerClientId), Some(Sanitizer.sanitize(producerClientId)))
-  override def consumerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {
@@ -39,30 +36,41 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
-    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    val defaultProps = quotaTestClients.quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
     adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
-    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    val producerProps = new Properties()
-    producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
-    producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(userPrincipal, producerClientId, producerProps)
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+      override def userPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "O=A client,CN=localhost")
+      override def quotaMetricTags(clientId: String): Map[String, String] = {
+        Map("user" -> Sanitizer.sanitize(userPrincipal.getName), "client-id" -> clientId)
+      }
 
-    val consumerProps = new Properties()
-    consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
-    consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(userPrincipal, consumerClientId, consumerProps)
-  }
+      override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+        val producerProps = new Properties()
+        producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+        producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(userPrincipal.getName, producerClientId, producerProps)
 
-  override def removeQuotaOverrides() {
-    val emptyProps = new Properties
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
-  }
+        val consumerProps = new Properties()
+        consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+        consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(userPrincipal.getName, consumerClientId, consumerProps)
+      }
+
+      override def removeQuotaOverrides() {
+        val emptyProps = new Properties
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal.getName) +
+          "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal.getName) +
+          "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
+      }
 
-  private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+      private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index 91a92faf68f..3386c91ab81 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -17,9 +17,9 @@ package kafka.api
 import java.io.File
 import java.util.Properties
 
-import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
+import kafka.server.{ConfigEntityName, KafkaConfig, KafkaServer}
 import kafka.utils.JaasTestUtils
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
 import org.junit.{After, Before}
 
@@ -32,20 +32,15 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
-  override val userPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName2
-  override val producerQuotaId = QuotaId(Some(userPrincipal), None, None)
-  override val consumerQuotaId = QuotaId(Some(userPrincipal), None, None)
-
-
   @Before
   override def setUp() {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
-    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    val defaultProps = quotaTestClients.quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
     adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default, defaultProps)
-    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
   @After
@@ -54,18 +49,27 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
     closeSasl()
   }
 
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    val props = quotaProperties(producerQuota, consumerQuota, requestQuota)
-    updateQuotaOverride(props)
-  }
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+      override val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
+      override def quotaMetricTags(clientId: String): Map[String, String] = {
+        Map("user" -> userPrincipal.getName, "client-id" -> "")
+      }
 
-  override def removeQuotaOverrides() {
-    val emptyProps = new Properties
-    updateQuotaOverride(emptyProps)
-    updateQuotaOverride(emptyProps)
-  }
+      override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+        val props = quotaProperties(producerQuota, consumerQuota, requestQuota)
+        updateQuotaOverride(props)
+      }
+
+      override def removeQuotaOverrides() {
+        val emptyProps = new Properties
+        updateQuotaOverride(emptyProps)
+        updateQuotaOverride(emptyProps)
+      }
 
-  private def updateQuotaOverride(properties: Properties) {
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal), properties)
+      private def updateQuotaOverride(properties: Properties) {
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal.getName), properties)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index e0ab55c2158..bb62fb7fc67 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -746,7 +746,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val unknownConfig = "some.config"
     props.put(unknownConfig, "some.config.value")
 
-    alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+    TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
       "Listener config not updated")
@@ -799,7 +799,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     listenerProps.foreach(props.remove)
     props.put(KafkaConfig.ListenersProp, listeners)
     props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
-    alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+    TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
       "Listeners not updated")
@@ -1054,20 +1054,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
   }
 
-  private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
-    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
-    val newConfig = new Config(configEntries)
-    val configs = if (perBrokerConfig) {
-      servers.map { server =>
-        val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
-        (resource, newConfig)
-      }.toMap.asJava
-    } else {
-      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> newConfig).asJava
-    }
-    adminClient.alterConfigs(configs)
-  }
-
   private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -1077,7 +1063,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit = {
-    val alterResult = alterConfigs(adminClients.head, newProps, perBrokerConfig)
+    val alterResult = TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig)
     if (expectFailure) {
       val oldProps = servers.head.config.values.asScala.filterKeys(newProps.containsKey)
       val brokerResources = if (perBrokerConfig)
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 1aabbb3b1d4..c0bad91d0d7 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -18,7 +18,10 @@ package kafka.server
 
 import java.util.Collections
 
+import kafka.network.RequestChannel.Session
+import kafka.server.QuotaType._
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{MockTime, Sanitizer}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Before, Test}
@@ -38,43 +41,49 @@ class ClientQuotaManagerTest {
     numCallbacks = 0
   }
 
+  private def maybeRecordAndThrottle(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
+    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+    quotaManager.maybeRecordAndThrottle(Session(principal, null),clientId, value, this.callback)
+  }
+
   private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
-    val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, newMetrics, Produce, time, "")
 
     try {
       // Case 1: Update the quota. Assert that the new quota value is returned
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true)))
       clientMetrics.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true)))
 
-      assertEquals("Default producer quota should be " + config.quotaBytesPerSecondDefault, new Quota(config.quotaBytesPerSecondDefault, true), clientMetrics.quota(randomClient.user, randomClient.clientId))
-      assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota(client1.user, client1.clientId))
-      assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client2.user, client2.clientId))
+      assertEquals("Default producer quota should be " + config.quotaBytesPerSecondDefault,
+        config.quotaBytesPerSecondDefault, clientMetrics.quota(randomClient.user, randomClient.clientId).bound, 0.0)
+      assertEquals("Should return the overridden value (2000)", 2000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
+      assertEquals("Should return the overridden value (4000)", 4000, clientMetrics.quota(client2.user, client2.clientId).bound, 0.0)
 
       // p1 should be throttled using the overridden quota
-      var throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 2500 * config.numQuotaSamples, this.callback)
+      var throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 2500 * config.numQuotaSamples)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
 
       // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created.
       // p1 should not longer be throttled after the quota change
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(3000, true)))
-      assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota(client1.user, client1.clientId))
+      assertEquals("Should return the newly overridden value (3000)", 3000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
+      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 0)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
       // Case 3: Change quota back to default. Should be throttled again
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(500, true)))
-      assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota(client1.user, client1.clientId))
+      assertEquals("Should return the default value (500)", 500, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
+      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 0)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
 
       // Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, None)
       clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, defaultConfigClient.sanitizedConfigClientId, Some(new Quota(4000, true)))
-      assertEquals("Should return the newly overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client1.user, client1.clientId))
+      assertEquals("Should return the newly overridden value (4000)", 4000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 1000 * config.numQuotaSamples, this.callback)
+      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 1000 * config.numQuotaSamples)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
     } finally {
@@ -150,11 +159,11 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaConfigPrecedence() {
     val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
-        newMetrics, QuotaType.Produce, time, "")
+        newMetrics, Produce, time, "")
 
     def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
-      assertEquals(new Quota(expectedBound, true), quotaManager.quota(user, clientId))
-      val throttleTimeMs = quotaManager.maybeRecordAndThrottle(user, clientId, value * config.numQuotaSamples, this.callback)
+      assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
+      val throttleTimeMs = maybeRecordAndThrottle(quotaManager, user, clientId, value * config.numQuotaSamples)
       if (expectThrottle)
         assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
       else
@@ -223,14 +232,14 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaViolation() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
     try {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota
        */
       for (_ <- 0 until 10) {
-        clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
+        maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 400)
         time.sleep(1000)
       }
       assertEquals(10, numCallbacks)
@@ -241,7 +250,7 @@ class ClientQuotaManagerTest {
       // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
       // 10.5 seconds because the last window is half complete
       time.sleep(500)
-      val sleepTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 2300, callback)
+      val sleepTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 2300)
 
       assertEquals("Should be throttled", 2100, sleepTime)
       assertEquals(1, queueSizeMetric.value().toInt)
@@ -257,12 +266,12 @@ class ClientQuotaManagerTest {
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 10) {
-        clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
+        maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 400)
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 0, callback))
+                   0, maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 0))
     } finally {
       clientMetrics.shutdown()
     }
@@ -271,7 +280,7 @@ class ClientQuotaManagerTest {
   @Test
   def testRequestPercentageQuotaViolation() {
     val metrics = newMetrics
-    val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "")
+    val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "", None)
     quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
     def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond
@@ -280,7 +289,7 @@ class ClientQuotaManagerTest {
        * if we are under the quota
        */
       for (_ <- 0 until 10) {
-        quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+        maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4))
         time.sleep(1000)
       }
       assertEquals(10, numCallbacks)
@@ -292,7 +301,7 @@ class ClientQuotaManagerTest {
       // (10.2 - quota)/quota*window-size = (10.2-10)/10*10.5 seconds = 210ms
       // 10.5 seconds interval because the last window is half complete
       time.sleep(500)
-      val throttleTime = quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(67.1), callback)
+      val throttleTime = maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(67.1))
 
       assertEquals("Should be throttled", 210, throttleTime)
       assertEquals(1, queueSizeMetric.value().toInt)
@@ -308,22 +317,22 @@ class ClientQuotaManagerTest {
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 11) {
-        quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+        maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4))
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
+                   0, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
 
       // Create a very large spike which requires > one quota window to bring within quota
-      assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(500), callback))
+      assertEquals(1000, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(500)))
       for (_ <- 0 until 10) {
         time.sleep(1000)
-        assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
+        assertEquals(1000, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
       }
       time.sleep(1000)
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
+                   0, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
 
     } finally {
       quotaManager.shutdown()
@@ -333,13 +342,13 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireThrottleTimeSensor() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
-      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
+      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 100)
       // remove the throttle time sensor
       metrics.removeSensor("ProduceThrottleTime-:client1")
       // should not throw an exception even if the throttle time sensor does not exist.
-      val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000, callback)
+      val throttleTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 10000)
       assertTrue("Should be throttled", throttleTime > 0)
       // the sensor should get recreated
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1")
@@ -352,14 +361,14 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireQuotaSensors() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
-      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
+      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 100)
       // remove all the sensors
       metrics.removeSensor("ProduceThrottleTime-:client1")
       metrics.removeSensor("Produce-ANONYMOUS:client1")
       // should not throw an exception
-      val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000, callback)
+      val throttleTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 10000)
       assertTrue("Should be throttled", throttleTime > 0)
 
       // all the sensors should get recreated
@@ -376,10 +385,10 @@ class ClientQuotaManagerTest {
   @Test
   def testClientIdNotSanitized() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     val clientId = "client@#$%"
     try {
-      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, callback)
+      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", clientId, 100)
 
       // The metrics should use the raw client ID, even if the reporters internally sanitize them
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + clientId)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 5c88bf27f6d..9c8acb48024 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -27,11 +27,12 @@ import org.apache.kafka.common.config.{ConfigException, SslConfigs}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.junit.JUnitSuite
 
 import scala.collection.JavaConverters._
 import scala.collection.Set
 
-class DynamicBrokerConfigTest {
+class DynamicBrokerConfigTest extends JUnitSuite {
 
   @Test
   def testConfigUpdate(): Unit = {
@@ -126,6 +127,35 @@ class DynamicBrokerConfigTest {
     verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
   }
 
+  @Test
+  def testReconfigurableValidation(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val config = KafkaConfig(origProps)
+    val invalidReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, KafkaConfig.BrokerIdProp, "some.prop")
+    val validReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, KafkaConfig.LogCleanerDedupeBufferSizeProp, "some.prop")
+
+    def createReconfigurable(configs: Set[String]) = new Reconfigurable {
+      override def configure(configs: util.Map[String, _]): Unit = {}
+      override def reconfigurableConfigs(): util.Set[String] = configs.asJava
+      override def validateReconfiguration(configs: util.Map[String, _]): Unit = {}
+      override def reconfigure(configs: util.Map[String, _]): Unit = {}
+    }
+    intercept[IllegalArgumentException] {
+      config.dynamicConfig.addReconfigurable(createReconfigurable(invalidReconfigurableProps))
+    }
+    config.dynamicConfig.addReconfigurable(createReconfigurable(validReconfigurableProps))
+
+    def createBrokerReconfigurable(configs: Set[String]) = new BrokerReconfigurable {
+      override def reconfigurableConfigs: collection.Set[String] = configs
+      override def validateReconfiguration(newConfig: KafkaConfig): Unit = {}
+      override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {}
+    }
+    intercept[IllegalArgumentException] {
+      config.dynamicConfig.addBrokerReconfigurable(createBrokerReconfigurable(invalidReconfigurableProps))
+    }
+    config.dynamicConfig.addBrokerReconfigurable(createBrokerReconfigurable(validReconfigurableProps))
+  }
+
   @Test
   def testSecurityConfigs(): Unit = {
     def verifyUpdate(name: String, value: Object): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index bfbae2bde03..2a7d6d400d5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -23,6 +23,7 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
+
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
@@ -132,13 +133,16 @@ class RequestQuotaTest extends BaseRequestTest {
     waitAndCheckResults()
   }
 
+  def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
+
   private def throttleTimeMetricValue(clientId: String): Double = {
     val metricName = leaderNode.metrics.metricName("throttle-time",
                                   QuotaType.Request.toString,
                                   "",
                                   "user", "",
                                   "client-id", clientId)
-    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId).throttleTimeSensor
+    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors(session("ANONYMOUS"),
+      clientId).throttleTimeSensor
     metricValue(leaderNode.metrics.metrics.get(metricName), sensor)
   }
 
@@ -148,7 +152,8 @@ class RequestQuotaTest extends BaseRequestTest {
                                   "",
                                   "user", "",
                                   "client-id", clientId)
-    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId).quotaSensor
+    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors(session("ANONYMOUS"),
+      clientId).quotaSensor
     metricValue(leaderNode.metrics.metrics.get(metricName), sensor)
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4b8740600ff..16b7e87487e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -41,9 +41,11 @@ import Implicits._
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1492,6 +1494,21 @@ object TestUtils extends Logging {
     }
   }
 
+  def alterConfigs(servers: Seq[KafkaServer], adminClient: AdminClient, props: Properties,
+                   perBrokerConfig: Boolean): AlterConfigsResult = {
+    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
+    val newConfig = new Config(configEntries)
+    val configs = if (perBrokerConfig) {
+      servers.map { server =>
+        val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+        (resource, newConfig)
+      }.toMap.asJava
+    } else {
+      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> newConfig).asJava
+    }
+    adminClient.alterConfigs(configs)
+  }
+
   /**
    * Capture the console output during the execution of the provided function.
    */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Configurable Quota Management (KIP-257)
> ---------------------------------------
>
>                 Key: KAFKA-6576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6576
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management] for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)