You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/22 01:07:48 UTC

kafka git commit: KAFKA-2209; Change quotas dynamically using DynamicConfigManager

Repository: kafka
Updated Branches:
  refs/heads/trunk 361686d4a -> d9b1dc708


KAFKA-2209; Change quotas dynamically using DynamicConfigManager

Changes in this patch are:
1. ClientIdConfigHandler now passes through the config changes to the quota manager.
2. Removed static KafkaConfigs for quota overrides. These are no longer needed since we can override configs through ZooKeeper.
3. Added testcases to verify that the config changes are propogated from ZK (written using AdminTools) to the actual Metric objects.

Author: Aditya Auradkar <aauradka@aauradka-mn1.(none)>
Author: Aditya Auradkar <aa...@aauradka-mn1.linkedin.biz>

Reviewers: Dong Lin <li...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #298 from auradkar/K-2209


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9b1dc70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9b1dc70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9b1dc70

Branch: refs/heads/trunk
Commit: d9b1dc70817de9366513b0cbed0b6e0702d473fa
Parents: 361686d
Author: Aditya Auradkar <aauradka@aauradka-mn1.(none)>
Authored: Wed Oct 21 16:07:39 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 21 16:07:39 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/metrics/Quota.java  |   4 +-
 .../apache/kafka/common/metrics/stats/Avg.java  |   2 +-
 .../kafka/common/metrics/MetricsTest.java       |  10 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  17 +--
 .../scala/kafka/server/ClientQuotaManager.scala |  73 ++++++++-----
 .../main/scala/kafka/server/ConfigHandler.scala |  30 +++--
 .../kafka/server/DynamicConfigManager.scala     |   6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   5 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  12 --
 .../main/scala/kafka/server/KafkaServer.scala   |  10 +-
 .../integration/kafka/api/QuotasTest.scala      |  41 ++++---
 .../test/scala/unit/kafka/admin/AdminTest.scala |  36 +++++-
 .../kafka/server/ClientQuotaManagerTest.scala   | 109 +++++++------------
 .../kafka/server/DynamicConfigChangeTest.scala  |  26 +++--
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 -
 15 files changed, 216 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
index 235b599..8431e50 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
@@ -29,11 +29,11 @@ public final class Quota {
         this.upper = upper;
     }
 
-    public static Quota lessThan(double upperBound) {
+    public static Quota upperBound(double upperBound) {
         return new Quota(upperBound, true);
     }
 
-    public static Quota moreThan(double lowerBound) {
+    public static Quota lowerBound(double lowerBound) {
         return new Quota(lowerBound, false);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
index ed6767f..0fe7380 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
@@ -39,7 +39,7 @@ public class Avg extends SampledStat {
             total += s.value;
             count += s.eventCount;
         }
-        return total / count;
+        return count == 0 ? 0 : total / count;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index bd84ebe..d465c98 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -320,8 +320,8 @@ public class MetricsTest {
     @Test
     public void testQuotas() {
         Sensor sensor = metrics.sensor("test");
-        sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
-        sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
+        sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
+        sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
         sensor.record(5.0);
         try {
             sensor.record(1.0);
@@ -341,12 +341,12 @@ public class MetricsTest {
 
     @Test
     public void testQuotasEquality() {
-        final Quota quota1 = Quota.lessThan(10.5);
-        final Quota quota2 = Quota.moreThan(10.5);
+        final Quota quota1 = Quota.upperBound(10.5);
+        final Quota quota2 = Quota.lowerBound(10.5);
 
         assertFalse("Quota with different upper values shouldn't be equal", quota1.equals(quota2));
 
-        final Quota quota3 = Quota.moreThan(10.5);
+        final Quota quota3 = Quota.lowerBound(10.5);
 
         assertTrue("Quota with same upper and bound values should be equal", quota2.equals(quota3));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index ecc5b9d..6fff176 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -98,7 +98,7 @@ object AdminUtils extends Logging {
  /**
   * Add partitions to existing topic with optional replica assignment
   *
-  * @param zkClient Zookeeper client
+  * @param zkUtils Zookeeper utilities
   * @param topic Topic for adding partitions to
   * @param numPartitions Number of partitions to be set
   * @param replicaAssignmentStr Manual replica assignment
@@ -177,7 +177,7 @@ object AdminUtils extends Logging {
   /**
    * Delete the whole directory of the given consumer group if the group is inactive.
    *
-   * @param zkClient Zookeeper client
+   * @param zkUtils Zookeeper utilities
    * @param group Consumer group
    * @return whether or not we deleted the consumer group information
    */
@@ -194,7 +194,7 @@ object AdminUtils extends Logging {
    * Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
    * If the consumer group consumes no other topics, delete the whole consumer group directory.
    *
-   * @param zkClient Zookeeper client
+   * @param zkUtils Zookeeper utilities
    * @param group Consumer group
    * @param topic Topic of the consumer group information we wish to delete
    * @return whether or not we deleted the consumer group information for the given topic
@@ -216,7 +216,7 @@ object AdminUtils extends Logging {
   /**
    * Delete every inactive consumer group's information about the given topic in Zookeeper.
    *
-   * @param zkClient Zookeeper client
+   * @param zkUtils Zookeeper utilities
    * @param topic Topic of the consumer group information we wish to delete
    */
   def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
@@ -294,7 +294,7 @@ object AdminUtils extends Logging {
 
   /**
    * Update the config for a client and create a change notification so the change will propagate to other brokers
-   * @param zkClient: The ZkClient handle used to write the new config to zookeeper
+   * @param zkUtils Zookeeper utilities used to write the config to ZK
    * @param clientId: The clientId for which configs are being changed
    * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
    *                 existing configs need to be deleted, it should be done prior to invoking this API
@@ -306,7 +306,7 @@ object AdminUtils extends Logging {
 
   /**
    * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
-   * @param zkClient: The ZkClient handle used to write the new config to zookeeper
+   * @param zkUtils Zookeeper utilities used to write the config to ZK
    * @param topic: The topic for which configs are being changed
    * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
    *                 existing configs need to be deleted, it should be done prior to invoking this API
@@ -379,6 +379,9 @@ object AdminUtils extends Logging {
   def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
     zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
 
+  def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
+    zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
+
   def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata =
     fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
 
@@ -387,8 +390,6 @@ object AdminUtils extends Logging {
     topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo))
   }
 
-
-
   private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
     if(zkUtils.pathExists(getTopicPath(topic))) {
       val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 24f294d..82fec73 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import java.util.concurrent.{DelayQueue, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 
 import kafka.utils.{ShutdownableThread, Logging}
 import org.apache.kafka.common.MetricName
@@ -36,15 +36,12 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor
 /**
  * Configuration settings for quota management
  * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client
- * @param quotaBytesPerSecondOverrides The comma separated overrides per client. "c1=X,c2=Y"
  * @param numQuotaSamples The number of samples to retain in memory
  * @param quotaWindowSizeSeconds The time span of each sample
  *
  */
 case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
                                         ClientQuotaManagerConfig.QuotaBytesPerSecondDefault,
-                                    quotaBytesPerSecondOverrides: String =
-                                        ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides,
                                     numQuotaSamples: Int =
                                         ClientQuotaManagerConfig.DefaultNumQuotaSamples,
                                     quotaWindowSizeSeconds: Int =
@@ -52,11 +49,9 @@ case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
 
 object ClientQuotaManagerConfig {
   val QuotaBytesPerSecondDefault = Long.MaxValue
-  val QuotaBytesPerSecondOverrides = ""
   // Always have 10 whole windows + 1 current window
   val DefaultNumQuotaSamples = 11
   val DefaultQuotaWindowSizeSeconds = 1
-  val MaxThrottleTimeSeconds = 30
   // Purge sensors after 1 hour of inactivity
   val InactiveSensorExpirationTimeSeconds  = 3600
 }
@@ -73,8 +68,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val apiKey: String,
                          private val time: Time) extends Logging {
-  private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides)
-  private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault)
+  private val overriddenQuota = new ConcurrentHashMap[String, Quota]()
+  private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
@@ -124,13 +119,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
         throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
+        clientSensors.throttleTimeSensor.record(throttleTimeMs)
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
         delayQueueSensor.record()
         // If delayed, add the element to the delayQueue
         logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
     }
-    // If the request is not throttled, a throttleTime of 0 ms is recorded
-    clientSensors.throttleTimeSensor.record(throttleTimeMs)
     throttleTimeMs
   }
 
@@ -160,10 +154,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
-   * Returns the consumer quota for the specified clientId
-   * @return
+   * Returns the quota for the specified clientId
    */
-  private[server] def quota(clientId: String): Quota = overriddenQuota.getOrElse(clientId, defaultQuota)
+  def quota(clientId: String): Quota =
+    if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota;
 
   /*
    * This function either returns the sensors for a given client id or creates them if they don't exist
@@ -172,8 +166,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
 
     // Names of the sensors to access
-    val quotaSensorName = apiKey + "-" + clientId
-    val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId
+    val quotaSensorName = getQuotaSensorName(clientId)
+    val throttleTimeSensorName = getThrottleTimeSensorName(clientId)
     var quotaSensor: Sensor = null
     var throttleTimeSensor: Sensor = null
 
@@ -231,6 +225,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     ClientSensors(quotaSensor, throttleTimeSensor)
   }
 
+  private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId
+
+  private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId
+
   private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
     new MetricConfig()
             .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
@@ -238,21 +236,38 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
             .quota(quota)
   }
 
-  /* Construct a Map of (clientId -> Quota)
-   * The input config is specified as a comma-separated K=V pairs
+  /**
+   * Overrides quotas per clientId
+   * @param clientId client to override
+   * @param quota custom quota to apply
    */
-  private def initQuotaMap(input: String): Map[String, Quota] = {
-    // If empty input, return an empty map
-    if (input.trim.length == 0)
-      Map[String, Quota]()
-    else
-      input.split(",").map(entry => {
-        val trimmedEntry = entry.trim
-        val pair: Array[String] = trimmedEntry.split("=")
-        if (pair.length != 2)
-          throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(entry))
-        pair(0) -> new Quota(pair(1).toDouble, true)
-      }).toMap
+  def updateQuota(clientId: String, quota: Quota) = {
+    /*
+     * Acquire the write lock to apply changes in the quota objects.
+     * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
+     * If the KafkaMetric hasn't been created, the most recent value will be used from the overriddenQuota map.
+     * The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change
+     * notifications
+     */
+    lock.writeLock().lock()
+    try {
+      logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}")
+
+      if (quota.equals(defaultQuota))
+        this.overriddenQuota.remove(clientId)
+      else
+        this.overriddenQuota.put(clientId, quota)
+
+      // Change the underlying metric config if the sensor has been created
+      val allMetrics = metrics.metrics()
+      val quotaMetricName = clientRateMetricName(clientId)
+      if (allMetrics.containsKey(quotaMetricName)) {
+        logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig")
+        allMetrics.get(quotaMetricName).config(getQuotaMetricConfig(quota))
+      }
+    } finally {
+      lock.writeLock().unlock()
+    }
   }
 
   private def clientRateMetricName(clientId: String): MetricName = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 8347a69..606156a 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -21,22 +21,24 @@ import java.util.Properties
 
 import kafka.common.TopicAndPartition
 import kafka.log.{Log, LogConfig, LogManager}
-import kafka.utils.Pool
+import kafka.api.RequestKeys
+import org.apache.kafka.common.metrics.Quota
 
 import scala.collection.mutable
+import scala.collection.Map
 
 /**
  * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
  */
 trait ConfigHandler {
-  def processConfigChanges(entityName : String, value : Properties)
+  def processConfigChanges(entityName: String, value: Properties)
 }
 
 /**
  * The TopicConfigHandler will process topic config changes in ZK.
  * The callback provides the topic name and the full properties set read from ZK
  */
-class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{
+class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler {
 
   def processConfigChanges(topic : String, topicConfig : Properties) {
     val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
@@ -55,15 +57,27 @@ class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandl
   }
 }
 
+object ClientConfigOverride {
+  val ProducerOverride = "producer_byte_rate"
+  val ConsumerOverride = "consumer_byte_rate"
+}
+
 /**
  * The ClientIdConfigHandler will process clientId config changes in ZK.
  * The callback provides the clientId and the full properties set read from ZK.
- * This implementation does nothing currently. In the future, it will change quotas per client
+ * This implementation reports the overrides to the respective ClientQuotaManager objects
  */
-class ClientIdConfigHandler extends ConfigHandler {
-  val configPool = new Pool[String, Properties]()
+class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler {
 
-  def processConfigChanges(clientId : String, clientConfig : Properties): Unit = {
-    configPool.put(clientId, clientConfig)
+  def processConfigChanges(clientId: String, clientConfig: Properties) = {
+    if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
+      quotaManagers(RequestKeys.ProduceKey).updateQuota(clientId,
+        new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
+    }
+
+    if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
+      quotaManagers(RequestKeys.FetchKey).updateQuota(clientId,
+        new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index d443a1f..cb4b8f1 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -71,7 +71,7 @@ object ConfigType {
  *
  */
 class DynamicConfigManager(private val zkUtils: ZkUtils,
-                           private val configHandler : Map[String, ConfigHandler],
+                           private val configHandlers: Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
                            private val time: Time = SystemTime) extends Logging {
   private var lastExecutedChange = -1L
@@ -138,7 +138,9 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
             case Some(value: String) => value
             case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
           }
-          configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkUtils, entityType, entity))
+          val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
+          logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
+          configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
         case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
                                                              "{\"version\" : 1," +

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c80bd46..2ef9730 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,7 +30,6 @@ import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
@@ -55,7 +54,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
   // Store all the quota managers for each type of request
-  private val quotaManagers = instantiateQuotaManagers(config)
+  val quotaManagers: Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -784,14 +783,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = {
     val producerQuotaManagerCfg = ClientQuotaManagerConfig(
       quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault,
-      quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides,
       numQuotaSamples = cfg.numQuotaSamples,
       quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
     )
 
     val consumerQuotaManagerCfg = ClientQuotaManagerConfig(
       quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault,
-      quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides,
       numQuotaSamples = cfg.numQuotaSamples,
       quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
     )

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b054f48..5b311e2 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -142,8 +142,6 @@ object Defaults {
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
   val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
-  val ProducerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
-  val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
   val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples
   val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
 
@@ -294,8 +292,6 @@ object KafkaConfig {
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
   val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
-  val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
-  val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
   val NumQuotaSamplesProp = "quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
 
@@ -468,10 +464,6 @@ object KafkaConfig {
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
   val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
-  val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default producer quota. " +
-          "Example: clientIdX=10485760,clientIdY=10485760"
-  val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default consumer quota. " +
-          "Example: clientIdX=10485760,clientIdY=10485760"
   val NumQuotaSamplesDoc = "The number of samples to retain in memory"
   val QuotaWindowSizeSecondsDoc = "The time span of each sample"
 
@@ -644,8 +636,6 @@ object KafkaConfig {
       /** ********* Quota configuration ***********/
       .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
       .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
-      .define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc)
-      .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc)
       .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
 
@@ -846,8 +836,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
   val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
-  val producerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp)
-  val consumerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp)
   val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)
   val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 84d48cb..d2a1e61 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -209,7 +209,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
-                                                           ConfigType.Client -> new ClientIdConfigHandler)
+                                                           ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
+
+        // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
+        // TODO: Move this logic to DynamicConfigManager
+        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
+          case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
+        }
+
+        // Create the config manager. start listening to notifications
         dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 735a3b2..649c927 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -16,21 +16,23 @@ package kafka.api
 
 import java.util.Properties
 
-import junit.framework.Assert
+import kafka.admin.AdminUtils
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.{ClientQuotaManager, ClientConfigOverride, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.KafkaMetric
-import org.junit.Assert._
+import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
 import org.junit.{After, Before, Test}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
+import scala.collection.Map
 import scala.collection.mutable
 
 class QuotasTest extends KafkaServerTestHarness {
@@ -47,10 +49,6 @@ class QuotasTest extends KafkaServerTestHarness {
   overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000")
   overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500")
 
-  // un-throttled
-  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue)
-  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue)
-
   override def generateConfigs() = {
     FixedPortTestUtils.createBrokerConfigs(numServers,
                                            zkConnect,
@@ -110,7 +108,6 @@ class QuotasTest extends KafkaServerTestHarness {
     consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
     consumers += new KafkaConsumer(consumerProps)
     replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2)
-
   }
 
   @After
@@ -132,7 +129,7 @@ class QuotasTest extends KafkaServerTestHarness {
                                     RequestKeys.nameForKey(RequestKeys.ProduceKey),
                                     "Tracking throttle-time per client",
                                     "client-id", producerId1)
-    Assert.assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
+    assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
 
     // Consumer should read in a bursty manner and get throttled immediately
     consume(consumers.head, numRecords)
@@ -143,11 +140,29 @@ class QuotasTest extends KafkaServerTestHarness {
                                             RequestKeys.nameForKey(RequestKeys.FetchKey),
                                             "Tracking throttle-time per client",
                                             "client-id", consumerId1)
-    Assert.assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
+    assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
   }
 
   @Test
   def testProducerConsumerOverrideUnthrottled() {
+    // Give effectively unlimited quota for producerId2 and consumerId2
+    val props = new Properties()
+    props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString)
+    props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString)
+
+    AdminUtils.changeClientIdConfig(zkUtils, producerId2, props)
+    AdminUtils.changeClientIdConfig(zkUtils, consumerId2, props)
+
+    TestUtils.retry(10000) {
+      val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers
+      val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(producerId2)
+      val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(consumerId2)
+
+      assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota)
+      assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota)
+    }
+
+
     val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
     val numRecords = 1000
     produce(producers(1), numRecords)
@@ -155,7 +170,7 @@ class QuotasTest extends KafkaServerTestHarness {
                                             RequestKeys.nameForKey(RequestKeys.ProduceKey),
                                             "Tracking throttle-time per client",
                                             "client-id", producerId2)
-    Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value())
+    assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
 
     // The "client" consumer does not get throttled.
     consume(consumers(1), numRecords)
@@ -166,7 +181,7 @@ class QuotasTest extends KafkaServerTestHarness {
                                             RequestKeys.nameForKey(RequestKeys.FetchKey),
                                             "Tracking throttle-time per client",
                                             "client-id", consumerId2)
-    Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value())
+    assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
   }
 
   def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 52ea580..0570c79 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -17,6 +17,8 @@
 package kafka.admin
 
 import junit.framework.Assert._
+import kafka.api.RequestKeys
+import org.apache.kafka.common.metrics.Quota
 import org.junit.Test
 import java.util.Properties
 import kafka.utils._
@@ -28,6 +30,7 @@ import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
 import java.io.File
 import TestUtils._
 
+import scala.collection.{Map, immutable}
 
 class AdminTest extends ZooKeeperTestHarness with Logging {
 
@@ -102,7 +105,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
       10 -> List(1, 2, 3),
       11 -> List(1, 3, 4)
     )
-    val leaderForPartitionMap = Map(
+    val leaderForPartitionMap = immutable.Map(
       0 -> 0,
       1 -> 1,
       2 -> 2,
@@ -417,4 +420,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
       server.config.logDirs.foreach(CoreUtils.rm(_))
     }
   }
+
+  /**
+   * This test simulates a client config change in ZK whose notification has been purged.
+   * Basically, it asserts that notifications are bootstrapped from ZK
+   */
+  @Test
+  def testBootstrapClientIdConfig() {
+    val clientId = "my-client"
+    val props = new Properties()
+    props.setProperty("producer_byte_rate", "1000")
+    props.setProperty("consumer_byte_rate", "2000")
+
+    // Write config without notification to ZK.
+    val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
+    val map = Map("version" -> 1, "config" -> configMap)
+    zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encode(map))
+
+    val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
+    assertEquals("Must have 1 overriden client config", 1, configInZk.size)
+    assertEquals(props, configInZk(clientId))
+
+    // Test that the existing clientId overrides are read
+    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+    try {
+      assertEquals(new Quota(1000, true), server.apis.quotaManagers(RequestKeys.ProduceKey).quota(clientId));
+      assertEquals(new Quota(2000, true), server.apis.quotaManagers(RequestKeys.FetchKey).quota(clientId));
+    } finally {
+      server.shutdown()
+      server.config.logDirs.foreach(CoreUtils.rm(_))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 75e856a..fadcd5a 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -21,13 +21,13 @@ import java.util.Collections
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
 import org.apache.kafka.common.utils.MockTime
-import org.junit.{Assert, Before, Test}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
 
 class ClientQuotaManagerTest {
   private val time = new MockTime
 
-  private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
-                                                quotaBytesPerSecondOverrides = "p1=2000,p2=4000")
+  private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500)
 
   var numCallbacks: Int = 0
   def callback(delayTimeMs: Int) {
@@ -42,13 +42,34 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaParsing() {
     val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time)
+
+    // Case 1: Update the quota. Assert that the new quota value is returned
+    clientMetrics.updateQuota("p1", new Quota(2000, true));
+    clientMetrics.updateQuota("p2", new Quota(4000, true));
+
     try {
-      Assert.assertEquals("Default producer quota should be 500",
-                          new Quota(500, true), clientMetrics.quota("random-client-id"))
-      Assert.assertEquals("Should return the overridden value (2000)",
-                          new Quota(2000, true), clientMetrics.quota("p1"))
-      Assert.assertEquals("Should return the overridden value (4000)",
-                          new Quota(4000, true), clientMetrics.quota("p2"))
+      assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.quota("random-client-id"))
+      assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota("p1"))
+      assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota("p2"))
+
+      // p1 should be throttled using the overridden quota
+      var throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 2500 * config.numQuotaSamples, this.callback)
+      assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
+
+      // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created.
+      // p1 should not longer be throttled after the quota change
+      clientMetrics.updateQuota("p1", new Quota(3000, true));
+      assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota("p1"))
+
+      throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback)
+      assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
+
+      // Case 3: Change quota back to default. Should be throttled again
+      clientMetrics.updateQuota("p1", new Quota(500, true));
+      assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota("p1"))
+
+      throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback)
+      assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
     } finally {
       clientMetrics.shutdown()
     }
@@ -67,8 +88,8 @@ class ClientQuotaManagerTest {
         clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
         time.sleep(1000)
       }
-      Assert.assertEquals(10, numCallbacks)
-      Assert.assertEquals(0, queueSizeMetric.value().toInt)
+      assertEquals(10, numCallbacks)
+      assertEquals(0, queueSizeMetric.value().toInt)
 
       // Create a spike.
       // 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second.
@@ -77,17 +98,17 @@ class ClientQuotaManagerTest {
       time.sleep(500)
       val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback)
 
-      Assert.assertEquals("Should be throttled", 2100, sleepTime)
-      Assert.assertEquals(1, queueSizeMetric.value().toInt)
+      assertEquals("Should be throttled", 2100, sleepTime)
+      assertEquals(1, queueSizeMetric.value().toInt)
       // After a request is delayed, the callback cannot be triggered immediately
       clientMetrics.throttledRequestReaper.doWork()
-      Assert.assertEquals(10, numCallbacks)
+      assertEquals(10, numCallbacks)
       time.sleep(sleepTime)
 
       // Callback can only be triggered after the the delay time passes
       clientMetrics.throttledRequestReaper.doWork()
-      Assert.assertEquals(0, queueSizeMetric.value().toInt)
-      Assert.assertEquals(11, numCallbacks)
+      assertEquals(0, queueSizeMetric.value().toInt)
+      assertEquals(11, numCallbacks)
 
       // Could continue to see delays until the bursty sample disappears
       for (i <- 0 until 10) {
@@ -95,63 +116,11 @@ class ClientQuotaManagerTest {
         time.sleep(1000)
       }
 
-      Assert.assertEquals("Should be unthrottled since bursty sample has rolled over",
-                          0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
-    } finally {
-      clientMetrics.shutdown()
-    }
-  }
-
-  @Test
-  def testOverrideParse() {
-    var testConfig = ClientQuotaManagerConfig()
-    var clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
-
-    try {
-      // Case 1 - Default config
-      Assert.assertEquals(new Quota(ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, true),
-                          clientMetrics.quota("p1"))
+      assertEquals("Should be unthrottled since bursty sample has rolled over",
+                   0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
     } finally {
       clientMetrics.shutdown()
     }
-
-
-    // Case 2 - Empty override
-    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
-                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,")
-
-    clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
-    try {
-      Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1"))
-      Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2"))
-    } finally {
-      clientMetrics.shutdown()
-    }
-
-    // Case 3 - NumberFormatException for override
-    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
-                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4")
-    try {
-      clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
-      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
-    }
-    catch {
-      // Swallow.
-      case nfe: NumberFormatException =>
-    }
-
-    // Case 4 - IllegalArgumentException for override
-    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
-                                          quotaBytesPerSecondOverrides = "p1=2000=3000")
-    try {
-      clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "producer", time)
-      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
-    }
-    catch {
-      // Swallow.
-      case nfe: IllegalArgumentException =>
-    }
-
   }
 
   def newMetrics: Metrics = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 6061e66..6b49c4e 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -18,7 +18,9 @@ package kafka.server
 
 import java.util.Properties
 
-import junit.framework.Assert._
+import org.junit.Assert._
+import kafka.api.RequestKeys
+import org.apache.kafka.common.metrics.Quota
 import org.easymock.{Capture, EasyMock}
 import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
@@ -27,6 +29,8 @@ import kafka.common._
 import kafka.log.LogConfig
 import kafka.admin.{AdminOperationException, AdminUtils}
 
+import scala.collection.Map
+
 class DynamicConfigChangeTest extends KafkaServerTestHarness {
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
@@ -52,22 +56,26 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     }
   }
 
-  // For now client config changes do not do anything. Simply verify that the call was made
   @Test
-  def testClientConfigChange() {
+  def testClientQuotaConfigChange() {
     assertTrue("Should contain a ConfigHandler for topics",
                this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client))
     val clientId = "testClient"
     val props = new Properties()
-    props.put("a.b", "c")
-    props.put("x.y", "z")
+    props.put(ClientConfigOverride.ProducerOverride, "1000")
+    props.put(ClientConfigOverride.ConsumerOverride, "2000")
     AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
+
     TestUtils.retry(10000) {
       val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
-      assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId))
-      assertEquals("ClientId testClient must be the only override", 1, configHandler.configPool.size)
-      assertEquals("c", configHandler.configPool.get(clientId).getProperty("a.b"))
-      assertEquals("z", configHandler.configPool.get(clientId).getProperty("x.y"))
+      val quotaManagers: Map[Short, ClientQuotaManager] = servers(0).apis.quotaManagers
+      val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(clientId)
+      val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(clientId)
+
+      assertEquals(s"ClientId $clientId must have overridden producer quota of 1000",
+        Quota.upperBound(1000), overrideProducerQuota)
+        assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
+        Quota.upperBound(2000), overrideConsumerQuota)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b1dc70/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1c3e55d..3e277fa 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -479,8 +479,6 @@ class KafkaConfigTest {
         case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
         case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
-        case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string
-        case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string
         case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")