You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/05/01 16:13:38 UTC

[1/4] kafka git commit: KAFKA-4954; Request handler utilization quotas

Repository: kafka
Updated Branches:
  refs/heads/trunk 6185bc027 -> 0104b657a


http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index c8d0a77..d0e514c 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -41,13 +41,13 @@ class UserQuotaTest extends BaseQuotaTest with SaslTestHarness {
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
-    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
     AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default, defaultProps)
-    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
-    val props = quotaProperties(producerQuota, consumerQuota)
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+    val props = quotaProperties(producerQuota, consumerQuota, requestQuota)
     updateQuotaOverride(props)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 248b91e..c52020c 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -51,12 +51,12 @@ class ApiVersionsRequestTest extends BaseRequestTest {
   @Test
   def testApiVersionsRequestWithUnsupportedVersion() {
     val apiVersionsRequest = new ApiVersionsRequest(0)
-    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, Some(Short.MaxValue))
+    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, Some(Short.MaxValue), 0)
     assertEquals(Errors.UNSUPPORTED_VERSION, apiVersionsResponse.error)
   }
 
-  private def sendApiVersionsRequest(request: ApiVersionsRequest, apiVersion: Option[Short] = None): ApiVersionsResponse = {
+  private def sendApiVersionsRequest(request: ApiVersionsRequest, apiVersion: Option[Short] = None, responseVersion: Short = 1): ApiVersionsResponse = {
     val response = connectAndSend(request, ApiKeys.API_VERSIONS, apiVersion = apiVersion)
-    ApiVersionsResponse.parse(response, 0)
+    ApiVersionsResponse.parse(response, responseVersion)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index a26bc2e..a7a12eb 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -146,7 +146,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     skipResponseHeader(response)
   }
 
-  private def skipResponseHeader(response: Array[Byte]): ByteBuffer = {
+  protected def skipResponseHeader(response: Array[Byte]): ByteBuffer = {
     val responseBuffer = ByteBuffer.wrap(response)
     // Parse the header to ensure its valid and move the buffer forward
     ResponseHeader.parse(responseBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index b581341..183ba0b 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -269,6 +269,68 @@ class ClientQuotaManagerTest {
   }
 
   @Test
+  def testRequestPercentageQuotaViolation() {
+    val metrics = newMetrics
+    val quotaManager = new ClientRequestQuotaManager(config, metrics, time)
+    quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some(Quota.upperBound(1)))
+    val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
+    def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond
+    try {
+      /* We have 10 second windows. Make sure that there is no quota violation
+       * if we are under the quota
+       */
+      for (_ <- 0 until 10) {
+        quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+        time.sleep(1000)
+      }
+      assertEquals(10, numCallbacks)
+      assertEquals(0, queueSizeMetric.value().toInt)
+
+      // Create a spike.
+      // quota = 1% (10ms per second)
+      // 4*10 + 67.1 = 107.1/10.5 = 10.2ms per second.
+      // (10.2 - quota)/quota*window-size = (10.2-10)/10*10.5 seconds = 210ms
+      // 10.5 seconds interval because the last window is half complete
+      time.sleep(500)
+      val throttleTime = quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(67.1), callback)
+
+      assertEquals("Should be throttled", 210, throttleTime)
+      assertEquals(1, queueSizeMetric.value().toInt)
+      // After a request is delayed, the callback cannot be triggered immediately
+      quotaManager.throttledRequestReaper.doWork()
+      assertEquals(10, numCallbacks)
+      time.sleep(throttleTime)
+
+      // Callback can only be triggered after the delay time passes
+      quotaManager.throttledRequestReaper.doWork()
+      assertEquals(0, queueSizeMetric.value().toInt)
+      assertEquals(11, numCallbacks)
+
+      // Could continue to see delays until the bursty sample disappears
+      for (_ <- 0 until 11) {
+        quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+        time.sleep(1000)
+      }
+
+      assertEquals("Should be unthrottled since bursty sample has rolled over",
+                   0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
+
+      // Create a very large spike which requires > one quota window to bring within quota
+      assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(500), callback))
+      for (_ <- 0 until 10) {
+        time.sleep(1000)
+        assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
+      }
+      time.sleep(1000)
+      assertEquals("Should be unthrottled since bursty sample has rolled over",
+                   0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
+
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
+  @Test
   def testExpireThrottleTimeSensor() {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
new file mode 100644
index 0000000..bca7bb0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -0,0 +1,420 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.server
+
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.{LinkedHashMap, Properties}
+import java.util.concurrent.{Executors, Future, TimeUnit}
+import kafka.admin.AdminUtils
+import kafka.network.RequestChannel.Session
+import kafka.security.auth._
+import kafka.utils.TestUtils
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
+import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLayer}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal}
+import org.junit.Assert._
+import org.junit.{Before, After, Test}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+class RequestQuotaTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+
+  private val topic = "topic-1"
+  private val numPartitions = 1
+  private val tp = new TopicPartition(topic, 0)
+  private val unthrottledClientId = "unthrottled-client"
+  private val brokerId: Integer = 0
+  private var leaderNode: KafkaServer = null
+
+  // Run tests concurrently since a throttle could be up to 1 second because quota percentage allocated is very low
+  case class Task(val apiKey: ApiKeys, val future: Future[_])
+  private val executor = Executors.newCachedThreadPool
+  private val tasks = new ListBuffer[Task]
+
+  override def propertyOverrides(properties: Properties): Unit = Seq(
+    KafkaConfig.ControlledShutdownEnableProp -> "false",
+    KafkaConfig.OffsetsTopicReplicationFactorProp -> "1",
+    KafkaConfig.OffsetsTopicPartitionsProp -> "1",
+    KafkaConfig.AuthorizerClassNameProp -> classOf[RequestQuotaTest.TestAuthorizer].getName,
+    KafkaConfig.PrincipalBuilderClassProp -> classOf[RequestQuotaTest.TestPrincipalBuilder].getName
+  ).foreach { case (name, value) => properties.put(name, value) } // installs the companion's test authorizer and principal builder
+
+  @Before
+  override def setUp() {
+    // Reset the principal handed out by TestPrincipalBuilder; testUnauthorizedThrottle overrides it.
+    RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
+    super.setUp()
+
+    TestUtils.createTopic(zkUtils, topic, numPartitions, 1, servers)
+    leaderNode = servers.head
+
+    // Change default client-id request quota to a small value and a single unthrottledClient with a large quota
+    val quotaProps = new Properties()
+    quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
+    AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps)
+    quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
+    AdminUtils.changeClientIdConfig(zkUtils, unthrottledClientId, quotaProps)
+
+    TestUtils.retry(10000) {
+      val quotaManager = leaderNode.apis.quotas.request
+      assertEquals("Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"))
+      assertEquals("Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId))
+    }
+  }
+
+  @After
+  override def tearDown() {
+    // Stop outstanding request tasks first; the harness teardown runs even if that fails.
+    try
+      executor.shutdownNow()
+    finally
+      super.tearDown()
+  }
+
+  @Test
+  def testResponseThrottleTime() {
+    // One concurrent check per client API. Each Client creates its own request
+    // builder, so no builder is needed here.
+    for (apiKey <- RequestQuotaTest.ClientActions) {
+      submitTest(apiKey, () => {
+        checkRequestThrottleTime(apiKey)
+      })
+    }
+    waitAndCheckResults()
+  }
+
+  @Test
+  def testUnthrottledClient() {
+    // One concurrent check per client API, all sharing the client-id that has
+    // the large quota override. Each Client creates its own request builder.
+    for (apiKey <- RequestQuotaTest.ClientActions) {
+      submitTest(apiKey, () => {
+        checkUnthrottledClient(apiKey)
+      })
+    }
+    waitAndCheckResults()
+  }
+
+  @Test
+  def testExemptRequestTime() {
+    // Cluster actions must show up in the exempt-request-time metric and never in throttle time.
+    RequestQuotaTest.ClusterActions.foreach { apiKey =>
+      submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
+    }
+    waitAndCheckResults()
+  }
+
+  @Test
+  def testUnauthorizedThrottle() {
+    // Connections now get the principal that TestAuthorizer rejects for every operation.
+    RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
+
+    ApiKeys.values.foreach { apiKey =>
+      submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
+    }
+    waitAndCheckResults()
+  }
+
+  // Value of the request "throttle-time" metric for the client-id, or -1.0 when the lookup finds no metric.
+  private def throttleTimeMetricValue(clientId: String): Double = {
+    val name = leaderNode.metrics.metricName("throttle-time", QuotaType.Request.toString, "",
+      "user", "",
+      "client-id", clientId)
+    val throttleSensor =
+      leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId).throttleTimeSensor
+    metricValue(leaderNode.metrics.metrics.get(name), throttleSensor)
+  }
+
+  // Value of the request "request-time" metric for the client-id, or -1.0 when the lookup finds no metric.
+  private def requestTimeMetricValue(clientId: String): Double = {
+    val name = leaderNode.metrics.metricName("request-time", QuotaType.Request.toString, "",
+      "user", "",
+      "client-id", clientId)
+    val requestSensor =
+      leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId).quotaSensor
+    metricValue(leaderNode.metrics.metrics.get(name), requestSensor)
+  }
+
+  private def exemptRequestMetricValue: Double = {
+    val exemptMetric = leaderNode.metrics.metrics.get(leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, ""))
+    metricValue(exemptMetric, leaderNode.quotaManagers.request.exemptSensor) // -1.0 when the lookup found no metric
+  }
+
+  // Reads the metric while holding the sensor's monitor; -1.0 stands in for a null metric.
+  private def metricValue(metric: KafkaMetric, sensor: Sensor): Double =
+    sensor.synchronized {
+      Option(metric).fold(-1.0)(_.value)
+    }
+
+  private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = {
+    apiKey match { // one small request per supported API key, aimed at the test topic/partition where applicable
+        case ApiKeys.PRODUCE =>
+          new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+            collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
+
+        case ApiKeys.FETCH =>
+          val partitionMap = new LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
+          partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
+          requests.FetchRequest.Builder.forConsumer(0, 0, partitionMap)
+
+        case ApiKeys.METADATA =>
+          new requests.MetadataRequest.Builder(List(topic).asJava)
+
+        case ApiKeys.LIST_OFFSETS =>
+          requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava)
+
+        case ApiKeys.LEADER_AND_ISR =>
+          new LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
+            Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+            Set(new Node(brokerId, "localhost", 0)).asJava)
+
+        case ApiKeys.STOP_REPLICA =>
+          new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava)
+
+        case ApiKeys.UPDATE_METADATA_KEY =>
+          val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+          val securityProtocol = SecurityProtocol.PLAINTEXT
+          val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
+            Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
+            ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava
+          new requests.UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
+
+        case ApiKeys.CONTROLLED_SHUTDOWN_KEY =>
+          new requests.ControlledShutdownRequest.Builder(brokerId)
+
+        case ApiKeys.OFFSET_COMMIT =>
+          new requests.OffsetCommitRequest.Builder("test-group",
+            Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
+            setMemberId("").setGenerationId(1).setRetentionTime(1000)
+
+        case ApiKeys.OFFSET_FETCH =>
+          new requests.OffsetFetchRequest.Builder("test-group", List(tp).asJava)
+
+        case ApiKeys.FIND_COORDINATOR =>
+          new requests.FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+
+        case ApiKeys.JOIN_GROUP =>
+          new JoinGroupRequest.Builder("test-join-group", 10000, "", "consumer",
+            List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+           .setRebalanceTimeout(60000)
+
+        case ApiKeys.HEARTBEAT =>
+          new HeartbeatRequest.Builder("test-group", 1, "")
+
+        case ApiKeys.LEAVE_GROUP =>
+          new LeaveGroupRequest.Builder("test-leave-group", "")
+
+        case ApiKeys.SYNC_GROUP =>
+          new SyncGroupRequest.Builder("test-sync-group", 1, "", Map[String, ByteBuffer]().asJava)
+
+        case ApiKeys.DESCRIBE_GROUPS =>
+          new DescribeGroupsRequest.Builder(List("test-group").asJava)
+
+        case ApiKeys.LIST_GROUPS =>
+          new ListGroupsRequest.Builder()
+
+        case ApiKeys.SASL_HANDSHAKE =>
+          new SaslHandshakeRequest.Builder("PLAIN")
+
+        case ApiKeys.API_VERSIONS =>
+          new ApiVersionsRequest.Builder
+
+        case ApiKeys.CREATE_TOPICS =>
+          new CreateTopicsRequest.Builder(Map("topic-2" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 0)
+
+        case ApiKeys.DELETE_TOPICS =>
+          new DeleteTopicsRequest.Builder(Set("topic-2").asJava, 5000)
+
+        case ApiKeys.DELETE_RECORDS =>
+          new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
+
+        case ApiKeys.INIT_PRODUCER_ID =>
+          new InitPidRequest.Builder("abc")
+
+        case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
+          new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
+
+        case ApiKeys.ADD_PARTITIONS_TO_TXN =>
+          new AddPartitionsToTxnRequest.Builder("txn1", 1, 0, List(tp).asJava)
+
+        case ApiKeys.ADD_OFFSETS_TO_TXN =>
+          new AddOffsetsToTxnRequest.Builder("txn1", 1, 0, "test-txn-group")
+
+        case ApiKeys.END_TXN =>
+          new EndTxnRequest.Builder("txn1", 1, 0, TransactionResult.forId(false))
+
+        case ApiKeys.WRITE_TXN_MARKERS =>
+          new WriteTxnMarkersRequest.Builder(0, List.empty.asJava)
+
+        case ApiKeys.TXN_OFFSET_COMMIT =>
+          new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, 3600, Map.empty.asJava)
+
+        case _ => // no builder defined above for this key: fail loudly instead of sending nothing
+          throw new IllegalArgumentException("Unsupported API key " + apiKey)
+    }
+  }
+
+  // Builds and sends one request on the socket, then parses the reply (response header skipped) into a Struct.
+  private def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
+    val key = requestBuilder.apiKey
+    val req = requestBuilder.build()
+    val wireBytes = req.serialize(new RequestHeader(key.id, req.version, clientId, correlationId)).array
+    val body = skipResponseHeader(requestAndReceive(socket, wireBytes))
+    key.parseResponse(req.version, body)
+  }
+
+  case class Client(clientId: String, apiKey: ApiKeys) {
+    var correlationId: Int = 0
+    val builder = requestBuilder(apiKey)
+    // Re-sends the request on a single connection until `until` accepts a response or 10 seconds have passed.
+    def runUntil(until: (Struct) => Boolean): Boolean = {
+      val deadlineMs = System.currentTimeMillis + 10000
+      var satisfied = false
+      val socket = connect()
+      try {
+        while (!satisfied && System.currentTimeMillis < deadlineMs) {
+          correlationId += 1
+          satisfied = until(requestResponse(socket, clientId, correlationId, builder))
+        }
+      } finally {
+        socket.close()
+      }
+      satisfied
+    }
+
+    override def toString: String = {
+      val requestTime = requestTimeMetricValue(clientId)
+      val throttleTime = throttleTimeMetricValue(clientId)
+      s"Client $clientId apiKey $apiKey requests $correlationId requestTime $requestTime throttleTime $throttleTime"
+    }
+  }
+
+  // Queues the check on the shared executor and records its future for waitAndCheckResults.
+  private def submitTest(apiKey: ApiKeys, test: () => Unit) {
+    val job = new Runnable {
+      override def run(): Unit = test()
+    }
+    val pending = executor.submit(job)
+    tasks += Task(apiKey, pending)
+  }
+
+  // Waits up to 15 seconds for each submitted task; the first failure is logged with its API key and rethrown.
+  private def waitAndCheckResults() {
+    tasks.foreach { task =>
+      try
+        task.future.get(15, TimeUnit.SECONDS)
+      catch {
+        case e: Throwable =>
+          error(s"Test failed for api-key ${task.apiKey} with exception $e")
+          throw e
+      }
+    }
+  }
+
+  private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int = { // throttle time read via the API's response class
+    apiKey match {
+      case ApiKeys.PRODUCE => new ProduceResponse(response).getThrottleTime
+      case ApiKeys.FETCH => new requests.FetchResponse(response).throttleTimeMs
+      case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
+      case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs
+      case ApiKeys.OFFSET_COMMIT => new requests.OffsetCommitResponse(response).throttleTimeMs
+      case ApiKeys.OFFSET_FETCH => new requests.OffsetFetchResponse(response).throttleTimeMs
+      case ApiKeys.FIND_COORDINATOR => new requests.FindCoordinatorResponse(response).throttleTimeMs
+      case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
+      case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
+      case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
+      case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
+      case ApiKeys.DESCRIBE_GROUPS => new DescribeGroupsResponse(response).throttleTimeMs
+      case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
+      case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
+      case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs
+      case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
+      case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
+      case ApiKeys.INIT_PRODUCER_ID => new InitPidResponse(response).throttleTimeMs
+      case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
+      case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
+      case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs
+      case ApiKeys.TXN_OFFSET_COMMIT => new TxnOffsetCommitResponse(response).throttleTimeMs
+      case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") // API keys not listed above are rejected
+    }
+  }
+
+  private def checkRequestThrottleTime(apiKey: ApiKeys) {
+    // The API name doubles as the client-id, so this client is subject to the small default quota.
+    // Keep sending until a response carries a positive throttle time.
+    val clientId = apiKey.toString
+    val client = Client(clientId, apiKey)
+    val sawThrottle = client.runUntil(struct => responseThrottleTime(apiKey, struct) > 0)
+
+    assertTrue(s"Response not throttled: $client", sawThrottle)
+    assertTrue(s"Throttle time metrics not updated: $client" , throttleTimeMetricValue(clientId) > 0)
+  }
+
+  private def checkUnthrottledClient(apiKey: ApiKeys) {
+    // With the large quota override the very first response must already be
+    // unthrottled, so exactly one request (correlation id 1) is expected.
+    val client = Client(unthrottledClientId, apiKey)
+    client.runUntil(struct => responseThrottleTime(apiKey, struct) <= 0)
+    assertEquals(1, client.correlationId)
+    assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(unthrottledClientId) <= 0.0)
+  }
+
+  private def checkExemptRequestMetric(apiKey: ApiKeys) {
+    // Baseline is sampled before any request is sent; the metric has to climb more than 0.02 above it.
+    val exemptTarget = exemptRequestMetricValue + 0.02
+    val clientId = apiKey.toString
+    val client = Client(clientId, apiKey)
+    val metricMoved = client.runUntil(_ => exemptRequestMetricValue > exemptTarget)
+    assertTrue(s"Exempt-request-time metric not updated: $client", metricMoved)
+    assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(clientId) <= 0.0)
+  }
+
+  private def checkUnauthorizedRequestThrottle(apiKey: ApiKeys) {
+    val clientId = s"unauthorized-$apiKey" // separate client-id per API key
+    val client = Client(clientId, apiKey)
+    val sawThrottle = client.runUntil(_ => throttleTimeMetricValue(clientId) > 0.0)
+    assertTrue(s"Unauthorized client should have been throttled: $client", sawThrottle)
+  }
+}
+
+object RequestQuotaTest {
+  val ClusterActions = ApiKeys.values.toSet.filter(apiKey => apiKey.clusterAction)
+  val ClientActions = ApiKeys.values.toSet -- ClusterActions - ApiKeys.SASL_HANDSHAKE
+
+  val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
+  // Principal used for all client connections. This is modified by tests which check unauthorized code path.
+  // Volatile: written by the test thread and read in buildPrincipal (presumably on broker threads - confirm).
+  @volatile var principal = KafkaPrincipal.ANONYMOUS
+  class TestAuthorizer extends SimpleAclAuthorizer {
+    override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
+      session.principal != UnauthorizedPrincipal
+    }
+  }
+  class TestPrincipalBuilder extends DefaultPrincipalBuilder {
+    override def buildPrincipal(transportLayer: TransportLayer,  authenticator: Authenticator) = {
+      principal
+    }
+  }
+}


[3/4] kafka git commit: KAFKA-4954; Request handler utilization quotas

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 3ff6aca..99e4e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class EndTxnResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:
@@ -34,15 +35,22 @@ public class EndTxnResponse extends AbstractResponse {
     //   InvalidProducerEpoch
 
     private final Errors error;
+    private final int throttleTimeMs;
 
-    public EndTxnResponse(Errors error) {
+    public EndTxnResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public EndTxnResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -50,6 +58,7 @@ public class EndTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 78715be..4c6998b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -211,7 +211,7 @@ public class FetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
@@ -220,7 +220,7 @@ public class FetchRequest extends AbstractRequest {
                 null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
-        return new FetchResponse(responseData, 0);
+        return new FetchResponse(responseData, throttleTimeMs);
     }
 
     public int replicaId() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index 46f3426..b2eaf63 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -84,12 +84,13 @@ public class FindCoordinatorRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
-            case 1:
                 return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
+            case 1:
+                return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode());
 
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index f96f123..b558b62 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 
 public class FindCoordinatorResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
     private static final String COORDINATOR_KEY_NAME = "coordinator";
@@ -42,17 +43,24 @@ public class FindCoordinatorResponse extends AbstractResponse {
     private static final String HOST_KEY_NAME = "host";
     private static final String PORT_KEY_NAME = "port";
 
+    private final int throttleTimeMs;
     private final String errorMessage;
     private final Errors error;
     private final Node node;
 
     public FindCoordinatorResponse(Errors error, Node node) {
+        this(DEFAULT_THROTTLE_TIME, error, node);
+    }
+
+    public FindCoordinatorResponse(int throttleTimeMs, Errors error, Node node) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.node = node;
         this.errorMessage = null;
     }
 
     public FindCoordinatorResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
             errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
@@ -66,6 +74,10 @@ public class FindCoordinatorResponse extends AbstractResponse {
         node = new Node(nodeId, host, port);
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -77,6 +89,8 @@ public class FindCoordinatorResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
             struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 44591a0..7e08a55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -75,11 +75,13 @@ public class HeartbeatRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new HeartbeatResponse(Errors.forException(e));
+            case 1:
+                return new HeartbeatResponse(throttleTimeMs, Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.HEARTBEAT.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 9bc400c..a90212b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 public class HeartbeatResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -37,15 +38,26 @@ public class HeartbeatResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
     private final Errors error;
+    private final int throttleTimeMs;
 
     public HeartbeatResponse(Errors error) {
+        this(DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public HeartbeatResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public HeartbeatResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -53,6 +65,8 @@ public class HeartbeatResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
index eff05e2..57d32e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -77,8 +77,8 @@ public class InitPidRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
-        return new InitPidResponse(Errors.forException(e));
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new InitPidResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static InitPidRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
index 4b65aea..3c858af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -29,27 +29,35 @@ public class InitPidResponse extends AbstractResponse {
      * OK
      *
      */
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String EPOCH_KEY_NAME = "producer_epoch";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private final int throttleTimeMs;
     private final Errors error;
     private final long producerId;
     private final short epoch;
 
-    public InitPidResponse(Errors error, long producerId, short epoch) {
+    public InitPidResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.producerId = producerId;
         this.epoch = epoch;
     }
 
     public InitPidResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
         this.epoch = struct.getShort(EPOCH_KEY_NAME);
     }
 
-    public InitPidResponse(Errors errors) {
-        this(errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    public InitPidResponse(int throttleTimeMs, Errors errors) {
+        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
     }
 
     public long producerId() {
@@ -67,6 +75,7 @@ public class InitPidResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(PRODUCER_ID_KEY_NAME, producerId);
         struct.set(EPOCH_KEY_NAME, epoch);
         struct.set(ERROR_CODE_KEY_NAME, error.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 994d9a2..1080fe7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -149,7 +149,7 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
@@ -161,6 +161,15 @@ public class JoinGroupRequest extends AbstractRequest {
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
                         Collections.<String, ByteBuffer>emptyMap());
+            case 2:
+                return new JoinGroupResponse(
+                        throttleTimeMs,
+                        Errors.forException(e),
+                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
+                        JoinGroupResponse.UNKNOWN_PROTOCOL,
+                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+                        Collections.<String, ByteBuffer>emptyMap());
 
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 1f702c7..a1c9e2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 public class JoinGroupResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -54,6 +55,7 @@ public class JoinGroupResponse extends AbstractResponse {
     public static final int UNKNOWN_GENERATION_ID = -1;
     public static final String UNKNOWN_MEMBER_ID = "";
 
+    private final int throttleTimeMs;
     private final Errors error;
     private final int generationId;
     private final String groupProtocol;
@@ -67,6 +69,17 @@ public class JoinGroupResponse extends AbstractResponse {
                              String memberId,
                              String leaderId,
                              Map<String, ByteBuffer> groupMembers) {
+        this(DEFAULT_THROTTLE_TIME, error, generationId, groupProtocol, memberId, leaderId, groupMembers);
+    }
+
+    public JoinGroupResponse(int throttleTimeMs,
+            Errors error,
+            int generationId,
+            String groupProtocol,
+            String memberId,
+            String leaderId,
+            Map<String, ByteBuffer> groupMembers) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.generationId = generationId;
         this.groupProtocol = groupProtocol;
@@ -76,6 +89,7 @@ public class JoinGroupResponse extends AbstractResponse {
     }
 
     public JoinGroupResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         members = new HashMap<>();
 
         for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
@@ -91,6 +105,10 @@ public class JoinGroupResponse extends AbstractResponse {
         leaderId = struct.getString(LEADER_ID_KEY_NAME);
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -126,6 +144,8 @@ public class JoinGroupResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.JOIN_GROUP.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(GENERATION_ID_KEY_NAME, generationId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 8843755..36426c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -179,7 +179,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, Errors> responses = new HashMap<>(partitionStates.size());
         for (TopicPartition partition : partitionStates.keySet()) {
             responses.put(partition, Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 4b5820b..76e076e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -67,11 +67,13 @@ public class LeaveGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new LeaveGroupResponse(Errors.forException(e));
+            case 1:
+                return new LeaveGroupResponse(throttleTimeMs, Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LEAVE_GROUP.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 49b704b..ccfc8a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 public class LeaveGroupResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -36,15 +37,26 @@ public class LeaveGroupResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
     private final Errors error;
+    private final int throttleTimeMs;
 
     public LeaveGroupResponse(Errors error) {
+        this(DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public LeaveGroupResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public LeaveGroupResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -52,6 +64,8 @@ public class LeaveGroupResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index cceff92..3d4f2b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -49,11 +49,13 @@ public class ListGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new ListGroupsResponse(Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
+            case 1:
+                return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LIST_GROUPS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index d0409ef..13f589f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 public class ListGroupsResponse extends AbstractResponse {
 
+    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String GROUPS_KEY_NAME = "groups";
     public static final String GROUP_ID_KEY_NAME = "group_id";
@@ -40,14 +41,21 @@ public class ListGroupsResponse extends AbstractResponse {
      */
 
     private final Errors error;
+    private final int throttleTimeMs;
     private final List<Group> groups;
 
     public ListGroupsResponse(Errors error, List<Group> groups) {
+        this(DEFAULT_THROTTLE_TIME, error, groups);
+    }
+
+    public ListGroupsResponse(int throttleTimeMs, Errors error, List<Group> groups) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.groups = groups;
     }
 
     public ListGroupsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.groups = new ArrayList<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
@@ -58,6 +66,10 @@ public class ListGroupsResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public List<Group> groups() {
         return groups;
     }
@@ -88,6 +100,8 @@ public class ListGroupsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LIST_GROUPS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         List<Struct> groupList = new ArrayList<>();
         for (Group group : groups) {
@@ -101,7 +115,11 @@ public class ListGroupsResponse extends AbstractResponse {
     }
 
     public static ListGroupsResponse fromError(Errors error) {
-        return new ListGroupsResponse(error, Collections.<Group>emptyList());
+        return fromError(DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public static ListGroupsResponse fromError(int throttleTimeMs, Errors error) {
+        return new ListGroupsResponse(throttleTimeMs, error, Collections.<Group>emptyList());
     }
 
     public static ListGroupsResponse parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 3327071..7dbffd1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -169,7 +169,7 @@ public class ListOffsetRequest extends AbstractRequest {
         super(version);
         this.replicaId = replicaId;
         this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null;
-        this.partitionTimestamps = version == 1 ? (Map<TopicPartition, Long>) targetTimes : null;
+        this.partitionTimestamps = version >= 1 ? (Map<TopicPartition, Long>) targetTimes : null;
         this.duplicatePartitions = Collections.emptySet();
     }
 
@@ -202,7 +202,7 @@ public class ListOffsetRequest extends AbstractRequest {
 
     @Override
     @SuppressWarnings("deprecation")
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
 
         short versionId = version();
@@ -224,6 +224,8 @@ public class ListOffsetRequest extends AbstractRequest {
             case 0:
             case 1:
                 return new ListOffsetResponse(responseData);
+            case 2:
+                return new ListOffsetResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LIST_OFFSETS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 3f049b4..61c2a55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -33,6 +33,7 @@ public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
@@ -105,16 +106,23 @@ public class ListOffsetResponse extends AbstractResponse {
         }
     }
 
+    private final int throttleTimeMs;
     private final Map<TopicPartition, PartitionData> responseData;
 
     /**
-     * Constructor for all versions.
+     * Constructor for all versions without throttle time
      */
     public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
+        this(DEFAULT_THROTTLE_TIME, responseData);
+    }
+
+    public ListOffsetResponse(int throttleTimeMs, Map<TopicPartition, PartitionData> responseData) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responseData = responseData;
     }
 
     public ListOffsetResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -140,6 +148,10 @@ public class ListOffsetResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
@@ -151,6 +163,8 @@ public class ListOffsetResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LIST_OFFSETS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 97072d5..3c20139 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -105,7 +105,7 @@ public class MetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
         Errors error = Errors.forException(e);
         List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
@@ -121,6 +121,8 @@ public class MetadataRequest extends AbstractRequest {
             case 1:
             case 2:
                 return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
+            case 3:
+                return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.METADATA.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 51aaa23..bd79653 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -35,6 +35,7 @@ import java.util.Set;
 
 public class MetadataResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String BROKERS_KEY_NAME = "brokers";
     private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
 
@@ -80,6 +81,7 @@ public class MetadataResponse extends AbstractResponse {
     private static final String REPLICAS_KEY_NAME = "replicas";
     private static final String ISR_KEY_NAME = "isr";
 
+    private final int throttleTimeMs;
     private final Collection<Node> brokers;
     private final Node controller;
     private final List<TopicMetadata> topicMetadata;
@@ -89,6 +91,11 @@ public class MetadataResponse extends AbstractResponse {
      * Constructor for all versions.
      */
     public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
+        this(DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
+    }
+
+    public MetadataResponse(int throttleTimeMs, List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
+        this.throttleTimeMs = throttleTimeMs;
         this.brokers = brokers;
         this.controller = getControllerNode(controllerId, brokers);
         this.topicMetadata = topicMetadata;
@@ -96,6 +103,7 @@ public class MetadataResponse extends AbstractResponse {
     }
 
     public MetadataResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         Map<Integer, Node> brokers = new HashMap<>();
         Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
         for (Object brokerStruct : brokerStructs) {
@@ -179,6 +187,10 @@ public class MetadataResponse extends AbstractResponse {
         return null;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     /**
      * Get a map of the topics which had metadata errors
      * @return the map
@@ -365,6 +377,8 @@ public class MetadataResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.METADATA.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         List<Struct> brokerArray = new ArrayList<>();
         for (Node node : brokers) {
             Struct broker = struct.instance(BROKERS_KEY_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 45975d0..4402c4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -133,6 +133,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                             DEFAULT_RETENTION_TIME, offsetData, version);
                 case 1:
                 case 2:
+                case 3:
                     long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
                     return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
                 default:
@@ -245,7 +246,7 @@ public class OffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, Errors> responseData = new HashMap<>();
         for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
             responseData.put(entry.getKey(), Errors.forException(e));
@@ -257,6 +258,8 @@ public class OffsetCommitRequest extends AbstractRequest {
             case 1:
             case 2:
                 return new OffsetCommitResponse(responseData);
+            case 3:
+                return new OffsetCommitResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_COMMIT.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index b1dae37..d8d647d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -30,6 +30,7 @@ import java.util.Map;
 
 public class OffsetCommitResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level fields
@@ -57,12 +58,19 @@ public class OffsetCommitResponse extends AbstractResponse {
      */
 
     private final Map<TopicPartition, Errors> responseData;
+    private final int throttleTimeMs;
 
     public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) {
+        this(DEFAULT_THROTTLE_TIME, responseData);
+    }
+
+    public OffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> responseData) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responseData = responseData;
     }
 
     public OffsetCommitResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -79,6 +87,8 @@ public class OffsetCommitResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();
@@ -100,6 +110,10 @@ public class OffsetCommitResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, Errors> responseData() {
         return responseData;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 71ec3f6..1d810e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -116,6 +116,10 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     public OffsetFetchResponse getErrorResponse(Errors error) {
+        return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Errors error) {
         short versionId = version();
 
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responsePartitions = new HashMap<>();
@@ -133,6 +137,8 @@ public class OffsetFetchRequest extends AbstractRequest {
             case 1:
             case 2:
                 return new OffsetFetchResponse(error, responsePartitions);
+            case 3:
+                return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_FETCH.latestVersion()));
@@ -140,8 +146,8 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     @Override
-    public OffsetFetchResponse getErrorResponse(Throwable e) {
-        return getErrorResponse(Errors.forException(e));
+    public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return getErrorResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public String groupId() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 69507f0..f795a5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 public class OffsetFetchResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
@@ -67,6 +68,7 @@ public class OffsetFetchResponse extends AbstractResponse {
 
     private final Map<TopicPartition, PartitionData> responseData;
     private final Errors error;
+    private final int throttleTimeMs;
 
     public static final class PartitionData {
         public final long offset;
@@ -85,16 +87,28 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     /**
-     * Constructor for all versions.
+     * Constructor for all versions without throttle time.
      * @param error Potential coordinator or group level error code (for api version 2 and later)
      * @param responseData Fetched offset information grouped by topic-partition
      */
     public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+        this(DEFAULT_THROTTLE_TIME, error, responseData);
+    }
+
+    /**
+     * Constructor with throttle time
+     * @param throttleTimeMs The time in milliseconds that this response was throttled
+     * @param error Potential coordinator or group level error code (for api version 2 and later)
+     * @param responseData Fetched offset information grouped by topic-partition
+     */
+    public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition, PartitionData> responseData) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responseData = responseData;
         this.error = error;
     }
 
     public OffsetFetchResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         Errors topLevelError = Errors.NONE;
         this.responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
@@ -128,6 +142,10 @@ public class OffsetFetchResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public boolean hasError() {
         return this.error != Errors.NONE;
     }
@@ -147,6 +165,8 @@ public class OffsetFetchResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.OFFSET_FETCH.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 3c285f1..f898a75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -127,7 +127,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap();
         for (TopicPartition tp : epochsByPartition.keySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 482811e..b63f6c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -224,7 +224,7 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
             return null;
@@ -241,7 +241,7 @@ public class ProduceRequest extends AbstractRequest {
             case 1:
             case 2:
             case 3:
-                return new ProduceResponse(responseMap);
+                return new ProduceResponse(responseMap, throttleTimeMs);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.PRODUCE.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 152391e..06c1f6e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -46,7 +46,6 @@ public class ProduceResponse extends AbstractResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     public static final long INVALID_OFFSET = -1L;
-    public static final int DEFAULT_THROTTLE_TIME = 0;
 
     /**
      * Possible error code:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index 56c29c5..e49b727 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -40,6 +40,29 @@ public class SaslHandshakeRequest extends AbstractRequest {
 
     private final String mechanism;
 
+    public static class Builder extends AbstractRequest.Builder<SaslHandshakeRequest> {
+        private final String mechanism;
+
+        public Builder(String mechanism) {
+            super(ApiKeys.SASL_HANDSHAKE);
+            this.mechanism = mechanism;
+        }
+
+        @Override
+        public SaslHandshakeRequest build(short version) {
+            return new SaslHandshakeRequest(mechanism);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=SaslHandshakeRequest").
+                append(", mechanism=").append(mechanism).
+                append(")");
+            return bld.toString();
+        }
+    }
+
     public SaslHandshakeRequest(String mechanism) {
         super(ApiKeys.SASL_HANDSHAKE.latestVersion());
         this.mechanism = mechanism;
@@ -55,7 +78,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 325ee06..48aa16e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -103,7 +103,7 @@ public class StopReplicaRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, Errors> responses = new HashMap<>(partitions.size());
         for (TopicPartition partition : partitions) {
             responses.put(partition, Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index dbc19ac..82df84a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -98,13 +98,18 @@ public class SyncGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new SyncGroupResponse(
                         Errors.forException(e),
                         ByteBuffer.wrap(new byte[]{}));
+            case 1:
+                return new SyncGroupResponse(
+                        throttleTimeMs,
+                        Errors.forException(e),
+                        ByteBuffer.wrap(new byte[]{}));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 4a06491..b99a99f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 public class SyncGroupResponse extends AbstractResponse {
 
+    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
 
@@ -39,18 +40,29 @@ public class SyncGroupResponse extends AbstractResponse {
      */
 
     private final Errors error;
+    private final int throttleTimeMs;
     private final ByteBuffer memberState;
 
     public SyncGroupResponse(Errors error, ByteBuffer memberState) {
+        this(DEFAULT_THROTTLE_TIME, error, memberState);
+    }
+
+    public SyncGroupResponse(int throttleTimeMs, Errors error, ByteBuffer memberState) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.memberState = memberState;
     }
 
     public SyncGroupResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -62,6 +74,8 @@ public class SyncGroupResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
         return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 584f733..cca8875 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -155,12 +155,12 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
-    public TxnOffsetCommitResponse getErrorResponse(Throwable e) {
+    public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         Map<TopicPartition, Errors> errors = new HashMap<>(offsets.size());
         for (TopicPartition partition : offsets.keySet())
             errors.put(partition, error);
-        return new TxnOffsetCommitResponse(errors);
+        return new TxnOffsetCommitResponse(throttleTimeMs, errors);
     }
 
     public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 5574aea..37b9a50 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class TxnOffsetCommitResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
     private static final String TOPIC_KEY_NAME = "topic";
@@ -43,12 +44,15 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     //   InvalidCommitOffsetSize
 
     private final Map<TopicPartition, Errors> errors;
+    private final int throttleTimeMs;
 
-    public TxnOffsetCommitResponse(Map<TopicPartition, Errors> errors) {
+    public TxnOffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
+        this.throttleTimeMs = throttleTimeMs;
         this.errors = errors;
     }
 
     public TxnOffsetCommitResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         Map<TopicPartition, Errors> errors = new HashMap<>();
         Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
         for (Object topicPartitionObj : topicPartitionsArray) {
@@ -67,6 +71,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(errors);
         Object[] partitionsArray = new Object[mappedPartitions.size()];
         int i = 0;
@@ -91,6 +96,10 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, Errors> errors() {
         return errors;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index fc7a33f..be2eaab 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -284,7 +284,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         if (versionId <= 3)
             return new UpdateMetadataResponse(Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 998d504..7cded24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -192,7 +192,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     }
 
     @Override
-    public WriteTxnMarkersResponse getErrorResponse(Throwable e) {
+    public WriteTxnMarkersResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
 
         Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 1ee9d0f..55b4fc6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -158,7 +158,8 @@ public class NetworkClientTest {
     }
 
     private void maybeSetExpectedApiVersionsResponse() {
-        ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize((short) 0, new ResponseHeader(0));
+        short apiVersionsResponseVersion = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
+        ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 934c895..2017ef9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -384,7 +384,7 @@ public class SenderTest {
             public boolean matches(AbstractRequest body) {
                 return body instanceof InitPidRequest;
             }
-        }, new InitPidResponse(Errors.NONE, producerId, (short) 0));
+        }, new InitPidResponse(0, Errors.NONE, producerId, (short) 0));
         sender.run(time.milliseconds());
         assertTrue(transactionManager.hasPid());
         assertEquals(producerId, transactionManager.pidAndEpoch().producerId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a1efa58..1ab86aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -199,7 +199,7 @@ public class TransactionManagerTest {
                 assertEquals(epoch, addOffsetsToTxnRequest.producerEpoch());
                 return true;
             }
-        }, new AddOffsetsToTxnResponse(Errors.NONE));
+        }, new AddOffsetsToTxnResponse(0, Errors.NONE));
 
         sender.run(time.milliseconds());  // Send AddOffsetsRequest
         assertTrue(transactionManager.hasPendingOffsetCommits());  // We should now have created and queued the offset commit request.
@@ -219,7 +219,7 @@ public class TransactionManagerTest {
                 assertEquals(epoch, txnOffsetCommitRequest.producerEpoch());
                 return true;
             }
-        }, new TxnOffsetCommitResponse(txnOffsetCommitResponse));
+        }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
 
         assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
@@ -528,7 +528,7 @@ public class TransactionManagerTest {
                 assertEquals(initPidRequest.transactionTimeoutMs(), transactionTimeoutMs);
                 return true;
             }
-        }, new InitPidResponse(error, pid, epoch), shouldDisconnect);
+        }, new InitPidResponse(0, error, pid, epoch), shouldDisconnect);
     }
 
     private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
@@ -563,7 +563,7 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
                 return true;
             }
-        }, new AddPartitionsToTxnResponse(error));
+        }, new AddPartitionsToTxnResponse(0, error));
     }
 
     private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
@@ -577,7 +577,7 @@ public class TransactionManagerTest {
                 assertEquals(result, endTxnRequest.command());
                 return true;
             }
-        }, new EndTxnResponse(error));
+        }, new EndTxnResponse(0, error));
     }
 
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 1de6789..345ace1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.network;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
@@ -27,6 +29,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
 import javax.net.ssl.SSLContext;
@@ -40,6 +43,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -460,6 +464,41 @@ public class SslTransportLayerTest {
         NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
     }
 
+    /**
+     * Tests that time spent on the network thread is accumulated on each channel
+     */
+    @Test
+    public void testNetworkThreadTimeRecorded() throws Exception {
+        selector.close();
+        this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
+                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder);
+
+        String node = "0";
+        server = createEchoServer(SecurityProtocol.SSL);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        String message = TestUtils.randomString(10 * 1024);
+        NetworkTestUtils.waitForChannelReady(selector, node);
+        KafkaChannel channel = selector.channel(node);
+        assertTrue("SSL handshake time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+        assertEquals("Time not reset", 0, channel.getAndResetNetworkThreadTimeNanos());
+
+        selector.mute(node);
+        selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes())));
+        while (selector.completedSends().isEmpty()) {
+            selector.poll(100L);
+        }
+        assertTrue("Send time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+        assertEquals("Time not reset", 0, channel.getAndResetNetworkThreadTimeNanos());
+
+        selector.unmute(node);
+        while (selector.completedReceives().isEmpty()) {
+            selector.poll(100L);
+        }
+        assertTrue("Receive time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+    }
+
     @Test
     public void testCloseSsl() throws Exception {
         testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index a4aeb25..8410e6a 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -16,6 +16,14 @@
  */
 package org.apache.kafka.common.protocol;
 
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.junit.Test;
 
 public class ApiKeysTest {
@@ -35,4 +43,27 @@ public class ApiKeysTest {
         ApiKeys.PRODUCE.requestSchema((short) Protocol.REQUESTS[ApiKeys.PRODUCE.id].length);
     }
 
+    /**
+     * All valid client responses which may be throttled should have a field named
+     * 'throttle_time_ms' to return the throttle time to the client. Exclusions are
+     * <ul>
+     *   <li>Cluster actions used only for inter-broker communication are throttled only if unauthorized
+     *   <li> SASL_HANDSHAKE is not throttled when used for authentication when a connection
+     *        is established. At any other time, this request returns an error response that
+     *        may be throttled.
+     * </ul>
+     */
+    @Test
+    public void testResponseThrottleTime() {
+        List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE);
+
+        for (ApiKeys apiKey: ApiKeys.values()) {
+            Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
+            Field throttleTimeField = responseSchema.get("throttle_time_ms");
+            if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
+                assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField);
+            else
+                assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9e283c0..422f9e6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -841,7 +841,7 @@ public class RequestResponseTest {
     }
 
     private InitPidResponse createInitPidResponse() {
-        return new InitPidResponse(Errors.NONE, 3332, (short) 3);
+        return new InitPidResponse(0, Errors.NONE, 3332, (short) 3);
     }
 
 
@@ -871,7 +871,7 @@ public class RequestResponseTest {
     }
 
     private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse() {
-        return new AddPartitionsToTxnResponse(Errors.NONE);
+        return new AddPartitionsToTxnResponse(0, Errors.NONE);
     }
 
     private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {
@@ -879,7 +879,7 @@ public class RequestResponseTest {
     }
 
     private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
-        return new AddOffsetsToTxnResponse(Errors.NONE);
+        return new AddOffsetsToTxnResponse(0, Errors.NONE);
     }
 
     private EndTxnRequest createEndTxnRequest() {
@@ -887,7 +887,7 @@ public class RequestResponseTest {
     }
 
     private EndTxnResponse createEndTxnResponse() {
-        return new EndTxnResponse(Errors.NONE);
+        return new EndTxnResponse(0, Errors.NONE);
     }
 
     private WriteTxnMarkersRequest createWriteTxnMarkersRequest() {
@@ -914,7 +914,7 @@ public class RequestResponseTest {
     private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
         final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
         errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);
-        return new TxnOffsetCommitResponse(errorPerPartitions);
+        return new TxnOffsetCommitResponse(0, errorPerPartitions);
     }
 
     private static class ByteBufferChannel implements GatheringByteChannel {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7bfe76a..2d41869 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,7 +41,7 @@ import scala.reflect.{ClassTag, classTag}
 
 object RequestChannel extends Logging {
   val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
-    buffer = shutdownReceive, startTimeMs = 0, listenerName = new ListenerName(""),
+    buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""),
     securityProtocol = SecurityProtocol.PLAINTEXT)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -57,14 +57,15 @@ object RequestChannel extends Logging {
   }
 
   case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
-                     startTimeMs: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
+                     startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
-    @volatile var requestDequeueTimeMs = -1L
-    @volatile var apiLocalCompleteTimeMs = -1L
-    @volatile var responseCompleteTimeMs = -1L
-    @volatile var responseDequeueTimeMs = -1L
-    @volatile var apiRemoteCompleteTimeMs = -1L
+    @volatile var requestDequeueTimeNanos = -1L
+    @volatile var apiLocalCompleteTimeNanos = -1L
+    @volatile var responseCompleteTimeNanos = -1L
+    @volatile var responseDequeueTimeNanos = -1L
+    @volatile var apiRemoteCompleteTimeNanos = -1L
+    @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val requestId = buffer.getShort()
 
@@ -122,26 +123,33 @@ object RequestChannel extends Logging {
 
     trace("Processor %d received request : %s".format(processor, requestDesc(true)))
 
-    def updateRequestMetrics() {
-      val endTimeMs = Time.SYSTEM.milliseconds
-      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
+    def requestThreadTimeNanos = {
+      if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
+      math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
+    }
+
+    def updateRequestMetrics(networkThreadTimeNanos: Long) {
+      val endTimeNanos = Time.SYSTEM.nanoseconds
+      // In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote
       // processing time is really small. This value is set in KafkaApis from a request handling thread.
       // This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
-      // see a negative value here. In that case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
-      if (apiLocalCompleteTimeMs < 0)
-        apiLocalCompleteTimeMs = responseCompleteTimeMs
-      // If the apiRemoteCompleteTimeMs is not set (i.e., for requests that do not go through a purgatory), then it is
-      // the same as responseCompleteTimeMs.
-      if (apiRemoteCompleteTimeMs < 0)
-        apiRemoteCompleteTimeMs = responseCompleteTimeMs
-
-      val requestQueueTime = math.max(requestDequeueTimeMs - startTimeMs, 0)
-      val apiLocalTime = math.max(apiLocalCompleteTimeMs - requestDequeueTimeMs, 0)
-      val apiRemoteTime = math.max(apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs, 0)
-      val apiThrottleTime = math.max(responseCompleteTimeMs - apiRemoteCompleteTimeMs, 0)
-      val responseQueueTime = math.max(responseDequeueTimeMs - responseCompleteTimeMs, 0)
-      val responseSendTime = math.max(endTimeMs - responseDequeueTimeMs, 0)
-      val totalTime = endTimeMs - startTimeMs
+      // see a negative value here. In that case, use responseCompleteTimeNanos as apiLocalCompleteTimeNanos.
+      if (apiLocalCompleteTimeNanos < 0)
+        apiLocalCompleteTimeNanos = responseCompleteTimeNanos
+      // If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is
+      // the same as responseCompleteTimeNanos.
+      if (apiRemoteCompleteTimeNanos < 0)
+        apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
+
+      def nanosToMs(nanos: Long) = math.max(TimeUnit.NANOSECONDS.toMillis(nanos), 0)
+
+      val requestQueueTime = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+      val apiLocalTime = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+      val apiRemoteTime = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+      val apiThrottleTime = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
+      val responseQueueTime = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+      val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+      val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
         if (requestId == ApiKeys.FETCH.id) {
           val isFromFollower = body[FetchRequest].isFromFollower
@@ -164,16 +172,32 @@ object RequestChannel extends Logging {
         m.totalTimeHist.update(totalTime)
       }
 
+      // Records network handler thread usage. This is included towards the request quota for the
+      // user/client. Throttling is only performed when request handler thread usage
+      // is recorded, just before responses are queued for delivery.
+      // The time recorded here is the time spent on the network thread for receiving this request
+      // and sending the response. Note that for the first request on a connection, the time includes
+      // the total time spent on authentication, which may be significant for SASL/SSL.
+      recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
+
       if (requestLogger.isDebugEnabled) {
         val detailsEnabled = requestLogger.isTraceEnabled
-        requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
-          .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
+        def nanosToMs(nanos: Long) = TimeUnit.NANOSECONDS.toMicros(math.max(nanos, 0)).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
+        val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
+        val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+        val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+        val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+        val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+        val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+        requestLogger.trace("Completed request:%s from connection %s;totalTime:%f,requestQueueTime:%f,localTime:%f,remoteTime:%f,responseQueueTime:%f,sendTime:%f,securityProtocol:%s,principal:%s,listener:%s"
+          .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
       }
     }
   }
 
   case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
-    request.responseCompleteTimeMs = Time.SYSTEM.milliseconds
+    request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
+    if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
 
     def this(request: Request, responseSend: Send) =
       this(request.processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
@@ -253,7 +277,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   def receiveResponse(processor: Int): RequestChannel.Response = {
     val response = responseQueues(processor).poll()
     if (response != null)
-      response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
+      response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
     response
   }
 


[2/4] kafka git commit: KAFKA-4954; Request handler utilization quotas

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b9bf3e4..b2a3456 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -419,6 +419,7 @@ private[kafka] class Processor(val id: Int,
     "socket-server",
     metricTags,
     false,
+    true,
     ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
 
   override def run() {
@@ -457,7 +458,7 @@ private[kafka] class Processor(val id: Int,
           case RequestChannel.NoOpAction =>
             // There is no response to send to the client, we need to read more pipelined requests
             // that are sitting in the server's socket buffer
-            curr.request.updateRequestMetrics
+            updateRequestMetrics(curr.request)
             trace("Socket server received empty response to send, registering for read: " + curr)
             val channelId = curr.request.connectionId
             if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
@@ -465,7 +466,7 @@ private[kafka] class Processor(val id: Int,
           case RequestChannel.SendAction =>
             sendResponse(curr)
           case RequestChannel.CloseConnectionAction =>
-            curr.request.updateRequestMetrics
+            updateRequestMetrics(curr.request)
             trace("Closing socket connection actively according to the response code.")
             close(selector, curr.request.connectionId)
         }
@@ -482,7 +483,7 @@ private[kafka] class Processor(val id: Int,
     // `channel` can be null if the selector closed the connection because it was idle for too long
     if (channel == null) {
       warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
-      response.request.updateRequestMetrics()
+      response.request.updateRequestMetrics(0L)
     }
     else {
       selector.send(response.responseSend)
@@ -505,14 +506,13 @@ private[kafka] class Processor(val id: Int,
     selector.completedReceives.asScala.foreach { receive =>
       try {
         val openChannel = selector.channel(receive.source)
-        val session = {
-          // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
-          val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
-          RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
-        }
+        // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
+        val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
+        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
+
         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
-          buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
-          securityProtocol = securityProtocol)
+          buffer = receive.payload, startTimeNanos = time.nanoseconds,
+          listenerName = listenerName, securityProtocol = securityProtocol)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)
       } catch {
@@ -529,17 +529,24 @@ private[kafka] class Processor(val id: Int,
       val resp = inflightResponses.remove(send.destination).getOrElse {
         throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
       }
-      resp.request.updateRequestMetrics()
+      updateRequestMetrics(resp.request)
       selector.unmute(send.destination)
     }
   }
 
+  private def updateRequestMetrics(request: RequestChannel.Request) {
+    val channel = selector.channel(request.connectionId)
+    val openOrClosingChannel = if (channel != null) channel else selector.closingChannel(request.connectionId)
+    val networkThreadTimeNanos = if (openOrClosingChannel != null) openOrClosingChannel.getAndResetNetworkThreadTimeNanos() else 0L
+    request.updateRequestMetrics(networkThreadTimeNanos)
+  }
+
   private def processDisconnected() {
     selector.disconnected.asScala.foreach { connectionId =>
       val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
         throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
       }.remoteHost
-      inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
+      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
       // the channel has been closed by the selector but the quotas still need to be updated
       connectionQuotas.dec(InetAddress.getByName(remoteHost))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 84772db..04f5239 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -31,10 +31,11 @@ import scala.collection.JavaConverters._
 
 /**
  * Represents the sensors aggregated per client
+ * @param quotaEntity Quota entity representing <client-id>, <user> or <user, client-id>
  * @param quotaSensor @Sensor that tracks the quota
  * @param throttleTimeSensor @Sensor that tracks the throttle time
  */
-private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor)
+case class ClientSensors(quotaEntity: QuotaEntity, quotaSensor: Sensor, throttleTimeSensor: Sensor)
 
 /**
  * Configuration settings for quota management
@@ -58,6 +59,8 @@ object ClientQuotaManagerConfig {
   val DefaultQuotaWindowSizeSeconds = 1
   // Purge sensors after 1 hour of inactivity
   val InactiveSensorExpirationTimeSeconds  = 3600
+  val QuotaRequestPercentDefault = Int.MaxValue.toDouble
+  val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
 
   val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
   val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
@@ -126,12 +129,12 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
  *
  * @param config @ClientQuotaManagerConfig quota configs
  * @param metrics @Metrics Metrics instance
- * @param apiKey API Key for the request
+ * @param quotaType Quota type of this quota manager
  * @param time @Time object to use
  */
-final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
-                         private val apiKey: QuotaType,
+                         private val quotaType: QuotaType,
                          private val time: Time) extends Logging {
   private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
   private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
@@ -140,19 +143,22 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess
   val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
-  throttledRequestReaper.start()
 
-  private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
+  private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
   delayQueueSensor.add(metrics.metricName("queue-size",
-                                      apiKey.toString,
+                                      quotaType.toString,
                                       "Tracks the size of the delay queue"), new Total())
+  start() // Use start method to keep findbugs happy
+  private def start() {
+    throttledRequestReaper.start()
+  }
 
   /**
    * Reaper thread that triggers callbacks on all throttled requests
    * @param delayQueue DelayQueue to dequeue from
    */
   class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
-    "ThrottledRequestReaper-%s".format(apiKey), false) {
+    "ThrottledRequestReaper-%s".format(quotaType), false) {
 
     override def doWork(): Unit = {
       val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
@@ -166,17 +172,23 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
-   * Records that a clientId changed some metric being throttled (produced/consumed bytes, QPS etc.)
-   * @param clientId clientId that produced the data
-   * @param value amount of data written in bytes
+   * Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
+   * If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
+   * Throttle time calculation may be overridden by sub-classes.
+   * @param sanitizedUser user principal of client
+   * @param clientId clientId that produced/fetched the data
+   * @param value amount of data in bytes or request processing time as a percentage
    * @param callback Callback function. This will be triggered immediately if quota is not violated.
    *                 If there is a quota violation, this callback will be triggered after a delay
    * @return Number of milliseconds to delay the response in case of Quota violation.
    *         Zero otherwise
    */
-  def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Int, callback: Int => Unit): Int = {
-    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
-    val clientSensors = getOrCreateQuotaSensors(clientQuotaEntity)
+  def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
+    val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
+    recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
+  }
+
+  def recordAndThrottleOnQuotaViolation(clientSensors: ClientSensors, value: Double, callback: Int => Unit): Int = {
     var throttleTimeMs = 0
     try {
       clientSensors.quotaSensor.record(value)
@@ -185,8 +197,9 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     } catch {
       case _: QuotaViolationException =>
         // Compute the delay
+        val clientQuotaEntity = clientSensors.quotaEntity
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
-        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
+        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -197,6 +210,15 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
+   * Records that a user/clientId changed some metric being throttled without checking for
+   * quota violation. The aggregate value will subsequently be used for throttling when the
+   * next request is processed.
+   */
+  def recordNoThrottle(clientSensors: ClientSensors, value: Double) {
+    clientSensors.quotaSensor.record(value, time.milliseconds(), false)
+  }
+
+  /**
    * Determines the quota-id for the client with the specified user principal
    * and client-id and returns the quota entity that encapsulates the quota-id
    * and the associated quota override or default quota.
@@ -325,13 +347,13 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * we need to add a delay of X to W such that O * W / (W + X) = T.
    * Solving for X, we get X = (O - T)/T * W.
    */
-  private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = {
+  protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
     val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
     val quota = config.quota()
     val difference = clientMetric.value() - quota.bound
     // Use the precise window used by the rate calculation
     val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
-    throttleTimeMs.round.toInt
+    throttleTimeMs.round
   }
 
   // Casting to Rate because we only use Rate in Quota computation
@@ -346,39 +368,54 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * This function either returns the sensors for a given client id or creates them if they don't exist
    * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
    */
-  private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = {
+  def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = {
+    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
     // Names of the sensors to access
     ClientSensors(
+      clientQuotaEntity,
       sensorAccessor.getOrCreate(
-        getQuotaSensorName(quotaEntity.quotaId),
+        getQuotaSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock, metrics,
-        () => clientRateMetricName(quotaEntity.sanitizedUser, quotaEntity.clientId),
-        () => getQuotaMetricConfig(quotaEntity.quota),
-        () => new Rate()
+        () => clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
+        () => getQuotaMetricConfig(clientQuotaEntity.quota),
+        () => measurableStat
       ),
-      sensorAccessor.getOrCreate(getThrottleTimeSensorName(quotaEntity.quotaId),
+      sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock,
         metrics,
-        () => throttleMetricName(quotaEntity),
+        () => throttleMetricName(clientQuotaEntity),
         () => null,
         () => new Avg()
       )
     )
   }
 
-  private def getThrottleTimeSensorName(quotaId: QuotaId): String = apiKey + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def measurableStat: MeasurableStat = new Rate()
+
+  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
-  private def getQuotaSensorName(quotaId: QuotaId): String = apiKey + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
-  private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+  protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
     new MetricConfig()
             .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
             .samples(config.numQuotaSamples)
             .quota(quota)
   }
 
+  protected def createSensor(sensorName: String, metricName: MetricName): Sensor = {
+    sensorAccessor.getOrCreate(
+        sensorName,
+        ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
+        lock, metrics,
+        () => metricName,
+        () => null,
+        () => measurableStat
+      )
+  }
+
   /**
    * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
    * for any of these levels.
@@ -409,7 +446,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       }
       quota match {
         case Some(newQuota) =>
-          logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
+          logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
           overriddenQuota.put(quotaId, newQuota)
           (sanitizedUser, clientId) match {
             case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
@@ -418,7 +455,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
             case (None, None) =>
           }
         case None =>
-          logger.info(s"Removing ${apiKey} quota for ${userInfo}${clientIdInfo}")
+          logger.info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
           overriddenQuota.remove(quotaId)
       }
 
@@ -460,8 +497,8 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
 
-  private def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
-    metrics.metricName("byte-rate", apiKey.toString,
+  protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+    metrics.metricName("byte-rate", quotaType.toString,
                    "Tracking byte-rate per user/client-id",
                    "user", sanitizedUser,
                    "client-id", clientId)
@@ -469,7 +506,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
 
   private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
     metrics.metricName("throttle-time",
-                       apiKey.toString,
+                       quotaType.toString,
                        "Tracking average throttle-time per user/client-id",
                        "user", quotaEntity.sanitizedUser,
                        "client-id", quotaEntity.clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
new file mode 100644
index 0000000..7e80be6
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.utils.Time
+
+
+class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
+                         private val metrics: Metrics,
+                         private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
+  val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
+  val exemptSensor = createSensor(exemptSensorName, exemptMetricName)
+
+  def recordExempt(value: Double) {
+    exemptSensor.record(value)
+  }
+
+  override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
+    math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
+  }
+
+  override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+    metrics.metricName("request-time", QuotaType.Request.toString,
+                   "Tracking request-time per user/client-id",
+                   "user", sanitizedUser,
+                   "client-id", clientId)
+  }
+
+  private def exemptMetricName: MetricName = {
+    metrics.metricName("exempt-request-time", QuotaType.Request.toString,
+                   "Tracking exempt-request-time utilization percentage")
+  }
+
+  private def exemptSensorName: String = "exempt-" + QuotaType.Request
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 8d6de8c..2483199 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -130,6 +130,12 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
       else
         None
     quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
+    val requestQuota =
+      if (config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp))
+        Some(new Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble, true))
+      else
+        None
+    quotaManagers.request.updateQuota(sanitizedUser, clientId, requestQuota)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e68f921..175bf62 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -63,19 +63,23 @@ object DynamicConfig {
     //Properties
     val ProducerByteRateOverrideProp = "producer_byte_rate"
     val ConsumerByteRateOverrideProp = "consumer_byte_rate"
+    val RequestPercentageOverrideProp = "request_percentage"
 
     //Defaults
     val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
     val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+    val DefaultRequestOverride = ClientQuotaManagerConfig.QuotaRequestPercentDefault
 
     //Documentation
     val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
     val ConsumerOverrideDoc = "A rate representing the upper bound (bytes/sec) for consumer traffic."
+    val RequestOverrideDoc = "A percentage representing the upper bound of time spent for processing requests."
 
     //Definitions
     private val clientConfigs = new ConfigDef()
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
+      .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
 
     def names = clientConfigs.names
 
@@ -88,6 +92,7 @@ object DynamicConfig {
     private val userConfigs = CredentialProvider.userCredentialConfigs
       .define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
       .define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
+      .define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
 
     def names = userConfigs.names
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 59f062d..1e1f0d5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -118,24 +118,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     } catch {
       case e: FatalExitError => throw e
-      case e: Throwable =>
-        if (request.requestObj != null) {
-          request.requestObj.handleError(e, requestChannel, request)
-          error("Error when handling request %s".format(request.requestObj), e)
-        } else {
-          val response = request.body[AbstractRequest].getErrorResponse(e)
-
-          /* If request doesn't have a default error response, we just close the connection.
-             For example, when produce request has acks set to 0 */
-          if (response == null)
-            requestChannel.closeConnection(request.processor, request)
-          else
-            requestChannel.sendResponse(new Response(request, response))
-
-          error("Error when handling request %s".format(request.body[AbstractRequest]), e)
-        }
-    } finally
-      request.apiLocalCompleteTimeMs = time.milliseconds
+      case e: Throwable => handleError(request, e)
+    } finally {
+      request.apiLocalCompleteTimeNanos = time.nanoseconds
+    }
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -165,16 +151,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      val leaderAndIsrResponse =
-        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
-          new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
-        } else {
-          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
-        }
-
-      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
+      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+        val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
+        val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
+        sendResponseExemptThrottle(request, new Response(request, leaderAndIsrResponse))
+      } else {
+        val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+        def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+        sendResponseMaybeThrottle(request, createResponse)
+      }
     } catch {
       case e: FatalExitError => throw e
       case e: KafkaStorageException =>
@@ -189,27 +174,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.body[StopReplicaRequest]
 
-    val response =
-      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
-        // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
-        // since this broker is no longer a replica for that offsets topic partition.
-        // This is required to handle the following scenario :
-        // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
-        // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
-        // is not cleared.
-        result.foreach { case (topicPartition, error) =>
-          if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
-            groupCoordinator.handleGroupEmigration(topicPartition.partition)
-          }
+    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+      val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+      // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
+      // since this broker is no longer a replica for that offsets topic partition.
+      // This is required to handle the following scenario :
+      // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
+      // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
+      // is not cleared.
+      result.foreach { case (topicPartition, error) =>
+        if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
+          groupCoordinator.handleGroupEmigration(topicPartition.partition)
         }
-        new StopReplicaResponse(error, result.asJava)
-      } else {
-        val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
       }
+      val response = new StopReplicaResponse(error, result.asJava)
+      sendResponseExemptThrottle(request, new Response(request, response))
+    } else {
+      val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
+    }
 
-    requestChannel.sendResponse(new RequestChannel.Response(request, response))
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
@@ -217,23 +202,21 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val updateMetadataRequest = request.body[UpdateMetadataRequest]
 
-    val updateMetadataResponse =
-      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
-        if (deletedPartitions.nonEmpty)
-          groupCoordinator.handleDeletedPartitions(deletedPartitions)
+    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+      val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
+      if (deletedPartitions.nonEmpty)
+        groupCoordinator.handleDeletedPartitions(deletedPartitions)
 
-        if (adminManager.hasDelayedTopicOperations) {
-          updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
-            adminManager.tryCompleteDelayedTopicOperations(topic)
-          }
+      if (adminManager.hasDelayedTopicOperations) {
+        updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
+        adminManager.tryCompleteDelayedTopicOperations(topic)
         }
-        new UpdateMetadataResponse(Errors.NONE)
-      } else {
-        new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
       }
-
-    requestChannel.sendResponse(new Response(request, updateMetadataResponse))
+      sendResponseExemptThrottle(request, new Response(request, new UpdateMetadataResponse(Errors.NONE)))
+    } else {
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
+    }
   }
 
   def handleControlledShutdownRequest(request: RequestChannel.Request) {
@@ -249,9 +232,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case Success(partitionsRemaining) =>
           val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
             Errors.NONE, partitionsRemaining)
-          requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
+          sendResponseExemptThrottle(request, new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
         case Failure(throwable) =>
-          controlledShutdownRequest.handleError(throwable, requestChannel, request)
+          sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
       }
     }
     controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
@@ -270,8 +253,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
         (topicPartition, error)
       }.toMap
-      val response = new OffsetCommitResponse(results.asJava)
-      requestChannel.sendResponse(new RequestChannel.Response(request, response))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, results.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
         case (topicPartition, _) =>
@@ -300,8 +283,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"on partition $topicPartition failed due to ${error.exceptionName}")
             }
           }
-        val response = new OffsetCommitResponse(combinedCommitStatus.asJava)
-        requestChannel.sendResponse(new RequestChannel.Response(request, response))
+        def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
+        sendResponseMaybeThrottle(request, createResponse)
       }
 
       if (authorizedTopics.isEmpty)
@@ -407,7 +390,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      def produceResponseCallback(delayTimeMs: Int) {
+      def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
         if (produceRequest.acks == 0) {
           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
           // the request, since no response is expected by the producer, the server will close socket server so that
@@ -426,13 +409,15 @@ class KafkaApis(val requestChannel: RequestChannel,
             requestChannel.noOperation(request.processor, request)
           }
         } else {
-          val respBody = new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
-          requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+          def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
+            new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
+          }
+          sendResponseMaybeThrottle(request, createResponseCallback)
         }
       }
 
       // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = time.milliseconds
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
       quotas.produce.recordAndMaybeThrottle(
         request.session.sanitizedUser,
@@ -534,20 +519,29 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = new FetchResponse(fetchedPartitionData, 0)
       val responseStruct = response.toStruct(versionId)
 
-      def fetchResponseCallback(throttleTimeMs: Int) {
-        trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
-        val responseSend = response.toSend(responseStruct, throttleTimeMs, request.connectionId, request.header)
-        requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+      def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
+        def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
+          trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
+          val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
+          new RequestChannel.Response(request, responseSend)
+        }
+        def sendResponseCallback(requestThrottleTimeMs: Int) {
+          requestChannel.sendResponse(createResponse(requestThrottleTimeMs))
+        }
+        if (fetchRequest.isFromFollower)
+          sendResponseExemptThrottle(request, createResponse(0))
+        else
+          sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
       }
 
       // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = time.milliseconds
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
       if (fetchRequest.isFromFollower) {
         // We've already evaluated against the quota and are good to go. Just need to record it now.
         val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
         quotas.leader.record(responseSize)
-        fetchResponseCallback(throttleTimeMs = 0)
+        fetchResponseCallback(bandwidthThrottleTimeMs = 0)
       } else {
         quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
           fetchResponseCallback)
@@ -597,8 +591,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       else
         handleListOffsetRequestV1(request)
 
-    val response = new ListOffsetResponse(mergedResponseMap.asJava)
-    requestChannel.sendResponse(new RequestChannel.Response(request, response))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
@@ -925,13 +919,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
       brokers.mkString(","), request.header.correlationId, request.header.clientId))
 
-    val responseBody = new MetadataResponse(
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new MetadataResponse(
+      throttleTimeMs,
       brokers.map(_.getNode(request.listenerName)).asJava,
       clusterId,
       metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
       completeTopicMetadata.asJava
     )
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   /**
@@ -944,68 +940,70 @@ class KafkaApis(val requestChannel: RequestChannel,
     def authorizeTopicDescribe(partition: TopicPartition) =
       authorize(request.session, Describe, new Resource(Topic, partition.topic))
 
-    val offsetFetchResponse =
-      // reject the request if not authorized to the group
-      if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
-        offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
-      else {
-        if (header.apiVersion == 0) {
-          val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
-            .partition(authorizeTopicDescribe)
+    def createResponse(throttleTimeMs: Int): AbstractResponse = {
+      val offsetFetchResponse =
+        // reject the request if not authorized to the group
+        if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
+          offsetFetchRequest.getErrorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+        else {
+          if (header.apiVersion == 0) {
+            val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+              .partition(authorizeTopicDescribe)
 
-          // version 0 reads offsets from ZK
-          val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
-            val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
-            try {
-              if (!metadataCache.contains(topicPartition.topic))
-                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-              else {
-                val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
-                payloadOpt match {
-                  case Some(payload) =>
-                    (topicPartition, new OffsetFetchResponse.PartitionData(
-                        payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
-                  case None =>
-                    (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+            // version 0 reads offsets from ZK
+            val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
+              val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
+              try {
+                if (!metadataCache.contains(topicPartition.topic))
+                  (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+                else {
+                  val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+                  payloadOpt match {
+                    case Some(payload) =>
+                      (topicPartition, new OffsetFetchResponse.PartitionData(
+                          payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
+                    case None =>
+                      (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+                  }
                 }
+              } catch {
+                case e: Throwable =>
+                  (topicPartition, new OffsetFetchResponse.PartitionData(
+                      OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
               }
-            } catch {
-              case e: Throwable =>
-                (topicPartition, new OffsetFetchResponse.PartitionData(
-                    OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
-            }
-          }.toMap
+            }.toMap
 
-          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
-          new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
-        } else {
-          // versions 1 and above read offsets from Kafka
-          if (offsetFetchRequest.isAllPartitions) {
-            val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
-            if (error != Errors.NONE)
-              offsetFetchRequest.getErrorResponse(error)
-            else {
-              // clients are not allowed to see offsets for topics that are not authorized for Describe
-              val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
-              new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava)
-            }
+            val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+            new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
           } else {
-            val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
-              .partition(authorizeTopicDescribe)
-            val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
-              Some(authorizedPartitions))
-            if (error != Errors.NONE)
-              offsetFetchRequest.getErrorResponse(error)
-            else {
-              val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
-              new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+            // versions 1 and above read offsets from Kafka
+            if (offsetFetchRequest.isAllPartitions) {
+              val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
+              if (error != Errors.NONE)
+                offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+              else {
+                // clients are not allowed to see offsets for topics that are not authorized for Describe
+                val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
+                new OffsetFetchResponse(throttleTimeMs, Errors.NONE, authorizedPartitionData.asJava)
+              }
+            } else {
+              val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+                .partition(authorizeTopicDescribe)
+              val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
+                Some(authorizedPartitions))
+              if (error != Errors.NONE)
+                offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+              else {
+                val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+                new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+              }
             }
           }
         }
+        trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
+        offsetFetchResponse
       }
-
-    trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-    requestChannel.sendResponse(new Response(request, offsetFetchResponse))
+      sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleFindCoordinatorRequest(request: RequestChannel.Request) {
@@ -1014,8 +1012,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
       !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
 
-      val responseBody = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       // TODO: Authorize by transactional id if coordinator type is TRANSACTION
 
@@ -1035,24 +1033,26 @@ class KafkaApis(val requestChannel: RequestChannel,
           throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
       }
 
-      val responseBody = if (topicMetadata.error != Errors.NONE) {
-        new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-      } else {
-        val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
-          .find(_.partition == partition)
-          .map(_.leader())
-
-        coordinatorEndpoint match {
-          case Some(endpoint) if !endpoint.isEmpty =>
-            new FindCoordinatorResponse(Errors.NONE, endpoint)
-          case _ =>
-            new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = if (topicMetadata.error != Errors.NONE) {
+          new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+        } else {
+          val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
+            .find(_.partition == partition)
+            .map(_.leader())
+
+          coordinatorEndpoint match {
+            case Some(endpoint) if !endpoint.isEmpty =>
+              new FindCoordinatorResponse(throttleTimeMs, Errors.NONE, endpoint)
+            case _ =>
+              new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+          }
         }
+        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+          .format(responseBody, request.header.correlationId, request.header.clientId))
+        responseBody
       }
-
-      trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-        .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      sendResponseMaybeThrottle(request, createResponse)
     }
   }
 
@@ -1074,19 +1074,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
     }.toMap
 
-    val responseBody = new DescribeGroupsResponse(groups.asJava)
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new DescribeGroupsResponse(throttleTimeMs, groups.asJava)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
-    val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
-      ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
+    if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+      def createResponse(throttleTimeMs: Int): AbstractResponse = ListGroupsResponse.fromError(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       val (error, groups) = groupCoordinator.handleListGroups()
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
-      new ListGroupsResponse(error, allGroups.asJava)
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new ListGroupsResponse(throttleTimeMs, error, allGroups.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
     }
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -1095,23 +1096,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
       val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
-      val responseBody = new JoinGroupResponse(joinResult.error, joinResult.generationId,
-        joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = new JoinGroupResponse(throttleTimeMs, joinResult.error, joinResult.generationId,
+          joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
 
-      trace("Sending join group response %s for correlation id %d to client %s."
-        .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+        trace("Sending join group response %s for correlation id %d to client %s."
+          .format(responseBody, request.header.correlationId, request.header.clientId))
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
-      val responseBody = new JoinGroupResponse(
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new JoinGroupResponse(
+        throttleTimeMs,
         Errors.GROUP_AUTHORIZATION_FAILED,
         JoinGroupResponse.UNKNOWN_GENERATION_ID,
         JoinGroupResponse.UNKNOWN_PROTOCOL,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
         Collections.emptyMap())
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       // let the coordinator to handle join-group
       val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
@@ -1133,8 +1138,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val syncGroupRequest = request.body[SyncGroupRequest]
 
     def sendResponseCallback(memberState: Array[Byte], error: Errors) {
-      val responseBody = new SyncGroupResponse(error, ByteBuffer.wrap(memberState))
-      requestChannel.sendResponse(new Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new SyncGroupResponse(throttleTimeMs, error, ByteBuffer.wrap(memberState))
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
@@ -1155,15 +1160,18 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a heartbeat response
     def sendResponseCallback(error: Errors) {
-      val response = new HeartbeatResponse(error)
-      trace("Sending heartbeat response %s for correlation id %d to client %s."
-        .format(response, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, response))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val response = new HeartbeatResponse(throttleTimeMs, error)
+        trace("Sending heartbeat response %s for correlation id %d to client %s."
+          .format(response, request.header.correlationId, request.header.clientId))
+        response
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
-      val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED)
-      requestChannel.sendResponse(new Response(request, heartbeatResponse))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new HeartbeatResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
     }
     else {
       // let the coordinator to handle heartbeat
@@ -1180,15 +1188,18 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a leave-group response
     def sendResponseCallback(error: Errors) {
-      val response = new LeaveGroupResponse(error)
-      trace("Sending leave group response %s for correlation id %d to client %s."
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val response = new LeaveGroupResponse(throttleTimeMs, error)
+        trace("Sending leave group response %s for correlation id %d to client %s."
                     .format(response, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, response))
+        response
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
-      val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED)
-      requestChannel.sendResponse(new Response(request, leaveGroupResponse))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaveGroupResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+      sendResponseMaybeThrottle(request, createResponse)
     } else {
       // let the coordinator to handle leave-group
       groupCoordinator.handleLeaveGroup(
@@ -1199,8 +1210,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
-    requestChannel.sendResponse(new RequestChannel.Response(request, response))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -1210,20 +1221,26 @@ class KafkaApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    val responseSend =
-      if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
-        ApiVersionsResponse.API_VERSIONS_RESPONSE.toSend(request.connectionId, request.header)
-      else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+    def sendResponseCallback(throttleTimeMs: Int) {
+      val responseSend =
+        if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
+          ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, throttleTimeMs).toSend(request.connectionId, request.header)
+        else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+    }
+    sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request) {
     val createTopicsRequest = request.body[CreateTopicsRequest]
 
     def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
-      val responseBody = new CreateTopicsResponse(results.asJava)
-      trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava)
+        trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!controller.isActive) {
@@ -1279,11 +1296,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
-      val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
-          unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
-      val responseBody = new DeleteTopicsResponse(completeResults.asJava)
-      trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
+            unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
+        val responseBody = new DeleteTopicsResponse(throttleTimeMs, completeResults.asJava)
+        trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (!controller.isActive) {
@@ -1335,11 +1355,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
-      requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
-
-      // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = time.milliseconds
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new DeleteRecordsResponse(throttleTimeMs, mergedResponseStatus.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     if (authorizedForDeleteTopics.isEmpty)
@@ -1359,9 +1376,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // Send response callback
     def sendResponseCallback(result: InitPidResult): Unit = {
-      val responseBody: InitPidResponse = new InitPidResponse(result.error, result.pid, result.epoch)
-      trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch)
+        trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
     txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
   }
@@ -1370,9 +1390,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val endTxnRequest = request.body[EndTxnRequest]
 
     def sendResponseCallback(error: Errors) {
-      val responseBody = new EndTxnResponse(error)
-      trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody = new EndTxnResponse(throttleTimeMs, error)
+        trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId(),
@@ -1383,11 +1406,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
+    authorizeClusterAction(request)
     val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
-    requestChannel.sendResponse(new RequestChannel.Response(request, new WriteTxnMarkersResponse(emptyResponse)))
+    val responseBody = new WriteTxnMarkersResponse(emptyResponse)
+    sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
   }
 
-
   def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
@@ -1395,9 +1419,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // Send response callback
     def sendResponseCallback(error: Errors): Unit = {
-      val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(error)
-      trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, error)
+        trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1415,9 +1442,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // Send response callback
     def sendResponseCallback(error: Errors): Unit = {
-      val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(error)
-      trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
+        trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
     }
 
     txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1429,7 +1459,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
     val emptyResponse = new java.util.HashMap[TopicPartition, Errors]()
-    requestChannel.sendResponse(new RequestChannel.Response(request, new TxnOffsetCommitResponse(emptyResponse)))
+    def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, emptyResponse)
+    sendResponseMaybeThrottle(request, createResponse)
   }
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
@@ -1440,11 +1471,93 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseBody = new OffsetsForLeaderEpochResponse(
       replicaManager.getResponseFor(requestInfo)
     )
-    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+    sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
+  }
+
+  private def handleError(request: RequestChannel.Request, e: Throwable) {
+    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
+    if (request.requestObj != null) {
+      def sendResponseCallback(throttleTimeMs: Int) {
+        request.requestObj.handleError(e, requestChannel, request)
+        error("Error when handling request %s".format(request.requestObj), e)
+      }
+
+      if (mayThrottle) {
+        val clientId : String =
+          if (request.requestObj.isInstanceOf[ControlledShutdownRequest])
+            request.requestObj.asInstanceOf[ControlledShutdownRequest].clientId.getOrElse("")
+          else
+            throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest")
+        sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
+      } else
+        sendResponseExemptThrottle(request, () => sendResponseCallback(0))
+    } else {
+      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+        val response = request.body[AbstractRequest].getErrorResponse(throttleTimeMs, e)
+
+        /* If request doesn't have a default error response, we just close the connection.
+           For example, when produce request has acks set to 0 */
+        if (response == null)
+          requestChannel.closeConnection(request.processor, request)
+        response
+      }
+      error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+      if (mayThrottle)
+        sendResponseMaybeThrottle(request, createResponse)
+      else
+        sendResponseExemptThrottle(request, new RequestChannel.Response(request, createResponse(0)))
+    }
   }
 
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
+
+  private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
+    def sendResponseCallback(throttleTimeMs: Int) {
+      val response = createResponse(throttleTimeMs)
+      if (response != null)
+        sendResponse(request, response)
+    }
+    sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+  }
+
+  private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) {
+
+    if (request.apiRemoteCompleteTimeNanos == -1) {
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
+    }
+    val quotaSensors = quotas.request.getOrCreateQuotaSensors(request.session.sanitizedUser, clientId)
+    def recordNetworkThreadTimeNanos(timeNanos: Long) {
+      quotas.request.recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))
+    }
+    request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+    quotas.request.recordAndThrottleOnQuotaViolation(
+        quotaSensors,
+        nanosToPercentage(request.requestThreadTimeNanos),
+        sendResponseCallback)
+  }
+
+  private def sendResponseExemptThrottle(request: RequestChannel.Request, response: Response) {
+    sendResponseExemptThrottle(request, () => requestChannel.sendResponse(response))
+  }
+
+  private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
+    def recordNetworkThreadTimeNanos(timeNanos: Long) {
+      quotas.request.recordExempt(nanosToPercentage(timeNanos))
+    }
+    request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+    quotas.request.recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
+    sendResponseCallback()
+  }
+
+  private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
+    requestChannel.sendResponse(new Response(request, response))
+  }
+
+  private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index c9c31ad..a1600cb 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -50,7 +50,10 @@ class KafkaRequestHandler(id: Int,
           // time should be discounted by # threads.
           val startSelectTime = time.nanoseconds
           req = requestChannel.receiveRequest(300)
-          val idleTime = time.nanoseconds - startSelectTime
+          val endTime = time.nanoseconds
+          if (req != null)
+            req.requestDequeueTimeNanos = endTime
+          val idleTime = endTime - startSelectTime
           aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
         }
 
@@ -59,7 +62,6 @@ class KafkaRequestHandler(id: Int,
           latch.countDown()
           return
         }
-        req.requestDequeueTimeMs = time.milliseconds
         trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 671ad63..dee39a3 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time
 object QuotaType  {
   case object Fetch extends QuotaType
   case object Produce extends QuotaType
+  case object Request extends QuotaType
   case object LeaderReplication extends QuotaType
   case object FollowerReplication extends QuotaType
 }
@@ -36,10 +37,11 @@ object QuotaFactory {
     override def isQuotaExceeded(): Boolean = false
   }
 
-  case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
+  case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, request: ClientRequestQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
     def shutdown() {
       fetch.shutdown
       produce.shutdown
+      request.shutdown
     }
   }
 
@@ -47,6 +49,7 @@ object QuotaFactory {
     QuotaManagers(
       new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time),
       new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
+      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time)
     )
@@ -66,6 +69,13 @@ object QuotaFactory {
       quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
     )
 
+  def clientRequestConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
+    ClientQuotaManagerConfig(
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+  }
+
   def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig =
     ReplicationQuotaManagerConfig(
       numQuotaSamples = cfg.numReplicationQuotaSamples,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index d9822cd..ff9fef0 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -206,7 +206,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
       val hostStr = s"${node.host}:${node.port}"
       assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
       val brokerVersionInfo = tryBrokerVersionInfo.get
-      assertEquals(0, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index aa1717a..f21c1df 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -26,13 +26,15 @@ import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import kafka.server.QuotaType
+import org.apache.kafka.common.metrics.KafkaMetric
 
 abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   def userPrincipal : String
   def producerQuotaId : QuotaId
   def consumerQuotaId : QuotaId
-  def overrideQuotas(producerQuota: Long, consumerQuota: Long)
+  def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
   def removeQuotaOverrides()
 
   override val serverCount = 2
@@ -55,10 +57,13 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+  this.consumerConfig.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "0")
+  this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
 
   // Low enough quota that a producer sending a small payload in a tight loop should get throttled
   val defaultProducerQuota = 8000
   val defaultConsumerQuota = 2500
+  val defaultRequestQuota = Int.MaxValue
 
   var leaderNode: KafkaServer = null
   var followerNode: KafkaServer = null
@@ -99,8 +104,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, Long.MaxValue.toString)
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, Long.MaxValue.toString)
 
-    overrideQuotas(Long.MaxValue, Long.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
     assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -114,8 +119,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   @Test
   def testQuotaOverrideDelete() {
     // Override producer and consumer quotas to unlimited
-    overrideQuotas(Long.MaxValue, Long.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
     assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -136,6 +141,28 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
   }
 
+  @Test
+  def testThrottledRequest() {
+
+    overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
+
+    val consumer = consumers.head
+    consumer.subscribe(Collections.singleton(topic1))
+    val endTimeMs = System.currentTimeMillis + 10000
+    var throttled = false
+    while (!throttled && System.currentTimeMillis < endTimeMs) {
+      consumer.poll(100)
+      val throttleMetric = consumerRequestThrottleMetric
+      throttled = throttleMetric != null && throttleMetric.value > 0
+    }
+
+    assertTrue("Should have been throttled", throttled)
+
+    assertNotNull("Exempt requests not recorded", exemptRequestMetric)
+    assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
+  }
+
   def produceUntilThrottled(p: KafkaProducer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
     var numProduced = 0
     var throttled = false
@@ -169,31 +196,47 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     numConsumed
   }
 
-  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) {
+  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
     TestUtils.retry(10000) {
       val quotaManagers = leaderNode.apis.quotas
       val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId)
       val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId)
+      val overrideProducerRequestQuota = quotaManagers.request.quota(userPrincipal, producerClientId)
+      val overrideConsumerRequestQuota = quotaManagers.request.quota(userPrincipal, consumerClientId)
 
       assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
       assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
     }
   }
 
-  private def throttleMetricName(apiKey: ApiKeys, quotaId: QuotaId): MetricName = {
+  private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
     leaderNode.metrics.metricName("throttle-time",
-                                  apiKey.name,
+                                  quotaType.toString,
                                   "Tracking throttle-time per user/client-id",
                                   "user", quotaId.sanitizedUser.getOrElse(""),
                                   "client-id", quotaId.clientId.getOrElse(""))
   }
-  private def producerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId))
-  private def consumerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.FETCH, consumerQuotaId))
 
-  def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
+  def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {
+    leaderNode.metrics.metrics.get(throttleMetricName(quotaType, quotaId))
+  }
+
+  private def producerThrottleMetric = throttleMetric(QuotaType.Produce, producerQuotaId)
+  private def consumerThrottleMetric = throttleMetric(QuotaType.Fetch, consumerQuotaId)
+  private def consumerRequestThrottleMetric = throttleMetric(QuotaType.Request, consumerQuotaId)
+
+  private def exemptRequestMetric: KafkaMetric = {
+    val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
+    leaderNode.metrics.metrics.get(metricName)
+  }
+
+  def quotaProperties(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Properties = {
     val props = new Properties()
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+    props.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     props
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index d71713f..f8594e1 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -33,14 +33,15 @@ class ClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
     super.setUp()
   }
-
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
     val producerProps = new Properties()
     producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+    producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(producerClientId, producerProps)
 
     val consumerProps = new Properties()
     consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+    consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(consumerClientId, consumerProps)
   }
   override def removeQuotaOverrides() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 82b109d..333c851 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -39,18 +39,20 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
-    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
     AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
-    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
     val producerProps = new Properties()
     producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+    producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(userPrincipal, producerClientId, producerProps)
 
     val consumerProps = new Properties()
     consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+    consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     updateQuotaOverride(userPrincipal, consumerClientId, consumerProps)
   }
 


[4/4] kafka git commit: KAFKA-4954; Request handler utilization quotas

Posted by ju...@apache.org.
KAFKA-4954; Request handler utilization quotas

See KIP-124 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+Request+rate+quotas) for details

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2744 from rajinisivaram/KAFKA-4954


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0104b657
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0104b657
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0104b657

Branch: refs/heads/trunk
Commit: 0104b657a154fb15e716d872a0e6084f9da650bf
Parents: 6185bc0
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon May 1 09:13:31 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon May 1 09:13:31 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  30 +-
 .../org/apache/kafka/common/metrics/Sensor.java |   9 +-
 .../kafka/common/network/KafkaChannel.java      |  22 +-
 .../apache/kafka/common/network/Selector.java   |  23 +
 .../apache/kafka/common/protocol/ApiKeys.java   |  86 +--
 .../apache/kafka/common/protocol/Protocol.java  | 212 ++++++--
 .../kafka/common/requests/AbstractRequest.java  |   9 +-
 .../kafka/common/requests/AbstractResponse.java |   1 +
 .../common/requests/AddOffsetsToTxnRequest.java |   4 +-
 .../requests/AddOffsetsToTxnResponse.java       |  11 +-
 .../requests/AddPartitionsToTxnRequest.java     |   4 +-
 .../requests/AddPartitionsToTxnResponse.java    |  11 +-
 .../common/requests/ApiVersionsRequest.java     |   8 +-
 .../common/requests/ApiVersionsResponse.java    |  31 +-
 .../requests/ControlledShutdownRequest.java     |   2 +-
 .../common/requests/CreateTopicsRequest.java    |   4 +-
 .../common/requests/CreateTopicsResponse.java   |  14 +
 .../common/requests/DeleteRecordsRequest.java   |   4 +-
 .../common/requests/DeleteRecordsResponse.java  |  12 +-
 .../common/requests/DeleteTopicsRequest.java    |   4 +-
 .../common/requests/DeleteTopicsResponse.java   |  14 +
 .../common/requests/DescribeGroupsRequest.java  |   4 +-
 .../common/requests/DescribeGroupsResponse.java |  20 +-
 .../kafka/common/requests/EndTxnRequest.java    |   4 +-
 .../kafka/common/requests/EndTxnResponse.java   |  11 +-
 .../kafka/common/requests/FetchRequest.java     |   4 +-
 .../common/requests/FindCoordinatorRequest.java |   5 +-
 .../requests/FindCoordinatorResponse.java       |  14 +
 .../kafka/common/requests/HeartbeatRequest.java |   4 +-
 .../common/requests/HeartbeatResponse.java      |  14 +
 .../kafka/common/requests/InitPidRequest.java   |   4 +-
 .../kafka/common/requests/InitPidResponse.java  |  15 +-
 .../kafka/common/requests/JoinGroupRequest.java |  11 +-
 .../common/requests/JoinGroupResponse.java      |  20 +
 .../common/requests/LeaderAndIsrRequest.java    |   2 +-
 .../common/requests/LeaveGroupRequest.java      |   4 +-
 .../common/requests/LeaveGroupResponse.java     |  14 +
 .../common/requests/ListGroupsRequest.java      |   4 +-
 .../common/requests/ListGroupsResponse.java     |  20 +-
 .../common/requests/ListOffsetRequest.java      |   6 +-
 .../common/requests/ListOffsetResponse.java     |  16 +-
 .../kafka/common/requests/MetadataRequest.java  |   4 +-
 .../kafka/common/requests/MetadataResponse.java |  14 +
 .../common/requests/OffsetCommitRequest.java    |   5 +-
 .../common/requests/OffsetCommitResponse.java   |  14 +
 .../common/requests/OffsetFetchRequest.java     |  10 +-
 .../common/requests/OffsetFetchResponse.java    |  22 +-
 .../requests/OffsetsForLeaderEpochRequest.java  |   2 +-
 .../kafka/common/requests/ProduceRequest.java   |   4 +-
 .../kafka/common/requests/ProduceResponse.java  |   1 -
 .../common/requests/SaslHandshakeRequest.java   |  25 +-
 .../common/requests/StopReplicaRequest.java     |   2 +-
 .../kafka/common/requests/SyncGroupRequest.java |   7 +-
 .../common/requests/SyncGroupResponse.java      |  14 +
 .../common/requests/TxnOffsetCommitRequest.java |   4 +-
 .../requests/TxnOffsetCommitResponse.java       |  11 +-
 .../common/requests/UpdateMetadataRequest.java  |   2 +-
 .../common/requests/WriteTxnMarkersRequest.java |   2 +-
 .../apache/kafka/clients/NetworkClientTest.java |   3 +-
 .../clients/producer/internals/SenderTest.java  |   2 +-
 .../internals/TransactionManagerTest.java       |  10 +-
 .../common/network/SslTransportLayerTest.java   |  39 ++
 .../kafka/common/protocol/ApiKeysTest.java      |  31 ++
 .../common/requests/RequestResponseTest.java    |  10 +-
 .../scala/kafka/network/RequestChannel.scala    |  82 ++-
 .../main/scala/kafka/network/SocketServer.scala |  31 +-
 .../scala/kafka/server/ClientQuotaManager.scala | 101 ++--
 .../server/ClientRequestQuotaManager.scala      |  54 ++
 .../main/scala/kafka/server/ConfigHandler.scala |   6 +
 .../main/scala/kafka/server/DynamicConfig.scala |   5 +
 .../src/main/scala/kafka/server/KafkaApis.scala | 539 +++++++++++--------
 .../kafka/server/KafkaRequestHandler.scala      |   6 +-
 .../main/scala/kafka/server/QuotaFactory.scala  |  12 +-
 .../integration/kafka/api/AdminClientTest.scala |   2 +-
 .../integration/kafka/api/BaseQuotaTest.scala   |  65 ++-
 .../kafka/api/ClientIdQuotaTest.scala           |   5 +-
 .../kafka/api/UserClientIdQuotaTest.scala       |   8 +-
 .../integration/kafka/api/UserQuotaTest.scala   |   8 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |   6 +-
 .../unit/kafka/server/BaseRequestTest.scala     |   2 +-
 .../kafka/server/ClientQuotaManagerTest.scala   |  62 +++
 .../unit/kafka/server/RequestQuotaTest.scala    | 420 +++++++++++++++
 82 files changed, 1879 insertions(+), 484 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7bd0311..df9e2fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -42,13 +42,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -100,7 +99,7 @@ public class NetworkClient implements KafkaClient {
 
     private final ApiVersions apiVersions;
 
-    private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>();
+    private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
 
     private final List<ClientResponse> abortedSends = new LinkedList<>();
 
@@ -471,7 +470,7 @@ public class NetworkClient implements KafkaClient {
         ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
         // Always expect the response version id to be the same as the request version id
         ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
-        Struct responseBody = apiKey.responseSchema(requestHeader.apiVersion()).read(responseBuffer);
+        Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer);
         correlate(requestHeader, responseHeader);
         return AbstractResponse.getResponse(apiKey, responseBody);
     }
@@ -564,10 +563,14 @@ public class NetworkClient implements KafkaClient {
                                            InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
         final String node = req.destination;
         if (apiVersionsResponse.error() != Errors.NONE) {
-            log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.",
-                    node, apiVersionsResponse.error());
-            this.selector.close(node);
-            processDisconnection(responses, node, now);
+            if (req.request.version() == 0 || apiVersionsResponse.error() != Errors.UNSUPPORTED_VERSION) {
+                log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.",
+                        node, apiVersionsResponse.error());
+                this.selector.close(node);
+                processDisconnection(responses, node, now);
+            } else {
+                nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0));
+            }
             return;
         }
         NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
@@ -605,7 +608,7 @@ public class NetworkClient implements KafkaClient {
             // connection.
             if (discoverBrokerVersions) {
                 this.connectionStates.checkingApiVersions(node);
-                nodesNeedingApiVersionsFetch.add(node);
+                nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
                 log.debug("Completed connection to node {}.  Fetching API versions.", node);
             } else {
                 this.connectionStates.ready(node);
@@ -615,13 +618,14 @@ public class NetworkClient implements KafkaClient {
     }
 
     private void handleInitiateApiVersionRequests(long now) {
-        Iterator<String> iter = nodesNeedingApiVersionsFetch.iterator();
+        Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator();
         while (iter.hasNext()) {
-            String node = iter.next();
+            Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
+            String node = entry.getKey();
             if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
                 log.debug("Initiating API versions fetch from node {}.", node);
-                ApiVersionsRequest.Builder apiVersionRequest = new ApiVersionsRequest.Builder();
-                ClientRequest clientRequest = newClientRequest(node, apiVersionRequest, now, true);
+                ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
+                ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
                 doSend(clientRequest, true, now);
                 iter.remove();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5ca9fce..ae331e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -168,16 +168,21 @@ public final class Sensor {
      *         bound
      */
     public void record(double value, long timeMs) {
+        record(value, timeMs, true);
+    }
+
+    public void record(double value, long timeMs, boolean checkQuotas) {
         if (shouldRecord()) {
             this.lastRecordTime = timeMs;
             synchronized (this) {
                 // increment all the stats
                 for (Stat stat : this.stats)
                     stat.record(config, value, timeMs);
-                checkQuotas(timeMs);
+                if (checkQuotas)
+                    checkQuotas(timeMs);
             }
             for (Sensor parent : parents)
-                parent.record(value, timeMs);
+                parent.record(value, timeMs, checkQuotas);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index f1bf86c..ea03ff0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -31,6 +31,9 @@ public class KafkaChannel {
     private final String id;
     private final TransportLayer transportLayer;
     private final Authenticator authenticator;
+    // Tracks accumulated network thread time. This is updated on the network thread.
+    // The values are read and reset after each response is sent.
+    private long networkThreadTimeNanos;
     private final int maxReceiveSize;
     private NetworkReceive receive;
     private Send send;
@@ -43,6 +46,7 @@ public class KafkaChannel {
         this.id = id;
         this.transportLayer = transportLayer;
         this.authenticator = authenticator;
+        this.networkThreadTimeNanos = 0L;
         this.maxReceiveSize = maxReceiveSize;
         this.disconnected = false;
         this.muted = false;
@@ -164,6 +168,23 @@ public class KafkaChannel {
         return result;
     }
 
+    /**
+     * Accumulates network thread time for this channel.
+     */
+    public void addNetworkThreadTimeNanos(long nanos) {
+        networkThreadTimeNanos += nanos;
+    }
+
+    /**
+     * Returns accumulated network thread time for this channel and resets
+     * the value to zero.
+     */
+    public long getAndResetNetworkThreadTimeNanos() {
+        long current = networkThreadTimeNanos;
+        networkThreadTimeNanos = 0;
+        return current;
+    }
+
     private long receive(NetworkReceive receive) throws IOException {
         return receive.readFrom(transportLayer);
     }
@@ -175,5 +196,4 @@ public class KafkaChannel {
 
         return send.completed();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index fd3ab47..8dd3ad6 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -101,6 +101,7 @@ public class Selector implements Selectable {
     private final ChannelBuilder channelBuilder;
     private final int maxReceiveSize;
     private final boolean metricsPerConnection;
+    private final boolean recordTimePerConnection;
     private final IdleExpiryManager idleExpiryManager;
 
     /**
@@ -122,6 +123,7 @@ public class Selector implements Selectable {
                     String metricGrpPrefix,
                     Map<String, String> metricTags,
                     boolean metricsPerConnection,
+                    boolean recordTimePerConnection,
                     ChannelBuilder channelBuilder) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
@@ -144,9 +146,21 @@ public class Selector implements Selectable {
         this.sensors = new SelectorMetrics(metrics);
         this.channelBuilder = channelBuilder;
         this.metricsPerConnection = metricsPerConnection;
+        this.recordTimePerConnection = recordTimePerConnection;
         this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
     }
 
+    public Selector(int maxReceiveSize,
+            long connectionMaxIdleMs,
+            Metrics metrics,
+            Time time,
+            String metricGrpPrefix,
+            Map<String, String> metricTags,
+            boolean metricsPerConnection,
+            ChannelBuilder channelBuilder) {
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder);
+    }
+
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
         this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
     }
@@ -326,6 +340,7 @@ public class Selector implements Selectable {
             SelectionKey key = iterator.next();
             iterator.remove();
             KafkaChannel channel = channel(key);
+            long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
 
             // register all per-connection metrics at once
             sensors.maybeRegisterConnectionMetrics(channel.id());
@@ -380,10 +395,18 @@ public class Selector implements Selectable {
                 else
                     log.warn("Unexpected error from {}; closing connection", desc, e);
                 close(channel, true);
+            } finally {
+                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
         }
     }
 
+    // Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy)
+    private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
+        if (recordTimePerConnection)
+            channel.addNetworkThreadTimeNanos(time.nanoseconds() - startTimeNanos);
+    }
+
     @Override
     public List<Send> completedSends() {
         return this.completedSends;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 9e7ce1d..b98a33e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -25,35 +26,43 @@ import java.nio.ByteBuffer;
  * Identifiers for all the Kafka APIs
  */
 public enum ApiKeys {
-    PRODUCE(0, "Produce"),
-    FETCH(1, "Fetch"),
-    LIST_OFFSETS(2, "Offsets"),
-    METADATA(3, "Metadata"),
-    LEADER_AND_ISR(4, "LeaderAndIsr"),
-    STOP_REPLICA(5, "StopReplica"),
-    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
-    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
-    OFFSET_COMMIT(8, "OffsetCommit"),
-    OFFSET_FETCH(9, "OffsetFetch"),
-    FIND_COORDINATOR(10, "FindCoordinator"),
-    JOIN_GROUP(11, "JoinGroup"),
-    HEARTBEAT(12, "Heartbeat"),
-    LEAVE_GROUP(13, "LeaveGroup"),
-    SYNC_GROUP(14, "SyncGroup"),
-    DESCRIBE_GROUPS(15, "DescribeGroups"),
-    LIST_GROUPS(16, "ListGroups"),
-    SASL_HANDSHAKE(17, "SaslHandshake"),
-    API_VERSIONS(18, "ApiVersions"),
-    CREATE_TOPICS(19, "CreateTopics"),
-    DELETE_TOPICS(20, "DeleteTopics"),
-    DELETE_RECORDS(21, "DeleteRecords"),
-    INIT_PRODUCER_ID(22, "InitProducerId"),
-    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch"),
-    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn"),
-    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn"),
-    END_TXN(26, "EndTxn"),
-    WRITE_TXN_MARKERS(27, "WriteTxnMarkers"),
-    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit");
+    PRODUCE(0, "Produce", false),
+    FETCH(1, "Fetch", false),
+    LIST_OFFSETS(2, "Offsets", false),
+    METADATA(3, "Metadata", false),
+    LEADER_AND_ISR(4, "LeaderAndIsr", true),
+    STOP_REPLICA(5, "StopReplica", true),
+    UPDATE_METADATA_KEY(6, "UpdateMetadata", true),
+    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true),
+    OFFSET_COMMIT(8, "OffsetCommit", false),
+    OFFSET_FETCH(9, "OffsetFetch", false),
+    FIND_COORDINATOR(10, "FindCoordinator", false),
+    JOIN_GROUP(11, "JoinGroup", false),
+    HEARTBEAT(12, "Heartbeat", false),
+    LEAVE_GROUP(13, "LeaveGroup", false),
+    SYNC_GROUP(14, "SyncGroup", false),
+    DESCRIBE_GROUPS(15, "DescribeGroups", false),
+    LIST_GROUPS(16, "ListGroups", false),
+    SASL_HANDSHAKE(17, "SaslHandshake", false),
+    API_VERSIONS(18, "ApiVersions", false) {
+        @Override
+        public Struct parseResponse(short version, ByteBuffer buffer) {
+            // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
+            // using a version higher than that supported by the broker, a version 0 response is sent
+            // to the client indicating UNSUPPORTED_VERSION.
+            return parseResponse(version, buffer, (short) 0);
+        }
+    },
+    CREATE_TOPICS(19, "CreateTopics", false),
+    DELETE_TOPICS(20, "DeleteTopics", false),
+    DELETE_RECORDS(21, "DeleteRecords", false),
+    INIT_PRODUCER_ID(22, "InitProducerId", false),
+    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true),
+    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false),
+    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false),
+    END_TXN(26, "EndTxn", false),
+    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true),
+    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
@@ -76,11 +85,15 @@ public enum ApiKeys {
     /** an english description of the api--this is for debugging and can change */
     public final String name;
 
-    ApiKeys(int id, String name) {
+    /** indicates if this is a ClusterAction request used only by brokers */
+    public final boolean clusterAction;
+
+    ApiKeys(int id, String name, boolean clusterAction) {
         if (id < 0)
             throw new IllegalArgumentException("id must not be negative, id: " + id);
         this.id = (short) id;
         this.name = name;
+        this.clusterAction = clusterAction;
     }
 
     public static ApiKeys forId(int id) {
@@ -122,6 +135,19 @@ public enum ApiKeys {
         return responseSchema(version).read(buffer);
     }
 
+    protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
+        int bufferPosition = buffer.position();
+        try {
+            return responseSchema(version).read(buffer);
+        } catch (SchemaException e) {
+            if (version != fallbackVersion) {
+                buffer.position(bufferPosition);
+                return responseSchema(fallbackVersion).read(buffer);
+            } else
+                throw e;
+        }
+    }
+
     private Schema schemaFor(Schema[][] schemas, short version) {
         if (id > schemas.length)
             throw new IllegalArgumentException("No schema available for API key " + this);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 54f533e..3da2b3f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -65,6 +65,9 @@ public class Protocol {
     /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */
     public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
 
+    /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
+    public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
+
     public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
                                                    new Field("host", STRING, "The hostname of the broker."),
                                                    new Field("port", INT32,
@@ -129,9 +132,19 @@ public class Protocol {
                                                                      "The broker id of the controller broker."),
                                                                  new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
 
+    public static final Schema METADATA_RESPONSE_V3 = new Schema(
+         newThrottleTimeField(),
+         new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
+            "Host and port information for all brokers."),
+         new Field("cluster_id", NULLABLE_STRING,
+             "The cluster id that this broker belongs to."),
+         new Field("controller_id", INT32,
+             "The broker id of the controller broker."),
+         new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
+
 
-    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2};
-    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2};
+    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
+    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
 
     /* Produce api */
 
@@ -190,11 +203,7 @@ public class Protocol {
                                                                                                                                             INT16),
                                                                                                                                   new Field("base_offset",
                                                                                                                                             INT64))))))),
-                                                                new Field("throttle_time_ms",
-                                                                          INT32,
-                                                                          "Duration in milliseconds for which the request was throttled" +
-                                                                              " due to quota violation. (Zero if the request did not violate any quota.)",
-                                                                          0));
+                                                                newThrottleTimeField());
     /**
      * PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status.
      * The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create
@@ -215,11 +224,7 @@ public class Protocol {
                                                                                                                             "If CreateTime is used for the topic, the timestamp will be -1. " +
                                                                                                                             "If LogAppendTime is used for the topic, the timestamp will be " +
                                                                                                                             "the broker local time when the messages are appended."))))))),
-                                                                new Field("throttle_time_ms",
-                                                                          INT32,
-                                                                          "Duration in milliseconds for which the request was throttled" +
-                                                                              " due to quota violation. (Zero if the request did not violate any quota.)",
-                                                                          0));
+                                                                newThrottleTimeField());
     public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
 
     public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
@@ -316,6 +321,9 @@ public class Protocol {
                                                                                new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
                                                                                "Topics to commit offsets."));
 
+    /* v3 request is the same as v2. Throttle time has been added to response */
+    public static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
+
     public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                           INT32,
                                                                                           "Topic partition id."),
@@ -329,13 +337,18 @@ public class Protocol {
     public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
                                                                                 new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
+    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
 
     /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
     public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
     public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
 
-    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
+    public static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
+            newThrottleTimeField(),
+            new Field("responses",
+                       new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
 
     /* Offset fetch api */
 
@@ -401,8 +414,17 @@ public class Protocol {
                                                                      new Field("error_code",
                                                                                INT16));
 
-    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2};
+    /* v3 request is the same as v2. Throttle time has been added to v3 response */
+    public static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
+    public static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema(
+            newThrottleTimeField(),
+            new Field("responses",
+                    new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
+            new Field("error_code",
+                    INT16));
+
+    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
+    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
 
     /* List offset api */
     public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -445,6 +467,9 @@ public class Protocol {
                                                                              new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
                                                                              "Topics to list offsets."));
 
+    /* v2 request is the same as v1. Throttle time has been added to response */
+    public static final Schema LIST_OFFSET_REQUEST_V2 = LIST_OFFSET_REQUEST_V1;
+
     public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                         INT32,
                                                                                         "Topic partition id."),
@@ -477,9 +502,13 @@ public class Protocol {
 
     public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses",
                                                                               new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
+    public static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
+            newThrottleTimeField(),
+            new Field("responses",
+                    new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
 
-    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1};
-    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1};
+    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
+    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
 
     /* Fetch api */
     public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -630,11 +659,7 @@ public class Protocol {
     public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                         new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
 
-    public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
-                                                                        INT32,
-                                                                        "Duration in milliseconds for which the request was throttled" +
-                                                                            " due to quota violation. (Zero if the request did not violate any quota.)",
-                                                                        0),
+    public static final Schema FETCH_RESPONSE_V1 = new Schema(newThrottleTimeField(),
                                                               new Field("responses",
                                                                       new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
     // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
@@ -703,19 +728,11 @@ public class Protocol {
             new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
 
     public static final Schema FETCH_RESPONSE_V4 = new Schema(
-            new Field("throttle_time_ms",
-                    INT32,
-                    "Duration in milliseconds for which the request was throttled " +
-                    "due to quota violation (zero if the request did not violate any quota).",
-                    0),
+            newThrottleTimeField(),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
 
     public static final Schema FETCH_RESPONSE_V5 = new Schema(
-            new Field("throttle_time_ms",
-                    INT32,
-                    "Duration in milliseconds for which the request was throttled " +
-                    "due to quota violation (zero if the request did not violate any quota).",
-                    0),
+            newThrottleTimeField(),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
 
     public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
@@ -724,19 +741,29 @@ public class Protocol {
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
+
     public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
                                                                           new Field("protocol_type", STRING));
     public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                     new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+    public static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16),
+            new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
 
-    public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0};
-    public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0};
+    public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
+    public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
 
     /* Describe group api */
     public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
                                                                                  new ArrayOf(STRING),
                                                                                  "List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
+
     public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
                                                                                          STRING,
                                                                                          "The memberId assigned by the coordinator"),
@@ -770,9 +797,12 @@ public class Protocol {
                                                                                                  "Current group members (only provided if the group is not Dead)"));
 
     public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
 
-    public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
-    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
+    public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
+    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
 
     /* Find coordinator api */
     public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
@@ -802,6 +832,7 @@ public class Protocol {
                     "Host and port information for the coordinator for a consumer group."));
 
     public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code", INT16),
             new Field("error_message", NULLABLE_STRING),
             new Field("coordinator",
@@ -870,6 +901,9 @@ public class Protocol {
                                                                             new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
                                                                             "List of protocols that the member supports"));
 
+    /* v2 request is the same as v1. Throttle time has been added to response */
+    public static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
+
     public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
                                                                           new Field("member_metadata", BYTES));
 
@@ -890,9 +924,27 @@ public class Protocol {
                                                                              new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
     public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
-
-    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1};
-    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1};
+    public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16),
+            new Field("generation_id",
+                      INT32,
+                      "The generation of the consumer group."),
+            new Field("group_protocol",
+                      STRING,
+                      "The group protocol selected by the coordinator"),
+            new Field("leader_id",
+                      STRING,
+                      "The leader of the group"),
+            new Field("member_id",
+                      STRING,
+                      "The consumer id assigned by the group coordinator."),
+            new Field("members",
+                      new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
+
+
+    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
+    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
 
     /* SyncGroup api */
     public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
@@ -901,10 +953,18 @@ public class Protocol {
                                                                   new Field("generation_id", INT32),
                                                                   new Field("member_id", STRING),
                                                                   new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
+
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
+
     public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                    new Field("member_assignment", BYTES));
-    public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0};
-    public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0};
+    public static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16),
+            new Field("member_assignment", BYTES));
+    public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
+    public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
 
     /* Heartbeat api */
     public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -915,10 +975,16 @@ public class Protocol {
                                                                            STRING,
                                                                            "The member id assigned by the group coordinator."));
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
+
     public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+    public static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16));
 
-    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
-    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
+    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
 
     /* Leave group api */
     public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -926,10 +992,16 @@ public class Protocol {
                                                                              STRING,
                                                                              "The member id assigned by the group coordinator."));
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0;
+
     public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+    public static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16));
 
-    public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0};
-    public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0};
+    public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
+    public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
 
     /* Leader and ISR api */
     public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
@@ -1082,15 +1154,22 @@ public class Protocol {
     /* ApiVersion api */
     public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0;
+
     public static final Schema API_VERSIONS_V0 = new Schema(new Field("api_key", INT16, "API key."),
                                                            new Field("min_version", INT16, "Minimum supported version."),
                                                            new Field("max_version", INT16, "Maximum supported version."));
 
     public static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
                                                                     new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."));
+    public static final Schema API_VERSIONS_RESPONSE_V1 = new Schema(
+            new Field("error_code", INT16, "Error code."),
+            new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
+            newThrottleTimeField());
 
-    public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0};
-    public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0};
+    public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
+    public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
 
     /* Admin requests common */
     public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"),
@@ -1154,9 +1233,16 @@ public class Protocol {
             new Field("topic_errors",
                     new ArrayOf(TOPIC_ERROR),
                     "An array of per topic errors."));
+    /* v2 request is the same as v1. Throttle time has been added to the response */
+    public static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1;
+    public static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema(
+            newThrottleTimeField(),
+            new Field("topic_errors",
+                    new ArrayOf(TOPIC_ERROR),
+                    "An array of per topic errors."));
 
-    public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1};
-    public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1};
+    public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
+    public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
 
     /* DeleteTopic api */
     public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
@@ -1171,9 +1257,16 @@ public class Protocol {
         new Field("topic_error_codes",
             new ArrayOf(TOPIC_ERROR_CODE),
             "An array of per topic error codes."));
+    /* v1 request is the same as v0. Throttle time has been added to the response */
+    public static final Schema DELETE_TOPICS_REQUEST_V1 = DELETE_TOPICS_REQUEST_V0;
+    public static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("topic_error_codes",
+                new ArrayOf(TOPIC_ERROR_CODE),
+                "An array of per topic error codes."));
 
-    public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0};
-    public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0};
+    public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
+    public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
 
     public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
                                                                                 new Field("offset", INT64, "The offset before which the messages will be deleted."));
@@ -1191,7 +1284,9 @@ public class Protocol {
     public static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
                                                                              new Field("partitions", new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0)));
 
-    public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+    public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
+            new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
 
     public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
     public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
@@ -1207,6 +1302,7 @@ public class Protocol {
     );
 
     public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code."),
@@ -1289,6 +1385,7 @@ public class Protocol {
                     "The partitions to add to the transaction.")
     );
     public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code.")
@@ -1312,6 +1409,7 @@ public class Protocol {
                     "Consumer group id whose offsets should be included in the transaction.")
     );
     public static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code.")
@@ -1336,6 +1434,7 @@ public class Protocol {
     );
 
     public static final Schema END_TXN_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code.")
@@ -1425,6 +1524,7 @@ public class Protocol {
     );
 
     public static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("topics",
                     new ArrayOf(new Schema(
                             new Field("topic", STRING),
@@ -1535,6 +1635,12 @@ public class Protocol {
         return apiKey < CURR_VERSION.length && apiVersion >= MIN_VERSIONS[apiKey] && apiVersion <= CURR_VERSION[apiKey];
     }
 
+    private static Field newThrottleTimeField() {
+        return new Field("throttle_time_ms", INT32,
+                "Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not violate any quota.)",
+                0);
+    }
+
     private static String indentString(int size) {
         StringBuilder b = new StringBuilder(size);
         for (int i = 0; i < size; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 07bde63..04f2602 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -95,7 +95,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
     /**
      * Get an error response for a request
      */
-    public abstract AbstractResponse getErrorResponse(Throwable e);
+    public AbstractResponse getErrorResponse(Throwable e) {
+        return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
+    }
+
+    /**
+     * Get an error response for a request with the specified throttle time in the response if applicable
+     */
+    public abstract AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e);
 
     /**
      * Factory method for getting a request object based on ApiKey ID and a buffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 2286783..b76cb21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public abstract class AbstractResponse extends AbstractRequestResponse {
+    public static final int DEFAULT_THROTTLE_TIME = 0;
 
     public Send toSend(String destination, RequestHeader requestHeader) {
         return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 4245e82..733e806 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -96,8 +96,8 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
     }
 
     @Override
-    public AddOffsetsToTxnResponse getErrorResponse(Throwable e) {
-        return new AddOffsetsToTxnResponse(Errors.forException(e));
+    public AddOffsetsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new AddOffsetsToTxnResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 2426514..8c41ae4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class AddOffsetsToTxnResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:
@@ -35,15 +36,22 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     //   InvalidProducerEpoch
 
     private final Errors error;
+    private final int throttleTimeMs;
 
-    public AddOffsetsToTxnResponse(Errors error) {
+    public AddOffsetsToTxnResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public AddOffsetsToTxnResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -51,6 +59,7 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 9a983d0..5bbea61 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -125,8 +125,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     }
 
     @Override
-    public AddPartitionsToTxnResponse getErrorResponse(Throwable e) {
-        return new AddPartitionsToTxnResponse(Errors.forException(e));
+    public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new AddPartitionsToTxnResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 0337044..893fcda 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class AddPartitionsToTxnResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:
@@ -35,15 +36,22 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     //   InvalidProducerEpoch
 
     private final Errors error;
+    private final int throttleTimeMs;
 
-    public AddPartitionsToTxnResponse(Errors error) {
+    public AddPartitionsToTxnResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public AddPartitionsToTxnResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -51,6 +59,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 07dd5f5..6f63040 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -30,6 +30,10 @@ public class ApiVersionsRequest extends AbstractRequest {
             super(ApiKeys.API_VERSIONS);
         }
 
+        public Builder(short version) {
+            super(ApiKeys.API_VERSIONS, version);
+        }
+
         @Override
         public ApiVersionsRequest build(short version) {
             return new ApiVersionsRequest(version);
@@ -55,11 +59,13 @@ public class ApiVersionsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+            case 1:
+                return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 382da89..d434c75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -31,7 +31,8 @@ import java.util.Map;
 
 public class ApiVersionsResponse extends AbstractResponse {
 
-    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();
+    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME);
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String API_VERSIONS_KEY_NAME = "api_versions";
     public static final String API_KEY_NAME = "api_key";
@@ -44,6 +45,7 @@ public class ApiVersionsResponse extends AbstractResponse {
      * UNSUPPORTED_VERSION (33)
      */
     private final Errors error;
+    private final int throttleTimeMs;
     private final Map<Short, ApiVersion> apiKeyToApiVersion;
 
     public static final class ApiVersion {
@@ -72,11 +74,17 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     public ApiVersionsResponse(Errors error, List<ApiVersion> apiVersions) {
+        this(DEFAULT_THROTTLE_TIME, error, apiVersions);
+    }
+
+    public ApiVersionsResponse(int throttleTimeMs, Errors error, List<ApiVersion> apiVersions) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions);
     }
 
     public ApiVersionsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         List<ApiVersion> tempApiVersions = new ArrayList<>();
         for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
@@ -92,6 +100,8 @@ public class ApiVersionsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.API_VERSIONS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         List<Struct> apiVersionList = new ArrayList<>();
         for (ApiVersion apiVersion : apiKeyToApiVersion.values()) {
@@ -105,15 +115,26 @@ public class ApiVersionsResponse extends AbstractResponse {
         return struct;
     }
 
+    public static ApiVersionsResponse apiVersionsResponse(short version, int throttleTimeMs) {
+        if (throttleTimeMs == 0 || version == 0)
+            return API_VERSIONS_RESPONSE;
+        else
+            return createApiVersionsResponse(throttleTimeMs);
+    }
+
     /**
      * Returns Errors.UNSUPPORTED_VERSION response with version 0 since we don't support the requested version.
      */
     public static Send unsupportedVersionSend(String destination, RequestHeader requestHeader) {
-        ApiVersionsResponse response = new ApiVersionsResponse(Errors.UNSUPPORTED_VERSION,
+        ApiVersionsResponse response = new ApiVersionsResponse(DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION,
                 Collections.<ApiVersion>emptyList());
         return response.toSend(destination, (short) 0, requestHeader.toResponseHeader());
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Collection<ApiVersion> apiVersions() {
         return apiKeyToApiVersion.values();
     }
@@ -127,15 +148,15 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
-        return new ApiVersionsResponse(ApiKeys.API_VERSIONS.responseSchema(version).read(buffer));
+        return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
     }
 
-    private static ApiVersionsResponse createApiVersionsResponse() {
+    public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs) {
         List<ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
             versionList.add(new ApiVersion(apiKey));
         }
-        return new ApiVersionsResponse(Errors.NONE, versionList);
+        return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList);
     }
 
     private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 4b5ec13..ee41665 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -62,7 +62,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 072dde8..a0626cc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -209,7 +209,7 @@ public class CreateTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>();
         for (String topic : topics.keySet()) {
             Errors error = Errors.forException(e);
@@ -223,6 +223,8 @@ public class CreateTopicsRequest extends AbstractRequest {
             case 0:
             case 1:
                 return new CreateTopicsResponse(topicErrors);
+            case 2:
+                return new CreateTopicsResponse(throttleTimeMs, topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                     versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_TOPICS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 33a4b4a..54f9764 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 public class CreateTopicsResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -82,8 +83,14 @@ public class CreateTopicsResponse extends AbstractResponse {
      */
 
     private final Map<String, Error> errors;
+    private final int throttleTimeMs;
 
     public CreateTopicsResponse(Map<String, Error> errors) {
+        this(DEFAULT_THROTTLE_TIME, errors);
+    }
+
+    public CreateTopicsResponse(int throttleTimeMs, Map<String, Error> errors) {
+        this.throttleTimeMs = throttleTimeMs;
         this.errors = errors;
     }
 
@@ -100,12 +107,15 @@ public class CreateTopicsResponse extends AbstractResponse {
             errors.put(topic, new Error(error, errorMessage));
         }
 
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.errors = errors;
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.CREATE_TOPICS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
         for (Map.Entry<String, Error> topicError : errors.entrySet()) {
@@ -121,6 +131,10 @@ public class CreateTopicsResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<String, Error> errors() {
         return errors;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
index 96f064c..fcd9836 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -119,7 +119,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> responseMap = new HashMap<>();
 
         for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
@@ -129,7 +129,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new DeleteRecordsResponse(responseMap);
+                return new DeleteRecordsResponse(throttleTimeMs, responseMap);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                     versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 45b518b..64165eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -33,6 +33,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
     public static final long INVALID_LOW_WATERMARK = -1L;
 
     // request level key names
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names
@@ -44,6 +45,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
     private static final String LOW_WATERMARK_KEY_NAME = "low_watermark";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
+    private final int throttleTimeMs;
     private final Map<TopicPartition, PartitionResponse> responses;
 
     /**
@@ -80,6 +82,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
     }
 
     public DeleteRecordsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         responses = new HashMap<>();
         for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicStruct = (Struct) topicStructObj;
@@ -97,13 +100,16 @@ public class DeleteRecordsResponse extends AbstractResponse {
     /**
      * Constructor for version 0.
      */
-    public DeleteRecordsResponse(Map<TopicPartition, PartitionResponse> responses) {
+    public DeleteRecordsResponse(int throttleTimeMs, Map<TopicPartition, PartitionResponse> responses) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responses = responses;
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicStructArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) {
@@ -125,6 +131,10 @@ public class DeleteRecordsResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, PartitionResponse> responses() {
         return this.responses;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index ccbe211..2ea8c21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -86,7 +86,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<>();
         for (String topic : topics)
             topicErrors.put(topic, Errors.forException(e));
@@ -94,6 +94,8 @@ public class DeleteTopicsRequest extends AbstractRequest {
         switch (version()) {
             case 0:
                 return new DeleteTopicsResponse(topicErrors);
+            case 1:
+                return new DeleteTopicsResponse(throttleTimeMs, topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                     version(), this.getClass().getSimpleName(), ApiKeys.DELETE_TOPICS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 9d0d0f3..1b80d1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 public class DeleteTopicsResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -40,12 +41,19 @@ public class DeleteTopicsResponse extends AbstractResponse {
      * NOT_CONTROLLER(41)
      */
     private final Map<String, Errors> errors;
+    private final int throttleTimeMs;
 
     public DeleteTopicsResponse(Map<String, Errors> errors) {
+        this(DEFAULT_THROTTLE_TIME, errors);
+    }
+
+    public DeleteTopicsResponse(int throttleTimeMs, Map<String, Errors> errors) {
+        this.throttleTimeMs = throttleTimeMs;
         this.errors = errors;
     }
 
     public DeleteTopicsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
         Map<String, Errors> errors = new HashMap<>();
         for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
@@ -61,6 +69,8 @@ public class DeleteTopicsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
         for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
             Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
@@ -72,6 +82,10 @@ public class DeleteTopicsResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<String, Errors> errors() {
         return errors;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 287eda9..b43e254 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -73,11 +73,13 @@ public class DescribeGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short version = version();
         switch (version) {
             case 0:
                 return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
+            case 1:
+                return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds);
 
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 797ed58..bd7a087 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 public class DescribeGroupsResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String GROUPS_KEY_NAME = "groups";
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -58,12 +59,19 @@ public class DescribeGroupsResponse extends AbstractResponse {
      */
 
     private final Map<String, GroupMetadata> groups;
+    private final int throttleTimeMs;
 
     public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
+        this(DEFAULT_THROTTLE_TIME, groups);
+    }
+
+    public DescribeGroupsResponse(int throttleTimeMs, Map<String, GroupMetadata> groups) {
+        this.throttleTimeMs = throttleTimeMs;
         this.groups = groups;
     }
 
     public DescribeGroupsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.groups = new HashMap<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
@@ -89,6 +97,10 @@ public class DescribeGroupsResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<String, GroupMetadata> groups() {
         return groups;
     }
@@ -184,16 +196,22 @@ public class DescribeGroupsResponse extends AbstractResponse {
     }
 
     public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
+        return fromError(DEFAULT_THROTTLE_TIME, error, groupIds);
+    }
+
+    public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
         GroupMetadata errorMetadata = GroupMetadata.forError(error);
         Map<String, GroupMetadata> groups = new HashMap<>();
         for (String groupId : groupIds)
             groups.put(groupId, errorMetadata);
-        return new DescribeGroupsResponse(groups);
+        return new DescribeGroupsResponse(throttleTimeMs, groups);
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         List<Struct> groupStructs = new ArrayList<>();
         for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index e6eb54e..9c215be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -96,8 +96,8 @@ public class EndTxnRequest extends AbstractRequest {
     }
 
     @Override
-    public EndTxnResponse getErrorResponse(Throwable e) {
-        return new EndTxnResponse(Errors.forException(e));
+    public EndTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new EndTxnResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static EndTxnRequest parse(ByteBuffer buffer, short version) {