Posted to commits@kafka.apache.org by ju...@apache.org on 2017/05/01 16:13:39 UTC

[2/4] kafka git commit: KAFKA-4954; Request handler utilization quotas

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b9bf3e4..b2a3456 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -419,6 +419,7 @@ private[kafka] class Processor(val id: Int,
     "socket-server",
     metricTags,
     false,
+    true,
     ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
 
   override def run() {
@@ -457,7 +458,7 @@ private[kafka] class Processor(val id: Int,
           case RequestChannel.NoOpAction =>
             // There is no response to send to the client, we need to read more pipelined requests
             // that are sitting in the server's socket buffer
-            curr.request.updateRequestMetrics
+            updateRequestMetrics(curr.request)
             trace("Socket server received empty response to send, registering for read: " + curr)
             val channelId = curr.request.connectionId
             if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
@@ -465,7 +466,7 @@ private[kafka] class Processor(val id: Int,
           case RequestChannel.SendAction =>
             sendResponse(curr)
           case RequestChannel.CloseConnectionAction =>
-            curr.request.updateRequestMetrics
+            updateRequestMetrics(curr.request)
             trace("Closing socket connection actively according to the response code.")
             close(selector, curr.request.connectionId)
         }
@@ -482,7 +483,7 @@ private[kafka] class Processor(val id: Int,
     // `channel` can be null if the selector closed the connection because it was idle for too long
     if (channel == null) {
       warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
-      response.request.updateRequestMetrics()
+      response.request.updateRequestMetrics(0L)
     }
     else {
       selector.send(response.responseSend)
@@ -505,14 +506,13 @@ private[kafka] class Processor(val id: Int,
     selector.completedReceives.asScala.foreach { receive =>
       try {
         val openChannel = selector.channel(receive.source)
-        val session = {
-          // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
-          val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
-          RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
-        }
+        // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
+        val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
+        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
+
         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
-          buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
-          securityProtocol = securityProtocol)
+          buffer = receive.payload, startTimeNanos = time.nanoseconds,
+          listenerName = listenerName, securityProtocol = securityProtocol)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)
       } catch {
@@ -529,17 +529,24 @@ private[kafka] class Processor(val id: Int,
       val resp = inflightResponses.remove(send.destination).getOrElse {
         throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
       }
-      resp.request.updateRequestMetrics()
+      updateRequestMetrics(resp.request)
       selector.unmute(send.destination)
     }
   }
 
+  private def updateRequestMetrics(request: RequestChannel.Request) {
+    val channel = selector.channel(request.connectionId)
+    val openOrClosingChannel = if (channel != null) channel else selector.closingChannel(request.connectionId)
+    val networkThreadTimeNanos = if (openOrClosingChannel != null) openOrClosingChannel.getAndResetNetworkThreadTimeNanos() else 0L
+    request.updateRequestMetrics(networkThreadTimeNanos)
+  }
+
   private def processDisconnected() {
     selector.disconnected.asScala.foreach { connectionId =>
       val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
         throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
       }.remoteHost
-      inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
+      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
       // the channel has been closed by the selector but the quotas still need to be updated
       connectionQuotas.dec(InetAddress.getByName(remoteHost))
     }
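
Note: updateRequestMetrics above charges each completed request the network-thread
time accumulated on its channel since the previous request. A minimal sketch of the
accumulate-and-reset pattern behind getAndResetNetworkThreadTimeNanos() (the real
KafkaChannel changes live elsewhere in this patch series; the class and names below
are illustrative only):

    import java.util.concurrent.atomic.AtomicLong

    // Illustrative sketch, not the actual KafkaChannel implementation.
    class ChannelTimeTracker {
      private val networkThreadTimeNanos = new AtomicLong(0L)

      // The network thread adds the time spent on this channel's sends/receives.
      def addNetworkThreadTimeNanos(nanos: Long): Unit =
        networkThreadTimeNanos.addAndGet(nanos)

      // Each completed request collects (and clears) the time accumulated
      // since the previous request on this connection.
      def getAndResetNetworkThreadTimeNanos(): Long =
        networkThreadTimeNanos.getAndSet(0L)
    }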

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 84772db..04f5239 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -31,10 +31,11 @@ import scala.collection.JavaConverters._
 
 /**
  * Represents the sensors aggregated per client
+ * @param quotaEntity Quota entity representing <client-id>, <user> or <user, client-id>
  * @param quotaSensor @Sensor that tracks the quota
  * @param throttleTimeSensor @Sensor that tracks the throttle time
  */
-private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor)
+case class ClientSensors(quotaEntity: QuotaEntity, quotaSensor: Sensor, throttleTimeSensor: Sensor)
 
 /**
  * Configuration settings for quota management
@@ -58,6 +59,8 @@ object ClientQuotaManagerConfig {
   val DefaultQuotaWindowSizeSeconds = 1
   // Purge sensors after 1 hour of inactivity
   val InactiveSensorExpirationTimeSeconds  = 3600
+  val QuotaRequestPercentDefault = Int.MaxValue.toDouble
+  val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
 
   val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
   val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
@@ -126,12 +129,12 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
  *
  * @param config @ClientQuotaManagerConfig quota configs
  * @param metrics @Metrics Metrics instance
- * @param apiKey API Key for the request
+ * @param quotaType Quota type of this quota manager
  * @param time @Time object to use
  */
-final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
-                         private val apiKey: QuotaType,
+                         private val quotaType: QuotaType,
                          private val time: Time) extends Logging {
   private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
   private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
@@ -140,19 +143,22 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess
   val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
-  throttledRequestReaper.start()
 
-  private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
+  private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
   delayQueueSensor.add(metrics.metricName("queue-size",
-                                      apiKey.toString,
+                                      quotaType.toString,
                                       "Tracks the size of the delay queue"), new Total())
+  start() // Use start method to keep findbugs happy
+  private def start() {
+    throttledRequestReaper.start()
+  }
 
   /**
    * Reaper thread that triggers callbacks on all throttled requests
    * @param delayQueue DelayQueue to dequeue from
    */
   class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
-    "ThrottledRequestReaper-%s".format(apiKey), false) {
+    "ThrottledRequestReaper-%s".format(quotaType), false) {
 
     override def doWork(): Unit = {
       val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
@@ -166,17 +172,23 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
-   * Records that a clientId changed some metric being throttled (produced/consumed bytes, QPS etc.)
-   * @param clientId clientId that produced the data
-   * @param value amount of data written in bytes
+   * Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
+   * If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
+   * Throttle time calculation may be overridden by sub-classes.
+   * @param sanitizedUser user principal of client
+   * @param clientId clientId that produced/fetched the data
+   * @param value amount of data in bytes or request processing time as a percentage
    * @param callback Callback function. This will be triggered immediately if quota is not violated.
    *                 If there is a quota violation, this callback will be triggered after a delay
    * @return Number of milliseconds to delay the response in case of Quota violation.
    *         Zero otherwise
    */
-  def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Int, callback: Int => Unit): Int = {
-    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
-    val clientSensors = getOrCreateQuotaSensors(clientQuotaEntity)
+  def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
+    val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
+    recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
+  }
+
+  def recordAndThrottleOnQuotaViolation(clientSensors: ClientSensors, value: Double, callback: Int => Unit): Int = {
     var throttleTimeMs = 0
     try {
       clientSensors.quotaSensor.record(value)
@@ -185,8 +197,9 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     } catch {
       case _: QuotaViolationException =>
         // Compute the delay
+        val clientQuotaEntity = clientSensors.quotaEntity
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
-        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
+        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -197,6 +210,15 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
+   * Records that a user/clientId changed some metric being throttled without checking for
+   * quota violation. The aggregate value will subsequently be used for throttling when the
+   * next request is processed.
+   */
+  def recordNoThrottle(clientSensors: ClientSensors, value: Double) {
+    clientSensors.quotaSensor.record(value, time.milliseconds(), false)
+  }
+
+  /**
    * Determines the quota-id for the client with the specified user principal
    * and client-id and returns the quota entity that encapsulates the quota-id
    * and the associated quota override or default quota.
@@ -325,13 +347,13 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * we need to add a delay of X to W such that O * W / (W + X) = T.
    * Solving for X, we get X = (O - T)/T * W.
    */
-  private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = {
+  protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
     val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
     val quota = config.quota()
     val difference = clientMetric.value() - quota.bound
     // Use the precise window used by the rate calculation
     val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
-    throttleTimeMs.round.toInt
+    throttleTimeMs.round
   }
 
   // Casting to Rate because we only use Rate in Quota computation
@@ -346,39 +368,54 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * This function either returns the sensors for a given client id or creates them if they don't exist
    * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
    */
-  private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = {
+  def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = {
+    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
     // Names of the sensors to access
     ClientSensors(
+      clientQuotaEntity,
       sensorAccessor.getOrCreate(
-        getQuotaSensorName(quotaEntity.quotaId),
+        getQuotaSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock, metrics,
-        () => clientRateMetricName(quotaEntity.sanitizedUser, quotaEntity.clientId),
-        () => getQuotaMetricConfig(quotaEntity.quota),
-        () => new Rate()
+        () => clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
+        () => getQuotaMetricConfig(clientQuotaEntity.quota),
+        () => measurableStat
       ),
-      sensorAccessor.getOrCreate(getThrottleTimeSensorName(quotaEntity.quotaId),
+      sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock,
         metrics,
-        () => throttleMetricName(quotaEntity),
+        () => throttleMetricName(clientQuotaEntity),
         () => null,
         () => new Avg()
       )
     )
   }
 
-  private def getThrottleTimeSensorName(quotaId: QuotaId): String = apiKey + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def measurableStat: MeasurableStat = new Rate()
+
+  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
-  private def getQuotaSensorName(quotaId: QuotaId): String = apiKey + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
-  private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+  protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
     new MetricConfig()
             .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
             .samples(config.numQuotaSamples)
             .quota(quota)
   }
 
+  protected def createSensor(sensorName: String, metricName: MetricName): Sensor = {
+    sensorAccessor.getOrCreate(
+        sensorName,
+        ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
+        lock, metrics,
+        () => metricName,
+        () => null,
+        () => measurableStat
+      )
+  }
+
   /**
    * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
    * for any of these levels.
@@ -409,7 +446,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       }
       quota match {
         case Some(newQuota) =>
-          logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
+          logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
           overriddenQuota.put(quotaId, newQuota)
           (sanitizedUser, clientId) match {
             case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
@@ -418,7 +455,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
             case (None, None) =>
           }
         case None =>
-          logger.info(s"Removing ${apiKey} quota for ${userInfo}${clientIdInfo}")
+          logger.info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
           overriddenQuota.remove(quotaId)
       }
 
@@ -460,8 +497,8 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
 
-  private def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
-    metrics.metricName("byte-rate", apiKey.toString,
+  protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+    metrics.metricName("byte-rate", quotaType.toString,
                    "Tracking byte-rate per user/client-id",
                    "user", sanitizedUser,
                    "client-id", clientId)
@@ -469,7 +506,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
 
   private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
     metrics.metricName("throttle-time",
-                       apiKey.toString,
+                       quotaType.toString,
                        "Tracking average throttle-time per user/client-id",
                        "user", quotaEntity.sanitizedUser,
                        "client-id", quotaEntity.clientId)

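Note: to make the delay formula in throttleTime concrete: for observed rate O,
quota bound T and rate window W, the delay X = (O - T)/T * W stretches the
window so that O * W / (W + X) = T. A worked example with made-up values:

    val O = 15.0             // observed rate (e.g. request-time percentage)
    val T = 10.0             // quota bound
    val W = 30.0 * 1000      // rate window in milliseconds
    val X = (O - T) / T * W  // 15000.0 ms
    // Check: O * W / (W + X) = 15 * 30000 / 45000 = 10 = T
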
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
new file mode 100644
index 0000000..7e80be6
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.utils.Time
+
+
+class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
+                         private val metrics: Metrics,
+                         private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
+  val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
+  val exemptSensor = createSensor(exemptSensorName, exemptMetricName)
+
+  def recordExempt(value: Double) {
+    exemptSensor.record(value)
+  }
+
+  override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
+    math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
+  }
+
+  override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+    metrics.metricName("request-time", QuotaType.Request.toString,
+                   "Tracking request-time per user/client-id",
+                   "user", sanitizedUser,
+                   "client-id", clientId)
+  }
+
+  private def exemptMetricName: MetricName = {
+    metrics.metricName("exempt-request-time", QuotaType.Request.toString,
+                   "Tracking exempt-request-time utilization percentage")
+  }
+
+  private def exemptSensorName: String = "exempt-" + QuotaType.Request
+
+}
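
Note: request quotas are expressed as a percentage of a request handler thread's
time, so the nanoseconds measured per request are scaled by
NanosToPercentagePerSecond (defined in ClientQuotaManagerConfig above as
100.0 / TimeUnit.SECONDS.toNanos(1)). Illustrative arithmetic:

    import java.util.concurrent.TimeUnit

    val nanosToPercent = 100.0 / TimeUnit.SECONDS.toNanos(1)  // 1e-7
    // 200 ms of handler-thread time in a one-second window is recorded as 20,
    // i.e. 20% of one thread:
    val recorded = TimeUnit.MILLISECONDS.toNanos(200) * nanosToPercent  // 20.0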

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 8d6de8c..2483199 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -130,6 +130,12 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
       else
         None
     quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
+    val requestQuota =
+      if (config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp))
+        Some(new Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble, true))
+      else
+        None
+    quotaManagers.request.updateQuota(sanitizedUser, clientId, requestQuota)
   }
 }
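
Note: for illustration, a dynamic config update carrying only a request quota
override would reach updateClientConfig roughly as follows (the values are
examples, not taken from the patch):

    // Illustrative Properties object, as QuotaConfigHandler would receive it.
    val config = new java.util.Properties()
    config.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, "50")
    // The handler above then passes Some(new Quota(50.0, true)) to
    // quotaManagers.request.updateQuota(sanitizedUser, clientId, requestQuota)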
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e68f921..175bf62 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -63,19 +63,23 @@ object DynamicConfig {
     //Properties
     val ProducerByteRateOverrideProp = "producer_byte_rate"
     val ConsumerByteRateOverrideProp = "consumer_byte_rate"
+    val RequestPercentageOverrideProp = "request_percentage"
 
     //Defaults
     val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
     val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+    val DefaultRequestOverride = ClientQuotaManagerConfig.QuotaRequestPercentDefault
 
     //Documentation
     val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
     val ConsumerOverrideDoc = "A rate representing the upper bound (bytes/sec) for consumer traffic."
+    val RequestOverrideDoc = "A percentage representing the upper bound of time spent for processing requests."
 
     //Definitions
     private val clientConfigs = new ConfigDef()
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
+      .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
 
     def names = clientConfigs.names
 
@@ -88,6 +92,7 @@ object DynamicConfig {
     private val userConfigs = CredentialProvider.userCredentialConfigs
       .define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
       .define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
+      .define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
 
     def names = userConfigs.names
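
Note: once the broker supports the new property, an override can be set with the
existing kafka-configs tool, for example (entity type and name are examples):

    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --add-config 'request_percentage=75' \
      --entity-type users --entity-name user1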
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 59f062d..1e1f0d5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -118,24 +118,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     } catch {
       case e: FatalExitError => throw e
-      case e: Throwable =>
-        if (request.requestObj != null) {
-          request.requestObj.handleError(e, requestChannel, request)
-          error("Error when handling request %s".format(request.requestObj), e)
-        } else {
-          val response = request.body[AbstractRequest].getErrorResponse(e)
-
-          /* If request doesn't have a default error response, we just close the connection.
-             For example, when produce request has acks set to 0 */
-          if (response == null)
-            requestChannel.closeConnection(request.processor, request)
-          else
-            requestChannel.sendResponse(new Response(request, response))
-
-          error("Error when handling request %s".format(request.body[AbstractRequest]), e)
-        }
-    } finally
-      request.apiLocalCompleteTimeMs = time.milliseconds
+      case e: Throwable => handleError(request, e)
+    } finally {
+      request.apiLocalCompleteTimeNanos = time.nanoseconds
+    }
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -165,16 +151,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      val leaderAndIsrResponse =
-        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
-          new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
-        } else {
-          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
-        }
-
-      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
+      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+        val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
+        val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
+        sendResponseExemptThrottle(request, new Response(request, leaderAndIsrResponse))
+      } else {
+        val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+        def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+        sendResponseMaybeThrottle(request, createResponse)
+      }
     } catch {
       case e: FatalExitError => throw e
       case e: KafkaStorageException =>
@@ -189,27 +174,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.body[StopReplicaRequest]
 
-    val response =
-      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
-        // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
-        // since this broker is no longer a replica for that offsets topic partition.
-        // This is required to handle the following scenario :
-        // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
-        // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
-        // is not cleared.
-        result.foreach { case (topicPartition, error) =>
-          if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
-            groupCoordinator.handleGroupEmigration(topicPartition.partition)
-          }
+    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+      val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+      // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
+      // since this broker is no longer a replica for that offsets topic partition.
+      // This is required to handle the following scenario :
+      // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
+      // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
+      // is not cleared.
+      result.foreach { case (topicPartition, error) =>
+        if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
+          groupCoordinator.handleGroupEmigration(topicPartition.partition)
         }
-        new StopReplicaResponse(error, result.asJava)
-      } else {
-        val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
       }
+      val response = new StopReplicaResponse(error, result.asJava)
+      sendResponseExemptThrottle(request, new Response(request, response))
+    } else {
+      val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
+    }
 
-    requestChannel.sendResponse(new RequestChannel.Response(request, response))
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
@@ -217,23 +202,21 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val updateMetadataRequest = request.body[UpdateMetadataRequest]
 
-    val updateMetadataResponse =
-      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
-        if (deletedPartitions.nonEmpty)
-          groupCoordinator.handleDeletedPartitions(deletedPartitions)
+    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+      val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
+      if (deletedPartitions.nonEmpty)
+        groupCoordinator.handleDeletedPartitions(deletedPartitions)
 
-        if (adminManager.hasDelayedTopicOperations) {
-          updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
-            adminManager.tryCompleteDelayedTopicOperations(topic)
-          }
+      if (adminManager.hasDelayedTopicOperations) {
+        updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
+          adminManager.tryCompleteDelayedTopicOperations(topic)
         }
-        new UpdateMetadataResponse(Errors.NONE)
-      } else {
-        new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
       }
-
-    requestChannel.sendResponse(new Response(request, updateMetadataResponse))
+      sendResponseExemptThrottle(request, new Response(request, new UpdateMetadataResponse(Errors.NONE)))
+    } else {
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
+    }
   }
 
   def handleControlledShutdownRequest(request: RequestChannel.Request) {
@@ -249,9 +232,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case Success(partitionsRemaining) =>
           val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
             Errors.NONE, partitionsRemaining)
-          requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
+          sendResponseExemptThrottle(request, new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
         case Failure(throwable) =>
-          controlledShutdownRequest.handleError(throwable, requestChannel, request)
+          sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
       }
     }
     controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
@@ -270,8 +253,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
         (topicPartition, error)
       }.toMap
-      val response = new OffsetCommitResponse(results.asJava)
-      requestChannel.sendResponse(new RequestChannel.Response(request, response))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, results.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
         case (topicPartition, _) =>
@@ -300,8 +283,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"on partition $topicPartition failed due to ${error.exceptionName}")
             }
           }
-        val response = new OffsetCommitResponse(combinedCommitStatus.asJava)
-        requestChannel.sendResponse(new RequestChannel.Response(request, response))
+        def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
+        sendResponseMaybeThrottle(request, createResponse)
       }
 
       if (authorizedTopics.isEmpty)
@@ -407,7 +390,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      def produceResponseCallback(delayTimeMs: Int) {
+      def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
         if (produceRequest.acks == 0) {
           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
           // the request, since no response is expected by the producer, the server will close socket server so that
@@ -426,13 +409,15 @@ class KafkaApis(val requestChannel: RequestChannel,
             requestChannel.noOperation(request.processor, request)
           }
         } else {
-          val respBody = new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
-          requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+          def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
+            new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
+          }
+          sendResponseMaybeThrottle(request, createResponseCallback)
         }
       }
 
       // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = time.milliseconds
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
       quotas.produce.recordAndMaybeThrottle(
         request.session.sanitizedUser,
@@ -534,20 +519,29 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = new FetchResponse(fetchedPartitionData, 0)
       val responseStruct = response.toStruct(versionId)
 
-      def fetchResponseCallback(throttleTimeMs: Int) {
-        trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
-        val responseSend = response.toSend(responseStruct, throttleTimeMs, request.connectionId, request.header)
-        requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+      def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
+        def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
+          trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
+          val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
+          new RequestChannel.Response(request, responseSend)
+        }
+        def sendResponseCallback(requestThrottleTimeMs: Int) {
+          requestChannel.sendResponse(createResponse(requestThrottleTimeMs))
+        }
+        if (fetchRequest.isFromFollower)
+          sendResponseExemptThrottle(request, createResponse(0))
+        else
+          sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
       }
 
       // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = time.milliseconds
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
       if (fetchRequest.isFromFollower) {
         // We've already evaluated against the quota and are good to go. Just need to record it now.
         val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
         quotas.leader.record(responseSize)
-        fetchResponseCallback(throttleTimeMs = 0)
+        fetchResponseCallback(bandwidthThrottleTimeMs = 0)
       } else {
         quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
           fetchResponseCallback)
@@ -597,8 +591,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       else
         handleListOffsetRequestV1(request)
 
-    val response = new ListOffsetResponse(mergedResponseMap.asJava)
-    requestChannel.sendResponse(new RequestChannel.Response(request, response))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
@@ -925,13 +919,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
       brokers.mkString(","), request.header.correlationId, request.header.clientId))
 
-    val responseBody = new MetadataResponse(
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new MetadataResponse(
+      throttleTimeMs,
       brokers.map(_.getNode(request.listenerName)).asJava,
       clusterId,
       metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
       completeTopicMetadata.asJava
     )
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   /**
@@ -944,68 +940,70 @@ class KafkaApis(val requestChannel: RequestChannel,
     def authorizeTopicDescribe(partition: TopicPartition) =
       authorize(request.session, Describe, new Resource(Topic, partition.topic))
 
-    val offsetFetchResponse =
-      // reject the request if not authorized to the group
-      if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
-        offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
-      else {
-        if (header.apiVersion == 0) {
-          val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
-            .partition(authorizeTopicDescribe)
+    def createResponse(throttleTimeMs: Int): AbstractResponse = {
+      val offsetFetchResponse =
+        // reject the request if not authorized to the group
+        if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
+          offsetFetchRequest.getErrorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+        else {
+          if (header.apiVersion == 0) {
+            val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+              .partition(authorizeTopicDescribe)
 
-          // version 0 reads offsets from ZK
-          val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
-            val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
-            try {
-              if (!metadataCache.contains(topicPartition.topic))
-                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-              else {
-                val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
-                payloadOpt match {
-                  case Some(payload) =>
-                    (topicPartition, new OffsetFetchResponse.PartitionData(
-                        payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
-                  case None =>
-                    (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+            // version 0 reads offsets from ZK
+            val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
+              val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
+              try {
+                if (!metadataCache.contains(topicPartition.topic))
+                  (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+                else {
+                  val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+                  payloadOpt match {
+                    case Some(payload) =>
+                      (topicPartition, new OffsetFetchResponse.PartitionData(
+                          payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
+                    case None =>
+                      (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+                  }
                 }
+              } catch {
+                case e: Throwable =>
+                  (topicPartition, new OffsetFetchResponse.PartitionData(
+                      OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
               }
-            } catch {
-              case e: Throwable =>
-                (topicPartition, new OffsetFetchResponse.PartitionData(
-                    OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
-            }
-          }.toMap
+            }.toMap
 
-          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
-          new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
-        } else {
-          // versions 1 and above read offsets from Kafka
-          if (offsetFetchRequest.isAllPartitions) {
-            val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
-            if (error != Errors.NONE)
-              offsetFetchRequest.getErrorResponse(error)
-            else {
-              // clients are not allowed to see offsets for topics that are not authorized for Describe
-              val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
-              new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava)
-            }
+            val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+            new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
           } else {
-            val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
-              .partition(authorizeTopicDescribe)
-            val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
-              Some(authorizedPartitions))
-            if (error != Errors.NONE)
-              offsetFetchRequest.getErrorResponse(error)
-            else {
-              val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
-              new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+            // versions 1 and above read offsets from Kafka
+            if (offsetFetchRequest.isAllPartitions) {
+              val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
+              if (error != Errors.NONE)
+                offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+              else {
+                // clients are not allowed to see offsets for topics that are not authorized for Describe
+                val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
+                new OffsetFetchResponse(throttleTimeMs, Errors.NONE, authorizedPartitionData.asJava)
+              }
+            } else {
+              val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+                .partition(authorizeTopicDescribe)
+              val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
+                Some(authorizedPartitions))
+              if (error != Errors.NONE)
+                offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+              else {
+                val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+                new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+              }
             }
           }
         }
+        trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
+        offsetFetchResponse
       }
-
-    trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-    requestChannel.sendResponse(new Response(request, offsetFetchResponse))
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleFindCoordinatorRequest(request: RequestChannel.Request) {
@@ -1014,8 +1012,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
       !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
 
-      val responseBody = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       // TODO: Authorize by transactional id if coordinator type is TRANSACTION
 
@@ -1035,24 +1033,26 @@ class KafkaApis(val requestChannel: RequestChannel,
           throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
       }
 
-      val responseBody = if (topicMetadata.error != Errors.NONE) {
-        new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-      } else {
-        val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
-          .find(_.partition == partition)
-          .map(_.leader())
-
-        coordinatorEndpoint match {
-          case Some(endpoint) if !endpoint.isEmpty =>
-            new FindCoordinatorResponse(Errors.NONE, endpoint)
-          case _ =>
-            new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = if (topicMetadata.error != Errors.NONE) {
+          new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+        } else {
+          val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
+            .find(_.partition == partition)
+            .map(_.leader())
+
+          coordinatorEndpoint match {
+            case Some(endpoint) if !endpoint.isEmpty =>
+              new FindCoordinatorResponse(throttleTimeMs, Errors.NONE, endpoint)
+            case _ =>
+              new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+          }
         }
+        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+          .format(responseBody, request.header.correlationId, request.header.clientId))
+        responseBody
       }
-
-      trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-        .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      sendResponseMaybeThrottle(request, createResponse)
     }
   }
 
@@ -1074,19 +1074,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
     }.toMap
 
-    val responseBody = new DescribeGroupsResponse(groups.asJava)
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new DescribeGroupsResponse(throttleTimeMs, groups.asJava)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
-    val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
-      ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
+    if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+      def createResponse(throttleTimeMs: Int): AbstractResponse = ListGroupsResponse.fromError(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       val (error, groups) = groupCoordinator.handleListGroups()
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
-      new ListGroupsResponse(error, allGroups.asJava)
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new ListGroupsResponse(throttleTimeMs, error, allGroups.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
     }
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -1095,23 +1096,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
       val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
-      val responseBody = new JoinGroupResponse(joinResult.error, joinResult.generationId,
-        joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = new JoinGroupResponse(throttleTimeMs, joinResult.error, joinResult.generationId,
+          joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
 
-      trace("Sending join group response %s for correlation id %d to client %s."
-        .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+        trace("Sending join group response %s for correlation id %d to client %s."
+          .format(responseBody, request.header.correlationId, request.header.clientId))
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
-      val responseBody = new JoinGroupResponse(
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new JoinGroupResponse(
+        throttleTimeMs,
         Errors.GROUP_AUTHORIZATION_FAILED,
         JoinGroupResponse.UNKNOWN_GENERATION_ID,
         JoinGroupResponse.UNKNOWN_PROTOCOL,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
         Collections.emptyMap())
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       // let the coordinator to handle join-group
       val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
@@ -1133,8 +1138,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val syncGroupRequest = request.body[SyncGroupRequest]
 
     def sendResponseCallback(memberState: Array[Byte], error: Errors) {
-      val responseBody = new SyncGroupResponse(error, ByteBuffer.wrap(memberState))
-      requestChannel.sendResponse(new Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new SyncGroupResponse(throttleTimeMs, error, ByteBuffer.wrap(memberState))
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
@@ -1155,15 +1160,18 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a heartbeat response
     def sendResponseCallback(error: Errors) {
-      val response = new HeartbeatResponse(error)
-      trace("Sending heartbeat response %s for correlation id %d to client %s."
-        .format(response, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, response))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val response = new HeartbeatResponse(throttleTimeMs, error)
+        trace("Sending heartbeat response %s for correlation id %d to client %s."
+          .format(response, request.header.correlationId, request.header.clientId))
+        response
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
-      val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED)
-      requestChannel.sendResponse(new Response(request, heartbeatResponse))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new HeartbeatResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
     }
     else {
       // let the coordinator to handle heartbeat
@@ -1180,15 +1188,18 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a leave-group response
     def sendResponseCallback(error: Errors) {
-      val response = new LeaveGroupResponse(error)
-      trace("Sending leave group response %s for correlation id %d to client %s."
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val response = new LeaveGroupResponse(throttleTimeMs, error)
+        trace("Sending leave group response %s for correlation id %d to client %s."
                     .format(response, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, response))
+        response
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
-      val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED)
-      requestChannel.sendResponse(new Response(request, leaveGroupResponse))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaveGroupResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       // let the coordinator handle leave-group
       groupCoordinator.handleLeaveGroup(
@@ -1199,8 +1210,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
-    requestChannel.sendResponse(new RequestChannel.Response(request, response))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -1210,20 +1221,26 @@ class KafkaApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version, a workaround is to use SSL
     // with client authentication, which is performed at an earlier stage of the connection, where the
     // ApiVersionsRequest is not available.
-    val responseSend =
-      if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
-        ApiVersionsResponse.API_VERSIONS_RESPONSE.toSend(request.connectionId, request.header)
-      else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+    def sendResponseCallback(throttleTimeMs: Int) {
+      val responseSend =
+        if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
+          ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, throttleTimeMs).toSend(request.connectionId, request.header)
+        else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+    }
+    sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request) {
     val createTopicsRequest = request.body[CreateTopicsRequest]
 
     def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
-      val responseBody = new CreateTopicsResponse(results.asJava)
-      trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava)
+        trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!controller.isActive) {
@@ -1279,11 +1296,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
-      val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
-          unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
-      val responseBody = new DeleteTopicsResponse(completeResults.asJava)
-      trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
+            unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
+        val responseBody = new DeleteTopicsResponse(throttleTimeMs, completeResults.asJava)
+        trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!controller.isActive) {
@@ -1335,11 +1355,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
-      requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
-
-      // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = time.milliseconds
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new DeleteRecordsResponse(throttleTimeMs, mergedResponseStatus.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (authorizedForDeleteTopics.isEmpty)
@@ -1359,9 +1376,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // Send response callback
     def sendResponseCallback(result: InitPidResult): Unit = {
-      val responseBody: InitPidResponse = new InitPidResponse(result.error, result.pid, result.epoch)
-      trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch)
+        trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
     txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
   }
@@ -1370,9 +1390,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val endTxnRequest = request.body[EndTxnRequest]
 
     def sendResponseCallback(error: Errors) {
-      val responseBody = new EndTxnResponse(error)
-      trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = new EndTxnResponse(throttleTimeMs, error)
+        trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId(),
@@ -1383,11 +1406,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
+    authorizeClusterAction(request)
     val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
-    requestChannel.sendResponse(new RequestChannel.Response(request, new WriteTxnMarkersResponse(emptyResponse)))
+    val responseBody = new WriteTxnMarkersResponse(emptyResponse)
+    sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
   }
 
-
   def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
@@ -1395,9 +1419,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // Send response callback
     def sendResponseCallback(error: Errors): Unit = {
-      val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(error)
-      trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, error)
+        trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1415,9 +1442,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // Send response callback
     def sendResponseCallback(error: Errors): Unit = {
-      val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(error)
-      trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
+        trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1429,7 +1459,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
     val emptyResponse = new java.util.HashMap[TopicPartition, Errors]()
-    requestChannel.sendResponse(new RequestChannel.Response(request, new TxnOffsetCommitResponse(emptyResponse)))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, emptyResponse)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
@@ -1440,11 +1471,93 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseBody = new OffsetsForLeaderEpochResponse(
       replicaManager.getResponseFor(requestInfo)
     )
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+    sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
+  }
+
+  private def handleError(request: RequestChannel.Request, e: Throwable) {
+    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
+    if (request.requestObj != null) {
+      def sendResponseCallback(throttleTimeMs: Int) {
+        request.requestObj.handleError(e, requestChannel, request)
+        error("Error when handling request %s".format(request.requestObj), e)
+      }
+
+      if (mayThrottle) {
+        val clientId: String =
+          if (request.requestObj.isInstanceOf[ControlledShutdownRequest])
+            request.requestObj.asInstanceOf[ControlledShutdownRequest].clientId.getOrElse("")
+          else
+            throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest")
+        sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
+      } else
+        sendResponseExemptThrottle(request, () => sendResponseCallback(0))
+    } else {
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val response = request.body[AbstractRequest].getErrorResponse(throttleTimeMs, e)
+
+        /* If the request doesn't have a default error response, we just close the connection.
+           For example, when a produce request has acks set to 0. */
+        if (response == null)
+          requestChannel.closeConnection(request.processor, request)
+        response
+      }
+      error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+      if (mayThrottle)
+        sendResponseMaybeThrottle(request, createResponse)
+      else
+        sendResponseExemptThrottle(request, new RequestChannel.Response(request, createResponse(0)))
+    }
   }
 
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
+
+  private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
+    def sendResponseCallback(throttleTimeMs: Int) {
+      val response = createResponse(throttleTimeMs)
+      if (response != null)
+        sendResponse(request, response)
+    }
+    sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+  }
+
+  private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) {
+
+    if (request.apiRemoteCompleteTimeNanos == -1) {
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
+    }
+    val quotaSensors = quotas.request.getOrCreateQuotaSensors(request.session.sanitizedUser, clientId)
+    def recordNetworkThreadTimeNanos(timeNanos: Long) {
+      quotas.request.recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))
+    }
+    request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+    quotas.request.recordAndThrottleOnQuotaViolation(
+        quotaSensors,
+        nanosToPercentage(request.requestThreadTimeNanos),
+        sendResponseCallback)
+  }
+
+  private def sendResponseExemptThrottle(request: RequestChannel.Request, response: Response) {
+    sendResponseExemptThrottle(request, () => requestChannel.sendResponse(response))
+  }
+
+  private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
+    def recordNetworkThreadTimeNanos(timeNanos: Long) {
+      quotas.request.recordExempt(nanosToPercentage(timeNanos))
+    }
+    request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+    quotas.request.recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
+    sendResponseCallback()
+  }
+
+  private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
+    requestChannel.sendResponse(new Response(request, response))
+  }
+
+  private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
 }

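Note for reviewers: every response path in KafkaApis above now funnels through sendResponseMaybeThrottle, which converts request-handler thread time into a utilization percentage and lets the quota manager delay the send. A minimal self-contained sketch of the conversion and callback shape follows; the quota-violation math is a toy stand-in, not the ClientQuotaManager algorithm, and the 100.0 / 1e9 constant is an assumption consistent with nanosToPercentage treating one full thread-second per wall-clock second as 100%.

    object ThrottleSketch {
      // Assumed value of ClientQuotaManagerConfig.NanosToPercentagePerSecond:
      // one thread-second of work per wall-clock second reads as 100 percent.
      val NanosToPercentagePerSecond: Double = 100.0 / 1e9

      def nanosToPercentage(nanos: Long): Double = nanos * NanosToPercentagePerSecond

      // Toy maybe-throttle: record usage, then invoke the send callback with a
      // throttle delay (0 when the quota is not violated). The real manager
      // derives the delay from sampled windows, not this toy formula.
      def recordAndMaybeThrottle(threadTimeNanos: Long, quotaPercent: Double)(send: Int => Unit): Unit = {
        val usagePercent = nanosToPercentage(threadTimeNanos)
        val throttleTimeMs = if (usagePercent > quotaPercent) 100 else 0
        send(throttleTimeMs)
      }

      def main(args: Array[String]): Unit =
        recordAndMaybeThrottle(threadTimeNanos = 20000000L, quotaPercent = 1.0) { throttleMs =>
          println(s"sending response after ${throttleMs}ms throttle") // 20ms work = 2% > 1% quota
        }
    }
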
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index c9c31ad..a1600cb 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -50,7 +50,10 @@ class KafkaRequestHandler(id: Int,
           // time should be discounted by # threads.
           val startSelectTime = time.nanoseconds
           req = requestChannel.receiveRequest(300)
-          val idleTime = time.nanoseconds - startSelectTime
+          val endTime = time.nanoseconds
+          if (req != null)
+            req.requestDequeueTimeNanos = endTime
+          val idleTime = endTime - startSelectTime
           aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
         }
 
@@ -59,7 +62,6 @@ class KafkaRequestHandler(id: Int,
           latch.countDown()
           return
         }
-        req.requestDequeueTimeMs = time.milliseconds
         trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {

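The handler-loop change above replaces the millisecond dequeue stamp with a nanosecond one taken from the same clock reading already used for the idle meter, so downstream thread-time math (requestThreadTimeNanos in KafkaApis) subtracts like units from one monotonic clock. A hedged sketch of the pattern, with poll() and Req standing in for requestChannel.receiveRequest and RequestChannel.Request:

    object DequeueStampSketch {
      final class Req { var requestDequeueTimeNanos: Long = -1L }

      def poll(timeoutMs: Long): Req = { Thread.sleep(timeoutMs); new Req }

      def main(args: Array[String]): Unit = {
        val startSelectNanos = System.nanoTime        // one monotonic reading...
        val req = poll(300)
        val endNanos = System.nanoTime                // ...and its matching end reading
        if (req != null) req.requestDequeueTimeNanos = endNanos // was time.milliseconds before this patch
        val idleNanos = endNanos - startSelectNanos   // charged to the idle meter
        println(s"idle for ${idleNanos / 1000000}ms; dequeued at ${req.requestDequeueTimeNanos}")
      }
    }
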
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 671ad63..dee39a3 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time
 object QuotaType  {
   case object Fetch extends QuotaType
   case object Produce extends QuotaType
+  case object Request extends QuotaType
   case object LeaderReplication extends QuotaType
   case object FollowerReplication extends QuotaType
 }
@@ -36,10 +37,11 @@ object QuotaFactory {
     override def isQuotaExceeded(): Boolean = false
   }
 
-  case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
+  case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, request: ClientRequestQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
     def shutdown() {
       fetch.shutdown
       produce.shutdown
+      request.shutdown
     }
   }
 
@@ -47,6 +49,7 @@ object QuotaFactory {
     QuotaManagers(
       new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time),
       new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
+      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time)
     )
@@ -66,6 +69,13 @@ object QuotaFactory {
       quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
     )
 
+  def clientRequestConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
+    ClientQuotaManagerConfig(
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+  }
+
   def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig =
     ReplicationQuotaManagerConfig(
       numQuotaSamples = cfg.numReplicationQuotaSamples,

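Worth noting: clientRequestConfig passes only the sampling-window settings and no byte-rate default, so the request quota is effectively unbounded until a dynamic override arrives, and shutdown() must also stop the new manager, as the diff does. A toy illustration of the positional change to QuotaManagers (field types reduced to strings; the real constructor takes the managers built above):

    object QuotaManagersShapeSketch {
      // Before: QuotaManagers(fetch, produce, leader, follower)
      // After:  QuotaManagers(fetch, produce, request, leader, follower)
      case class Shape(fetch: String, produce: String, request: String,
                       leader: String, follower: String)

      def main(args: Array[String]): Unit = {
        // Positional callers and pattern matches must insert the new third argument.
        val qm = Shape("fetch", "produce", "request", "leader", "follower")
        println(qm.request)
      }
    }
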
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index d9822cd..ff9fef0 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -206,7 +206,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
       val hostStr = s"${node.host}:${node.port}"
       assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
       val brokerVersionInfo = tryBrokerVersionInfo.get
-      assertEquals(0, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
     }
   }
 

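The expectation bump from 0 to 1 reflects that this patch adds throttle_time_ms to the ApiVersions response, introducing version 1 of that API; the usable version is the highest version both sides support. A small sketch of that intersection rule (an assumption about NodeApiVersions semantics, shown for overlapping ranges only):

    object UsableVersionSketch {
      // Highest version supported by both client and broker, assuming the
      // ranges overlap (the real NodeApiVersions also handles disjoint ranges).
      def usableVersion(clientMax: Short, brokerMax: Short): Short =
        math.min(clientMax, brokerMax).toShort

      def main(args: Array[String]): Unit = {
        assert(usableVersion(clientMax = 1, brokerMax = 0) == 0) // before this patch
        assert(usableVersion(clientMax = 1, brokerMax = 1) == 1) // after: ApiVersions v1
        println("ok")
      }
    }
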
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index aa1717a..f21c1df 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -26,13 +26,15 @@ import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import kafka.server.QuotaType
+import org.apache.kafka.common.metrics.KafkaMetric
 
 abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   def userPrincipal : String
   def producerQuotaId : QuotaId
   def consumerQuotaId : QuotaId
-  def overrideQuotas(producerQuota: Long, consumerQuota: Long)
+  def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
   def removeQuotaOverrides()
 
   override val serverCount = 2
@@ -55,10 +57,13 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+  this.consumerConfig.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "0")
+  this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
 
   // Low enough quota that a producer sending a small payload in a tight loop should get throttled
   val defaultProducerQuota = 8000
   val defaultConsumerQuota = 2500
+  val defaultRequestQuota = Int.MaxValue
 
   var leaderNode: KafkaServer = null
   var followerNode: KafkaServer = null
@@ -99,8 +104,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, Long.MaxValue.toString)
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, Long.MaxValue.toString)
 
-    overrideQuotas(Long.MaxValue, Long.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
     assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -114,8 +119,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   @Test
   def testQuotaOverrideDelete() {
     // Override producer and consumer quotas to unlimited
-    overrideQuotas(Long.MaxValue, Long.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
     assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -136,6 +141,28 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
   }
 
+  @Test
+  def testThrottledRequest() {
+
+    overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
+
+    val consumer = consumers.head
+    consumer.subscribe(Collections.singleton(topic1))
+    val endTimeMs = System.currentTimeMillis + 10000
+    var throttled = false
+    while (!throttled && System.currentTimeMillis < endTimeMs) {
+      consumer.poll(100)
+      val throttleMetric = consumerRequestThrottleMetric
+      throttled = throttleMetric != null && throttleMetric.value > 0
+    }
+
+    assertTrue("Should have been throttled", throttled)
+
+    assertNotNull("Exempt requests not recorded", exemptRequestMetric)
+    assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
+  }
+
   def produceUntilThrottled(p: KafkaProducer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
     var numProduced = 0
     var throttled = false
@@ -169,31 +196,47 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     numConsumed
   }
 
-  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) {
+  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
     TestUtils.retry(10000) {
       val quotaManagers = leaderNode.apis.quotas
       val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId)
       val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId)
+      val overrideProducerRequestQuota = quotaManagers.request.quota(userPrincipal, producerClientId)
+      val overrideConsumerRequestQuota = quotaManagers.request.quota(userPrincipal, consumerClientId)
 
       assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
       assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
     }
   }
 
-  private def throttleMetricName(apiKey: ApiKeys, quotaId: QuotaId): MetricName = {
+  private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
     leaderNode.metrics.metricName("throttle-time",
-                                  apiKey.name,
+                                  quotaType.toString,
                                   "Tracking throttle-time per user/client-id",
                                   "user", quotaId.sanitizedUser.getOrElse(""),
                                   "client-id", quotaId.clientId.getOrElse(""))
   }
-  private def producerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId))
-  private def consumerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.FETCH, consumerQuotaId))
 
-  def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
+  def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {
+    leaderNode.metrics.metrics.get(throttleMetricName(quotaType, quotaId))
+  }
+
+  private def producerThrottleMetric = throttleMetric(QuotaType.Produce, producerQuotaId)
+  private def consumerThrottleMetric = throttleMetric(QuotaType.Fetch, consumerQuotaId)
+  private def consumerRequestThrottleMetric = throttleMetric(QuotaType.Request, consumerQuotaId)
+
+  private def exemptRequestMetric: KafkaMetric = {
+    val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
+    leaderNode.metrics.metrics.get(metricName)
+  }
+
+  def quotaProperties(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Properties = {
     val props = new Properties()
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+    props.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     props
   }
 }

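testThrottledRequest above relies on a poll-until-metric pattern: drive requests against a deliberately tiny request quota (0.1%) until the broker-side throttle-time sensor turns positive, with a deadline so the test can fail cleanly. A standalone sketch of that wait loop, where metricValue is a stand-in for reading the broker's "throttle-time" KafkaMetric:

    object WaitForThrottleSketch {
      // Poll until the sensor shows throttling or the deadline passes.
      def waitUntilThrottled(metricValue: () => Double, timeoutMs: Long = 10000L): Boolean = {
        val deadline = System.currentTimeMillis + timeoutMs
        var throttled = false
        while (!throttled && System.currentTimeMillis < deadline) {
          Thread.sleep(100) // in the test this slot is consumer.poll(100)
          throttled = metricValue() > 0
        }
        throttled
      }

      def main(args: Array[String]): Unit = {
        var calls = 0
        val fakeMetric = () => { calls += 1; if (calls > 3) 1.0 else 0.0 }
        assert(waitUntilThrottled(fakeMetric))
        println("throttled after " + calls + " polls")
      }
    }
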
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index d71713f..f8594e1 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -33,14 +33,15 @@ class ClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
     super.setUp()
   }
-
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
     val producerProps = new Properties()
     producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+    producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(producerClientId, producerProps)
 
     val consumerProps = new Properties()
     consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+    consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(consumerClientId, consumerProps)
   }
   override def removeQuotaOverrides() {

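For reference, a per-client override now carries both a byte rate and the new request percentage. A hedged example of building such a Properties object; the literal keys "producer_byte_rate" and "request_percentage" are assumptions about what the DynamicConfig.Client constants resolve to:

    import java.util.Properties

    object OverridePropsSketch {
      def main(args: Array[String]): Unit = {
        val producerProps = new Properties()
        // Assumed keys behind ProducerByteRateOverrideProp / RequestPercentageOverrideProp.
        producerProps.put("producer_byte_rate", Long.MaxValue.toString)
        producerProps.put("request_percentage", "0.1") // 0.1% of one request-handler thread
        println(producerProps)
      }
    }
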
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 82b109d..333c851 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -39,18 +39,20 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
-    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
     AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
-    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
     val producerProps = new Properties()
     producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+    producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(userPrincipal, producerClientId, producerProps)
 
     val consumerProps = new Properties()
     consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+    consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(userPrincipal, consumerClientId, consumerProps)
   }