Posted to commits@kafka.apache.org by jj...@apache.org on 2015/08/15 04:07:01 UTC

kafka git commit: KAFKA-2084; Add per-client-id byte-rate metrics and quota manager; reviewed by Joel Koshy, Dong Lin, Jun Rao and Edward Ribeiro

Repository: kafka
Updated Branches:
  refs/heads/trunk f6acfb089 -> bbb7d97ad


KAFKA-2084; Add per-client-id byte-rate metrics and quota manager; reviewed by Joel Koshy, Dong Lin, Jun Rao and Edward Ribeiro


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bbb7d97a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bbb7d97a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bbb7d97a

Branch: refs/heads/trunk
Commit: bbb7d97adefe5826f2e02a8e55423ea215c9f749
Parents: f6acfb0
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Fri Aug 14 18:51:48 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Aug 14 18:51:48 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../org/apache/kafka/common/metrics/Quota.java  |  18 ++
 .../common/metrics/QuotaViolationException.java |   1 -
 .../org/apache/kafka/common/metrics/Sensor.java |  11 +-
 .../apache/kafka/common/metrics/stats/Rate.java |  23 +-
 .../kafka/common/metrics/MetricsTest.java       |  32 ++-
 .../scala/kafka/server/ClientQuotaManager.scala | 250 +++++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala |  88 +++++--
 .../main/scala/kafka/server/KafkaConfig.scala   |  56 ++++-
 .../main/scala/kafka/server/KafkaServer.scala   |  24 +-
 .../scala/kafka/server/ReplicaManager.scala     |   4 +-
 .../scala/kafka/server/ThrottledResponse.scala  |  46 ++++
 .../scala/kafka/utils/ShutdownableThread.scala  |   3 +
 .../integration/kafka/api/QuotasTest.scala      | 194 ++++++++++++++
 .../kafka/server/ClientQuotaManagerTest.scala   | 159 ++++++++++++
 .../unit/kafka/server/KafkaConfigTest.scala     |   6 +
 .../ThrottledResponseExpirationTest.scala       |  90 +++++++
 17 files changed, 945 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
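
This patch adds per-client-id byte-rate quotas enforced at the broker: each producer and consumer client-id is held to a default bytes-per-second bound, overridable per client, and responses that exceed the bound are delayed (never rejected) until the observed rate falls back under the quota. As a hedged illustration of the new knobs (the property names come from the KafkaConfig changes below; the values and the zookeeper.connect setting are made up for the example), a broker could be configured like this in Scala:

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // assumed minimal broker setting
    props.put("quota.producer.default", "10485760")   // default producer quota: 10 MB/s per client-id
    props.put("quota.consumer.default", "10485760")   // default consumer quota: 10 MB/s per client-id
    props.put("quota.producer.bytes.per.second.overrides", "clientIdX=20971520")  // per-client override
    props.put("quota.window.num", "11")               // 10 whole windows + 1 current window
    props.put("quota.window.size.seconds", "1")       // each sample spans 1 second
    val config = KafkaConfig.fromProps(props)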


http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 864427b..c7f66be 100644
--- a/build.gradle
+++ b/build.gradle
@@ -253,6 +253,7 @@ project(':core') {
     testCompile "$easymock"
     testCompile 'org.objenesis:objenesis:1.2'
     testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
+    testCompile project(path: ':clients', configuration: 'archives')
 
     testRuntime "$slf4jlog4j"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
index d82bb0c..a3535dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
@@ -49,4 +49,22 @@ public final class Quota {
         return (upper && value <= bound) || (!upper && value >= bound);
     }
 
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) this.bound;
+        result = prime * result + (this.upper ? 1 : 0);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (!(obj instanceof Quota))
+            return false;
+        Quota that = (Quota) obj;
+        return (that.bound == this.bound) && (that.upper == this.upper);
+    }
 }
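
With the equals() and hashCode() added here, two Quota instances with the same bound and direction now compare equal by value rather than by reference. A small Scala sketch of the resulting behavior, using only the Quota API visible in this patch:

    import org.apache.kafka.common.metrics.Quota

    val q1 = Quota.lessThan(1024.0)   // factory for an upper-bounded quota
    val q2 = new Quota(1024.0, true)  // same bound, upper = true
    assert(q1 == q2)                  // equal by value with the new equals()
    assert(q1.acceptable(512.0))      // within the bound
    assert(!q1.acceptable(2048.0))    // violates the bound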

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
index a451e53..fbe03f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
@@ -28,5 +28,4 @@ public class QuotaViolationException extends KafkaException {
     public QuotaViolationException(String m) {
         super(m);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ca823fd..4d55771 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -112,8 +112,14 @@ public final class Sensor {
             if (config != null) {
                 Quota quota = config.quota();
                 if (quota != null) {
-                    if (!quota.acceptable(metric.value(timeMs)))
-                        throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound());
+                    double value = metric.value(timeMs);
+                    if (!quota.acceptable(value)) {
+                        throw new QuotaViolationException(String.format(
+                            "(%s) violated quota. Actual: (%f), Threshold: (%f)",
+                            metric.metricName(),
+                            value,
+                            quota.bound()));
+                    }
                 }
             }
         }
@@ -170,5 +176,4 @@ public final class Sensor {
     synchronized List<KafkaMetric> metrics() {
         return Collections.unmodifiableList(this.metrics);
     }
-
 }
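
Sensor.record() now reports both the observed value and the threshold when a quota is breached. A hedged sketch of enforcement through a sensor (Scala; the sensor and metric names are illustrative, and the default MetricConfig window settings are assumed):

    import org.apache.kafka.common.MetricName
    import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, QuotaViolationException}
    import org.apache.kafka.common.metrics.stats.Rate

    val metrics = new Metrics()
    val sensor = metrics.sensor("byte-rate-sensor", new MetricConfig().quota(Quota.lessThan(100.0)))
    sensor.add(new MetricName("byte-rate", "test-group"), new Rate())
    try {
      sensor.record(1000000)  // far more than 100 units/sec over the default window; should violate
    } catch {
      case e: QuotaViolationException => println(e.getMessage)
    }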

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 98429da..fe43940 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.MetricConfig;
 
+
 /**
  * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
  * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
@@ -58,26 +59,28 @@ public class Rate implements MeasurableStat {
     @Override
     public double measure(MetricConfig config, long now) {
         double value = stat.measure(config, now);
-        double elapsed = convert(now - stat.oldest(now).lastWindowMs);
-        return value / elapsed;
+        // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete
+        long elapsedCurrentWindowMs = now - stat.current(now).lastWindowMs;
+        long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1);
+        return value / convert(elapsedCurrentWindowMs + elapsedPriorWindowsMs);
     }
 
-    private double convert(long time) {
+    private double convert(long timeMs) {
         switch (unit) {
             case NANOSECONDS:
-                return time * 1000.0 * 1000.0;
+                return timeMs * 1000.0 * 1000.0;
             case MICROSECONDS:
-                return time * 1000.0;
+                return timeMs * 1000.0;
             case MILLISECONDS:
-                return time;
+                return timeMs;
             case SECONDS:
-                return time / 1000.0;
+                return timeMs / 1000.0;
             case MINUTES:
-                return time / (60.0 * 1000.0);
+                return timeMs / (60.0 * 1000.0);
             case HOURS:
-                return time / (60.0 * 60.0 * 1000.0);
+                return timeMs / (60.0 * 60.0 * 1000.0);
             case DAYS:
-                return time / (24.0 * 60.0 * 60.0 * 1000.0);
+                return timeMs / (24.0 * 60.0 * 60.0 * 1000.0);
             default:
                 throw new IllegalStateException("Unknown unit: " + unit);
         }
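
This measure() fix matters for quotas: the old computation divided by the time since the oldest retained sample's window start, which immediately after old samples were purged could span far less than the intended measurement period and inflate the rate. The new elapsed time is always (samples - 1) complete windows plus whatever fraction of the current window has elapsed. Worked through with the default MetricConfig of 2 samples of 30 seconds each, as exercised by the MetricsTest change below:

    elapsed = timeWindowMs * (samples - 1) + elapsedInCurrentWindow
            = 30000 ms * (2 - 1) + 2000 ms = 32 s    (after the test sleeps 2 seconds)
    rate    = sum / elapsed = 45 / 32 = 1.40625      (the value asserted in the test)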

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 544e120..0a7dcd8 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -37,9 +37,9 @@ import org.junit.Test;
 public class MetricsTest {
 
     private static final double EPS = 0.000001;
-
-    MockTime time = new MockTime();
-    Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
+    private MockTime time = new MockTime();
+    private MetricConfig config = new MetricConfig();
+    private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time);
 
     @Test
     public void testMetricName() {
@@ -77,19 +77,33 @@ public class MetricsTest {
         s2.add(new MetricName("s2.total", "grp1"), new Total());
         s2.record(5.0);
 
-        for (int i = 0; i < 10; i++)
+        int sum = 0;
+        int count = 10;
+        for (int i = 0; i < count; i++) {
             s.record(i);
+            sum += i;
+        }
+        // prior to any time passing
+        double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
+        assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
+                     metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
 
         // pretend 2 seconds passed...
-        time.sleep(2000);
+        long sleepTimeMs = 2;
+        time.sleep(sleepTimeMs * 1000);
+        elapsedSecs += sleepTimeMs;
 
         assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS);
         assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS);
-        assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS);
+        assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS);
         assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS);
-        assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS);
-        assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
-        assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS);
+        assertEquals("Rate(0...9) = 1.40625",
+                     sum / elapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS);
+        assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
+                     count / elapsedSecs,
+                     metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
+        assertEquals("Count(0...9) = 10",
+                     (double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
new file mode 100644
index 0000000..9f8473f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.{DelayQueue, TimeUnit}
+
+import kafka.utils.{ShutdownableThread, Logging}
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.kafka.common.utils.Time
+
+/**
+ * Represents the sensors aggregated per client
+ * @param quotaSensor sensor that tracks the quota
+ * @param throttleTimeSensor sensor that tracks the throttle time
+ */
+private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor)
+
+/**
+ * Configuration settings for quota management
+ * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client
+ * @param quotaBytesPerSecondOverrides The comma separated overrides per client. "c1=X,c2=Y"
+ * @param numQuotaSamples The number of samples to retain in memory
+ * @param quotaWindowSizeSeconds The time span of each sample
+ *
+ */
+case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
+                                        ClientQuotaManagerConfig.QuotaBytesPerSecondDefault,
+                                    quotaBytesPerSecondOverrides: String =
+                                        ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides,
+                                    numQuotaSamples: Int =
+                                        ClientQuotaManagerConfig.DefaultNumQuotaSamples,
+                                    quotaWindowSizeSeconds: Int =
+                                        ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds)
+
+object ClientQuotaManagerConfig {
+  val QuotaBytesPerSecondDefault = Long.MaxValue
+  val QuotaBytesPerSecondOverrides = ""
+  // Always have 10 whole windows + 1 current window
+  val DefaultNumQuotaSamples = 11
+  val DefaultQuotaWindowSizeSeconds = 1
+  val MaxThrottleTimeSeconds = 30
+}
+
+/**
+ * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
+ * for all clients.
+ * @param config ClientQuotaManagerConfig containing the quota configs
+ * @param metrics Metrics instance to register the per-client sensors with
+ * @param apiKey API key of the request type this manager throttles
+ * @param time Time object to use
+ */
+class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+                         private val metrics: Metrics,
+                         private val apiKey: String,
+                         private val time: Time) extends Logging {
+  private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides)
+  private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault)
+  private val lock = new ReentrantReadWriteLock()
+  private val delayQueue = new DelayQueue[ThrottledResponse]()
+  val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
+  throttledRequestReaper.start()
+
+  private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
+  delayQueueSensor.add(new MetricName("queue-size",
+                                      apiKey,
+                                      "Tracks the size of the delay queue"), new Total())
+
+  /**
+   * Reaper thread that triggers callbacks on all throttled requests
+   * @param delayQueue DelayQueue to dequeue from
+   */
+  class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
+    "ThrottledRequestReaper-%s".format(apiKey), false) {
+
+    override def doWork(): Unit = {
+      val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
+      if (response != null) {
+        // Decrement the size of the delay queue
+        delayQueueSensor.record(-1)
+        trace("Response throttled for: " + response.delayTimeMs + " ms")
+        response.execute()
+      }
+    }
+  }
+
+  /**
+   * Records a value against the throttled metric for the given clientId (produced/consumed bytes, QPS etc.)
+   * @param clientId clientId that produced or fetched the data
+   * @param value amount of data in bytes
+   * @param callback Callback function. This will be triggered immediately if quota is not violated.
+   *                 If there is a quota violation, this callback will be triggered after a delay
+   * @return Number of milliseconds to delay the response in case of Quota violation.
+   *         Zero otherwise
+   */
+  def recordAndMaybeThrottle(clientId: String, value: Int, callback: => Unit): Int = {
+    val clientSensors = getOrCreateQuotaSensors(clientId)
+    var delayTimeMs = 0L
+    try {
+      clientSensors.quotaSensor.record(value)
+      // trigger the callback immediately if quota is not violated
+      callback
+    } catch {
+      case qve: QuotaViolationException =>
+        // Compute the delay
+        val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
+        delayTimeMs = delayTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
+        // Add the throttled response to the delayQueue
+        delayQueue.add(new ThrottledResponse(time, delayTimeMs, callback))
+        delayQueueSensor.record()
+        clientSensors.throttleTimeSensor.record(delayTimeMs)
+        logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), delayTimeMs))
+    }
+    delayTimeMs.toInt
+  }
+
+  /*
+   * This calculates the amount of time needed to bring the metric within quota
+   * assuming that no new metrics are recorded.
+   *
+   * Basically, if O is the observed rate and T is the target rate over a window of W, to bring O down to T,
+   * we need to add a delay of X to W such that O * W / (W + X) = T.
+   * Solving for X, we get X = (O - T)/T * W.
+   */
+  private def delayTime(metricValue: Double, config: MetricConfig): Long =
+  {
+    val quota = config.quota()
+    val difference = metricValue - quota.bound
+    val time = difference / quota.bound * config.timeWindowMs() * config.samples()
+    time.round
+  }
+
+  /**
+   * Returns the quota for the specified clientId, or the default quota if no override exists
+   * @return the Quota to enforce
+   */
+  private[server] def quota(clientId: String): Quota = overriddenQuota.getOrElse(clientId, defaultQuota)
+
+  /*
+   * This function either returns the sensors for a given client id or creates them if they don't exist.
+   * The first sensor in the returned ClientSensors is the quota enforcement sensor; the second is the throttle time sensor
+   */
+  private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
+
+    // Names of the sensors to access
+    val quotaSensorName = apiKey + "-" + clientId
+    val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId
+    var quotaSensor: Sensor = null
+    var throttleTimeSensor: Sensor = null
+
+    /* Acquire the read lock to fetch the sensors. It is safe to call getSensor from multiple threads.
+     * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor
+     * will acquire the write lock and prevent the sensors from being read while they are being created.
+     * It might seem sufficient to simply check whether the sensor is null without acquiring a read lock, but a
+     * non-null sensor is not necessarily fully initialized, i.e. all of its metrics may not have been added yet.
+     * This read lock waits until the writer thread has released its lock, i.e. fully initialized the sensor,
+     * at which point it is safe to read
+     */
+    lock.readLock().lock()
+    try {
+      quotaSensor = metrics.getSensor(quotaSensorName)
+      throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
+    }
+    finally {
+      lock.readLock().unlock()
+    }
+
+    /* If the quota sensor is null, try to create it; otherwise return the existing sensors.
+     * Note that if the quota sensor is null, the throttle time sensor must be null as well
+     */
+    if (quotaSensor == null) {
+      /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it.
+       * Note that multiple threads may acquire the write lock if they all see a null sensor initially.
+       * In this case, each writer re-checks the sensor after acquiring the lock.
+       * This does not suffer from the double-checked locking problem because the references are only read
+       * after acquiring read locks, and hence a thread can never see a partially published sensor
+       */
+      lock.writeLock().lock()
+      try {
+        quotaSensor = metrics.getSensor(quotaSensorName)
+        if (quotaSensor == null) {
+          // create the throttle time sensor also
+          throttleTimeSensor = metrics.sensor(throttleTimeSensorName)
+          throttleTimeSensor.add(new MetricName("throttle-time",
+                                                apiKey,
+                                                "Tracking average throttle-time per client",
+                                                "client-id",
+                                                clientId), new Avg())
+          quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota(clientId)))
+          quotaSensor.add(clientRateMetricName(clientId), new Rate())
+        }
+      } finally {
+        lock.writeLock().unlock()
+      }
+    }
+    // return the read or created sensors
+    ClientSensors(quotaSensor, throttleTimeSensor)
+  }
+
+  private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+    new MetricConfig()
+            .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+            .samples(config.numQuotaSamples)
+            .quota(quota)
+  }
+
+  /* Construct a Map of (clientId -> Quota)
+   * The input config is specified as a comma-separated K=V pairs
+   */
+  private def initQuotaMap(input: String): Map[String, Quota] = {
+    // If empty input, return an empty map
+    if (input.trim.length == 0)
+      Map[String, Quota]()
+    else
+      input.split(",").map(entry => {
+        val trimmedEntry = entry.trim
+        val pair: Array[String] = trimmedEntry.split("=")
+        if (pair.length != 2)
+          throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(entry))
+        pair(0) -> new Quota(pair(1).toDouble, true)
+      }).toMap
+  }
+
+  private def clientRateMetricName(clientId: String): MetricName = {
+    new MetricName("byte-rate", apiKey,
+                   "Tracking byte-rate per client",
+                   "client-id", clientId)
+  }
+
+  def shutdown() = {
+    throttledRequestReaper.shutdown()
+  }
+}
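
The delay computation follows directly from the comment above delayTime(): with W = quotaWindowSizeSeconds * numQuotaSamples (11 seconds under the defaults), a client observed at O = 1500 B/s against a bound T = 1000 B/s is delayed X = (O - T)/T * W = (1500 - 1000)/1000 * 11 s = 5.5 s. A hedged usage sketch of the manager (the "Produce" api-key string, the client id, and the sizes are illustrative):

    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.utils.SystemTime
    import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig}

    val metrics = new Metrics()
    val manager = new ClientQuotaManager(
      ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 1000),  // 1000 B/s default quota
      metrics, "Produce", new SystemTime)
    // Runs the callback inline and returns 0 while "clientA" is within its quota; on violation,
    // hands the callback to the reaper's DelayQueue and returns the computed delay in ms.
    val delayMs = manager.recordAndMaybeThrottle("clientA", 16384, println("response sent"))
    manager.shutdown()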

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7ea509c..67f0cad 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
 import kafka.api._
@@ -42,9 +43,12 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val zkClient: ZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
-                val metadataCache: MetadataCache) extends Logging {
+                val metadataCache: MetadataCache,
+                val metrics: Metrics) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
+  // Store all the quota managers for each type of request
+  private val quotaManagers = instantiateQuotaManagers(config)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -250,6 +254,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleProducerRequest(request: RequestChannel.Request) {
     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
+    val numBytesAppended = produceRequest.sizeInBytes
 
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
@@ -265,21 +270,27 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      if (produceRequest.requiredAcks == 0) {
-        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
-        // the request, since no response is expected by the producer, the server will close socket server so that
-        // the producer client will know that some error has happened and will refresh its metadata
-        if (errorInResponse) {
-          info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0"
-                  .format(produceRequest.correlationId, produceRequest.clientId))
-          requestChannel.closeConnection(request.processor, request)
+      def produceResponseCallback {
+        if (produceRequest.requiredAcks == 0) {
+          // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
+          // the request, since no response is expected by the producer, the server will close socket server so that
+          // the producer client will know that some error has happened and will refresh its metadata
+          if (errorInResponse) {
+            info(
+              "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format(
+                produceRequest.correlationId,
+                produceRequest.clientId))
+            requestChannel.closeConnection(request.processor, request)
+          } else {
+            requestChannel.noOperation(request.processor, request)
+          }
         } else {
-          requestChannel.noOperation(request.processor, request)
+          val response = ProducerResponse(produceRequest.correlationId, responseStatus)
+          requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
         }
-      } else {
-        val response = ProducerResponse(produceRequest.correlationId, responseStatus)
-        requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
       }
+
+      quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback)
     }
 
     // only allow appending to internal topic partitions
@@ -316,14 +327,27 @@ class KafkaApis(val requestChannel: RequestChannel,
             .format(fetchRequest.correlationId, fetchRequest.clientId,
             topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))
         }
-
         // record the bytes out metrics only when the response is being sent
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
         BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
       }
 
       val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
+      def fetchResponseCallback {
+        requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
+      }
+
+      // Do not throttle replication traffic
+      if (fetchRequest.isFromFollower) {
+        fetchResponseCallback
+      } else {
+        quotaManagers.get(RequestKeys.FetchKey) match {
+          case Some(quotaManager) =>
+            quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback)
+          case None =>
+            warn("Cannot throttle Api key %s".format(RequestKeys.nameForKey(RequestKeys.FetchKey)))
+        }
+      }
     }
 
     // call the replica manager to fetch messages from the local replica
@@ -604,9 +628,37 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseCallback)
   }
 
+  /*
+   * Returns a Map of all configured quota managers, keyed by the request API key
+   */
+  private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = {
+    val producerQuotaManagerCfg = ClientQuotaManagerConfig(
+      quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault,
+      quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides,
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+
+    val consumerQuotaManagerCfg = ClientQuotaManagerConfig(
+      quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault,
+      quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides,
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+
+    val quotaManagers = Map[Short, ClientQuotaManager](
+      RequestKeys.ProduceKey ->
+              new ClientQuotaManager(producerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.ProduceKey), new org.apache.kafka.common.utils.SystemTime),
+      RequestKeys.FetchKey ->
+              new ClientQuotaManager(consumerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.FetchKey), new org.apache.kafka.common.utils.SystemTime)
+    )
+    quotaManagers
+  }
+
   def close() {
-    // TODO currently closing the API is an no-op since the API no longer maintain any modules
-    // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
-    debug("Shut down complete.")
+    quotaManagers.foreach { case(apiKey, quotaManager) =>
+      quotaManager.shutdown()
+    }
+    info("Shutdown complete.")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a06f0bd..394f21b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,7 +26,10 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
+import org.apache.kafka.common.config.ConfigDef.Importance._
+import org.apache.kafka.common.config.ConfigDef.Range._
+import org.apache.kafka.common.config.ConfigDef.Type._
+import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
 
@@ -132,12 +135,21 @@ object Defaults {
   val OffsetCommitTimeoutMs = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs
   val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks
 
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+  val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+  val ProducerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
+  val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
+  val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples
+  val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
+
   val DeleteTopicEnable = false
 
   val CompressionType = "producer"
 
+  /** ********* Kafka Metrics Configuration ***********/
   val MetricNumSamples = 2
-  val MetricSampleWindowMs = 1000
+  val MetricSampleWindowMs = 30000
   val MetricReporterClasses = ""
 }
 
@@ -250,15 +262,22 @@ object KafkaConfig {
   val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms"
   val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms"
   val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
+  val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
+  val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
+  val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
+  val NumQuotaSamplesProp = "quota.window.num"
+  val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
 
+  /** ********* Kafka Metrics Configuration ***********/
   val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG
   val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
   val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
 
-
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
@@ -388,11 +407,22 @@ object KafkaConfig {
   val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
     "or this timeout is reached. This is similar to the producer request timeout."
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes per second than this value"
+  val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes per second than this value"
+  val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId=quotaBytesPerSecond pairs overriding the default producer quota. " +
+          "Example: clientIdX=10485760,clientIdY=10485760"
+  val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId=quotaBytesPerSecond pairs overriding the default consumer quota. " +
+          "Example: clientIdX=10485760,clientIdY=10485760"
+  val NumQuotaSamplesDoc = "The number of samples to retain in memory"
+  val QuotaWindowSizeSecondsDoc = "The time span of each sample"
+
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
     "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
     "'producer' which means retain the original compression codec set by the producer."
 
+  /** ********* Kafka Metrics Configuration ***********/
   val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC
   val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
   val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
@@ -518,9 +548,19 @@ object KafkaConfig {
       .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc)
       .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc)
       .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc)
+
+      /** ********* Kafka Metrics Configuration ***********/
       .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc)
       .define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc)
       .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
+
+      /** ********* Quota configuration ***********/
+      .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
+      .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
+      .define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc)
+      .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc)
+      .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
+      .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
   }
 
   def configNames() = {
@@ -548,7 +588,6 @@ object KafkaConfig {
     props.putAll(overrides)
     fromProps(props)
   }
-
 }
 
 case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) {
@@ -661,10 +700,17 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
   val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
 
+  /** ********* Quota Configuration **************/
+  val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
+  val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
+  val producerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp)
+  val consumerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp)
+  val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)
+  val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)
+
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   val compressionType = getString(KafkaConfig.CompressionTypeProp)
 
-
   val listeners = getListeners
   val advertisedListeners = getAdvertisedListeners
   val logRetentionTimeMillis = getLogRetentionTimeMillis

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 84d4730..6d65507 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.util
-import java.util.Properties
 
 import kafka.admin._
 import kafka.log.LogConfig
@@ -62,11 +61,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   private val reporters: java.util.List[MetricsReporter] =  config.metricReporterClasses
   reporters.add(new JmxReporter(jmxPrefix))
 
-
-
-  // This exists so SocketServer (which uses Client libraries) can use the client Time objects without having to convert all of Kafka to use them
-  // Once we get rid of kafka.utils.time, we can get rid of this too
-  private val socketServerTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
+  // This exists because the Metrics package from clients has its own Time implementation.
+  // SocketServer and the quota managers (which use the client libraries) can thus use the client Time objects without converting all of Kafka to use them.
+  // Eventually, we want to merge the Time objects in core and clients
+  private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
+  var metrics: Metrics = null
 
   val brokerState: BrokerState = new BrokerState
 
@@ -80,7 +79,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null
-  val metrics: Metrics = new Metrics()
 
   var consumerCoordinator: ConsumerCoordinator = null
 
@@ -92,7 +90,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
 
 
-
   var zkClient: ZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
@@ -121,6 +118,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
       val canStartup = isStartingUp.compareAndSet(false, true)
       if (canStartup) {
+        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime)
+
         brokerState.newState(Starting)
 
         /* start scheduler */
@@ -137,9 +136,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         config.brokerId =  getBrokerId
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
-        val metrics = new Metrics(metricConfig, reporters, socketServerTime)
-
-
         socketServer = new SocketServer(config.brokerId,
                                         config.listeners,
                                         config.numNetworkThreads,
@@ -150,7 +146,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                         config.maxConnectionsPerIp,
                                         config.connectionsMaxIdleMs,
                                         config.maxConnectionsPerIpOverrides,
-                                        socketServerTime,
+                                        kafkaMetricsTime,
                                         metrics)
           socketServer.startup()
 
@@ -168,7 +164,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
           /* start processing requests */
           apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
-            kafkaController, zkClient, config.brokerId, config, metadataCache)
+            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics)
           requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
           brokerState.newState(RunningAsBroker)
 
@@ -362,6 +358,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           CoreUtils.swallow(kafkaController.shutdown())
         if(zkClient != null)
           CoreUtils.swallow(zkClient.close())
+        if (metrics != null)
+          CoreUtils.swallow(metrics.close())
 
         brokerState.newState(NotRunning)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2e0bbcd..d829e18 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.metrics.Metrics
 
 import scala.collection._
 
@@ -98,7 +99,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
-                     val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
+                     val isShuttingDown: AtomicBoolean) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
@@ -440,7 +441,6 @@ class ReplicaManager(val config: KafkaConfig,
                     fetchMinBytes: Int,
                     fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
                     responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
-
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ThrottledResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala
new file mode 100644
index 0000000..1f80d54
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ThrottledResponse.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{TimeUnit, Delayed}
+
+import org.apache.kafka.common.utils.Time
+
+
+/**
+ * Represents a request whose response has been delayed.
+ * @param time Time instance to use
+ * @param delayTimeMs delay associated with this request
+ * @param callback Callback to trigger after delayTimeMs milliseconds
+ */
+private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Long, callback: => Unit) extends Delayed {
+  val endTime = time.milliseconds + delayTimeMs
+
+  def execute() = callback
+
+  override def getDelay(unit: TimeUnit): Long = {
+    unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
+  }
+
+  override def compareTo(d: Delayed): Int = {
+    val other = d.asInstanceOf[ThrottledResponse]
+    if (this.endTime < other.endTime) -1
+    else if (this.endTime > other.endTime) 1
+    else 0
+  }
+}
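
ThrottledResponse implements Delayed, so a java.util.concurrent.DelayQueue will only hand it back once its delay has expired, which is exactly what ThrottledRequestReaper polls for. A sketch of that contract (ThrottledResponse is private[server], so this only compiles inside the kafka.server package; the 100 ms delay is arbitrary):

    import java.util.concurrent.{DelayQueue, TimeUnit}
    import org.apache.kafka.common.utils.SystemTime

    val queue = new DelayQueue[ThrottledResponse]()
    queue.add(new ThrottledResponse(new SystemTime, 100, println("delayed response fired")))
    // poll() returns null until ~100 ms have elapsed; the reaper uses a blocking 1-second poll
    val ready = queue.poll(200, TimeUnit.MILLISECONDS)
    if (ready != null) ready.execute()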

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index fc226c8..dc46797 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -51,6 +51,9 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
     info("Shutdown completed")
   }
 
+  /**
+   * This method is repeatedly invoked until the thread shuts down or this method throws an exception
+   */
   def doWork(): Unit
 
   override def run(): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
new file mode 100644
index 0000000..a11bf90
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import junit.framework.Assert
+import kafka.consumer.SimpleConsumer
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.KafkaMetric
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import scala.collection.mutable
+
+class QuotasTest extends KafkaServerTestHarness {
+  private val producerBufferSize = 300000
+  private val producerId1 = "QuotasTestProducer-1"
+  private val producerId2 = "QuotasTestProducer-2"
+  private val consumerId1 = "QuotasTestConsumer-1"
+  private val consumerId2 = "QuotasTestConsumer-2"
+
+  val numServers = 2
+  val overridingProps = new Properties()
+
+  // Low enough quota that a producer sending a small payload in a tight loop should get throttled
+  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000")
+  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500")
+
+  // un-throttled
+  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue)
+  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue)
+
+  override def generateConfigs() = {
+    FixedPortTestUtils.createBrokerConfigs(numServers,
+                                           zkConnect,
+                                           enableControlledShutdown = false)
+            .map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  var replicaConsumers = mutable.Buffer[SimpleConsumer]()
+
+  var leaderNode: KafkaServer = null
+  var followerNode: KafkaServer = null
+  private val topic1 = "topic-1"
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
+    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2)
+    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    val numPartitions = 1
+    val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
+    leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1)
+    followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1)
+    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+
+    // Create consumers
+    val consumerProps = new Properties
+    consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
+    consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
+
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
+    consumers += new KafkaConsumer(consumerProps)
+    // Create replica consumers with the same clientId as the high level consumer. These requests should never be throttled
+    replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId1)
+
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
+    consumers += new KafkaConsumer(consumerProps)
+    replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2)
+
+  }
+
+  @After
+  override def tearDown() {
+    producers.foreach( _.close )
+    consumers.foreach( _.close )
+    replicaConsumers.foreach( _.close )
+    super.tearDown()
+  }
+
+  @Test
+  def testThrottledProducerConsumer() {
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+
+    val producerMetricName = new MetricName("throttle-time",
+                                    RequestKeys.nameForKey(RequestKeys.ProduceKey),
+                                    "Tracking throttle-time per client",
+                                    "client-id", producerId1)
+    Assert.assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
+
+    // Consumer should read in a bursty manner and get throttled immediately
+    consume(consumers.head, numRecords)
+    // The replica consumer should not be throttled either. Create a fetch request that will exceed the quota immediately
+    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
+    replicaConsumers.head.fetch(request)
+    val consumerMetricName = new MetricName("throttle-time",
+                                            RequestKeys.nameForKey(RequestKeys.FetchKey),
+                                            "Tracking throttle-time per client",
+                                            "client-id", consumerId1)
+    Assert.assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
+  }
+
+  @Test
+  def testProducerConsumerOverrideUnthrottled() {
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+    val numRecords = 1000
+    produce(producers(1), numRecords)
+    val producerMetricName = new MetricName("throttle-time",
+                                            RequestKeys.nameForKey(RequestKeys.ProduceKey),
+                                            "Tracking throttle-time per client",
+                                            "client-id", producerId2)
+    Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(producerMetricName).value())
+
+    // The "client" consumer does not get throttled.
+    consume(consumers(1), numRecords)
+    // The replica consumer should not be throttled either. Create a fetch request that will exceed the quota immediately
+    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
+    replicaConsumers(1).fetch(request)
+    val consumerMetricName = new MetricName("throttle-time",
+                                            RequestKeys.nameForKey(RequestKeys.FetchKey),
+                                            "Tracking throttle-time per client",
+                                            "client-id", consumerId2)
+    Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(consumerMetricName).value())
+  }
+
+  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
+    var numBytesProduced = 0
+    for (i <- 0 until count) {
+      val payload = i.toString.getBytes
+      numBytesProduced += payload.length
+      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
+             new ErrorLoggingCallback(topic1, null, null, true)).get()
+      Thread.sleep(1)
+    }
+    numBytesProduced
+  }
+
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
+    consumer.subscribe(topic1)
+    var numConsumed = 0
+    while (numConsumed < numRecords) {
+      for (record <- consumer.poll(100).asScala) {
+        numConsumed += 1
+      }
+    }
+  }
+}
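
For context on the replicaId trick above: an ordinary client fetch carries
replicaId -1, while a fetch tagged with a valid broker id is treated as
replication traffic and, per the comments in the test, is never throttled. A
minimal sketch of the distinction the test exploits (fetch sizes illustrative):

    // Ordinary consumer fetch: replicaId defaults to -1, so it is subject to the quota
    val clientFetch = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).build()

    // Replication fetch: a broker id marks it as replica traffic, which bypasses the quota
    val replicaFetch = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024)
                                                .replicaId(followerNode.config.brokerId)
                                                .build()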

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
new file mode 100644
index 0000000..97dcca8
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.Collections
+
+import kafka.api.RequestKeys
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.{Metrics, Quota, MetricConfig}
+import org.apache.kafka.common.utils.MockTime
+import org.scalatest.junit.JUnit3Suite
+import org.junit.{Before, Test, Assert}
+
+class ClientQuotaManagerTest extends JUnit3Suite {
+  private val time = new MockTime
+
+  private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+                                                quotaBytesPerSecondOverrides = "p1=2000,p2=4000")
+
+  var numCallbacks: Int = 0
+  def callback {
+    numCallbacks += 1
+  }
+
+  @Before
+  def beforeMethod() {
+    numCallbacks = 0
+  }
+
+  @Test
+  def testQuotaParsing() {
+    val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time)
+    try {
+      Assert.assertEquals("Default producer quota should be 500",
+                          new Quota(500, true), clientMetrics.quota("random-client-id"))
+      Assert.assertEquals("Should return the overridden value (2000)",
+                          new Quota(2000, true), clientMetrics.quota("p1"))
+      Assert.assertEquals("Should return the overridden value (4000)",
+                          new Quota(4000, true), clientMetrics.quota("p2"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testQuotaViolation() {
+    val metrics = newMetrics
+    val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
+    val queueSizeMetric = metrics.metrics().get(new MetricName("queue-size", "producer", ""))
+    try {
+      /* We have 10 second windows. Make sure that there is no quota violation
+       * if we produce under the quota
+       */
+      for (i <- 0 until 10) {
+        clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
+        time.sleep(1000)
+      }
+      Assert.assertEquals(10, numCallbacks)
+      Assert.assertEquals(0, queueSizeMetric.value().toInt)
+
+      // Create a spike.
+      // 400 bytes x 10 samples + 2000 bytes = 6000 bytes over 10 seconds = 600 bytes per second.
+      // delay = (rate - quota) / quota * window size = (600 - 500) / 500 * 11 seconds = 2.2 seconds = 2200 ms
+      val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback)
+      Assert.assertEquals("Should be throttled", 2200, sleepTime)
+      Assert.assertEquals(1, queueSizeMetric.value().toInt)
+      // After a request is delayed, the callback cannot be triggered immediately
+      clientMetrics.throttledRequestReaper.doWork()
+      Assert.assertEquals(10, numCallbacks)
+      time.sleep(sleepTime)
+
+      // The callback can only be triggered after the delay time passes
+      clientMetrics.throttledRequestReaper.doWork()
+      Assert.assertEquals(0, queueSizeMetric.value().toInt)
+      Assert.assertEquals(11, numCallbacks)
+
+      // Delays could continue until the bursty sample rolls out of the window
+      for (i <- 0 until 10) {
+        clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
+        time.sleep(1000)
+      }
+
+      Assert.assertEquals("Should be unthrottled since bursty sample has rolled over",
+                          0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testOverrideParse() {
+    var testConfig = ClientQuotaManagerConfig()
+    var clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+
+    try {
+      // Case 1 - Default config
+      Assert.assertEquals(new Quota(ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, true),
+                          clientMetrics.quota("p1"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+
+    // Case 2 - Empty override
+    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,")
+
+    clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+    try {
+      Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1"))
+      Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+
+    // Case 3 - NumberFormatException for override
+    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4")
+    try {
+      clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+    } catch {
+      case nfe: NumberFormatException => // expected
+    }
+
+    // Case 4 - IllegalArgumentException for override
+    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000=3000")
+    try {
+      clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "producer", time)
+      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+    } catch {
+      case iae: IllegalArgumentException => // expected
+    }
+  }
+
+  def newMetrics: Metrics = {
+    new Metrics(new MetricConfig(), Collections.emptyList(), time)
+  }
+}
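
The 2200 ms delay asserted in testQuotaViolation follows directly from the
recorded rates; a minimal worked sketch of the same arithmetic (the variable
names here are illustrative, not part of the patch):

    val quota        = 500.0                      // default bytes/sec from the test config
    val observedRate = (400.0 * 10 + 2000) / 10   // = 600 bytes/sec across the samples
    val windowMs     = 11 * 1000                  // metric window incl. the current sample
    val delayMs      = (observedRate - quota) / quota * windowMs   // = 2200.0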

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e26a730..9688b8c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -474,6 +474,12 @@ class KafkaConfigTest {
         case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
+        case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string
+        case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string
+        case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
 
         case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
 

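For reference, the six new properties slot into the same validation harness as
the existing ones: the numeric quota configs reject "0" and non-numeric values,
while the override configs are free-form strings parsed later by the quota
manager. A minimal sketch of a valid assignment, reusing the test's own base
properties (the values are illustrative only):

    val props = getBaseProperties()
    props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "1048576")
    props.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2097152")
    props.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, "clientA=2000000,clientB=4000000")
    val config = KafkaConfig.fromProps(props)   // would throw on "0" or "not_a_number"
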
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
new file mode 100644
index 0000000..14a7f45
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Collections
+import java.util.concurrent.{TimeUnit, DelayQueue}
+
+import org.apache.kafka.common.metrics.MetricConfig
+import org.apache.kafka.common.utils.MockTime
+import org.junit.{Before, Assert, Test}
+import org.scalatest.junit.JUnit3Suite
+
+class ThrottledResponseExpirationTest extends JUnit3Suite {
+  private val time = new MockTime
+  private var numCallbacks: Int = 0
+  private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
+                                                                    Collections.emptyList(),
+                                                                    time)
+
+  def callback {
+    numCallbacks += 1
+  }
+
+  @Before
+  def beforeMethod() {
+    numCallbacks = 0
+  }
+
+  @Test
+  def testExpire() {
+    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, "producer", time)
+
+    val delayQueue = new DelayQueue[ThrottledResponse]()
+    val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue)
+    try {
+      // Add 4 elements to the queue out of order, two of them with the same expiry timestamp
+      delayQueue.add(new ThrottledResponse(time, 10, callback))
+      delayQueue.add(new ThrottledResponse(time, 30, callback))
+      delayQueue.add(new ThrottledResponse(time, 30, callback))
+      delayQueue.add(new ThrottledResponse(time, 20, callback))
+
+      for (itr <- 1 to 3) {
+        time.sleep(10)
+        reaper.doWork()
+        Assert.assertEquals(itr, numCallbacks)
+      }
+      reaper.doWork()
+      Assert.assertEquals(4, numCallbacks)
+      Assert.assertEquals(0, delayQueue.size())
+      reaper.doWork()
+      Assert.assertEquals(4, numCallbacks)
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testThrottledRequest() {
+    val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
+    val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
+    val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
+    Assert.assertEquals(10, t1.delayTimeMs)
+    Assert.assertEquals(20, t2.delayTimeMs)
+    Assert.assertEquals(20, t3.delayTimeMs)
+
+    for (itr <- 0 to 2) {
+      Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))
+      Assert.assertEquals(20 - 10*itr, t2.getDelay(TimeUnit.MILLISECONDS))
+      Assert.assertEquals(20 - 10*itr, t3.getDelay(TimeUnit.MILLISECONDS))
+      time.sleep(10)
+    }
+  }
+}
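
A note on the mechanics these two tests rely on: DelayQueue only hands back
elements whose getDelay(..) has reached zero, so with MockTime the reaper
releases throttled responses exactly when the mocked clock passes each expiry.
A minimal sketch (the assertions are illustrative, not part of the patch):

    val queue = new java.util.concurrent.DelayQueue[ThrottledResponse]()
    queue.add(new ThrottledResponse(time, 10, callback))
    Assert.assertNull(queue.poll())      // getDelay > 0: nothing is released yet
    time.sleep(10)
    Assert.assertNotNull(queue.poll())   // delay elapsed: the element becomes available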