Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/30 17:13:57 UTC

[kafka] branch trunk updated: KAFKA-6028: Improve the quota throttle communication (KIP-219)

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1facab3  KAFKA-6028: Improve the quota throttle communication (KIP-219)
1facab3 is described below

commit 1facab387f8c2e513c8b7397430251dc44970e35
Author: Jon Lee <jo...@jonlee-ld1.linkedin.biz>
AuthorDate: Wed Apr 11 17:03:20 2018 -0700

    KAFKA-6028: Improve the quota throttle communication (KIP-219)
    
    This implements KIP-219: on quota violation, the broker returns a response with
    the throttle time immediately after processing the corresponding request. After
    the response is sent out, the broker keeps the channel muted until the throttle
    time is over. Likewise, on receiving a response with a non-zero throttle time,
    the client blocks outgoing communication to that broker for the specified
    throttle time.
    
    See PRs 4830, 5064 and 5094 for the full review history.
    
    Author: Jon Lee <jo...@jonlee-ld1.linkedin.biz>
    
    Reviewers: Jun Rao <ju...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>, Dong Lin <li...@gmail.com>
    
    Closes #5064 from jonlee2/kip-219
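
The behaviour described above boils down to per-node throttle deadlines on the client side. The sketch below is illustrative only and assumes nothing beyond the commit message: the class name ThrottleTracker is hypothetical, and the real logic lives in ClusterConnectionStates and NetworkClient in the diff that follows.

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch (hypothetical class, not part of this patch): tracks a
 * per-node "throttled until" deadline, mirroring the behaviour described in the
 * commit message. A node is not considered ready until its deadline has passed.
 */
public class ThrottleTracker {
    private final Map<String, Long> throttleUntilMs = new HashMap<>();

    /** Record a throttle deadline for a node; the deadline never moves backwards. */
    public void throttle(String nodeId, long throttleUntil) {
        throttleUntilMs.merge(nodeId, throttleUntil, Math::max);
    }

    /** Remaining throttle delay in ms for a node, or 0 if it is not throttled. */
    public long throttleDelayMs(String nodeId, long nowMs) {
        return Math.max(0L, throttleUntilMs.getOrDefault(nodeId, 0L) - nowMs);
    }

    /** The client should only send to a node whose throttle delay has elapsed. */
    public boolean isReady(String nodeId, long nowMs) {
        return throttleDelayMs(nodeId, nowMs) == 0;
    }

    public static void main(String[] args) {
        ThrottleTracker tracker = new ThrottleTracker();
        long now = System.currentTimeMillis();
        // A response from node "1" carried throttleTimeMs = 500.
        tracker.throttle("1", now + 500);
        System.out.println(tracker.isReady("1", now));        // false: still throttled
        System.out.println(tracker.isReady("1", now + 600));  // true: throttle elapsed
    }
}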
---
 .../kafka/clients/ClusterConnectionStates.java     |  81 ++++++++-
 .../org/apache/kafka/clients/ConnectionState.java  |   4 +
 .../java/org/apache/kafka/clients/KafkaClient.java |  19 ++-
 .../org/apache/kafka/clients/NetworkClient.java    |  56 +++++--
 .../kafka/clients/admin/KafkaAdminClient.java      |   2 +-
 .../consumer/internals/ConsumerCoordinator.java    |   4 +-
 .../consumer/internals/ConsumerNetworkClient.java  |  19 ++-
 .../producer/internals/RecordAccumulator.java      |  23 ++-
 .../kafka/clients/producer/internals/Sender.java   |  15 +-
 .../apache/kafka/common/network/KafkaChannel.java  | 120 ++++++++++++--
 .../org/apache/kafka/common/network/Selector.java  |   8 +-
 .../kafka/common/requests/AbstractResponse.java    |  13 ++
 .../common/requests/AddOffsetsToTxnRequest.java    |   7 +-
 .../common/requests/AddOffsetsToTxnResponse.java   |  12 +-
 .../common/requests/AddPartitionsToTxnRequest.java |   7 +-
 .../requests/AddPartitionsToTxnResponse.java       |  12 +-
 .../kafka/common/requests/AlterConfigsRequest.java |   8 +-
 .../common/requests/AlterConfigsResponse.java      |  12 +-
 .../requests/AlterReplicaLogDirsRequest.java       |   8 +-
 .../requests/AlterReplicaLogDirsResponse.java      |  13 +-
 .../kafka/common/requests/ApiVersionsRequest.java  |   8 +-
 .../kafka/common/requests/ApiVersionsResponse.java |  12 +-
 .../kafka/common/requests/CreateAclsRequest.java   |   8 +-
 .../kafka/common/requests/CreateAclsResponse.java  |  13 +-
 .../requests/CreateDelegationTokenRequest.java     |   7 +-
 .../requests/CreateDelegationTokenResponse.java    |  13 +-
 .../common/requests/CreatePartitionsRequest.java   |   8 +-
 .../common/requests/CreatePartitionsResponse.java  |  12 +-
 .../kafka/common/requests/CreateTopicsRequest.java |   9 +-
 .../common/requests/CreateTopicsResponse.java      |  14 +-
 .../kafka/common/requests/DeleteAclsRequest.java   |   8 +-
 .../kafka/common/requests/DeleteAclsResponse.java  |  12 +-
 .../kafka/common/requests/DeleteGroupsRequest.java |   8 +-
 .../common/requests/DeleteGroupsResponse.java      |  13 +-
 .../common/requests/DeleteRecordsRequest.java      |   8 +-
 .../common/requests/DeleteRecordsResponse.java     |  13 +-
 .../kafka/common/requests/DeleteTopicsRequest.java |   9 +-
 .../common/requests/DeleteTopicsResponse.java      |  13 +-
 .../kafka/common/requests/DescribeAclsRequest.java |   8 +-
 .../common/requests/DescribeAclsResponse.java      |  13 +-
 .../common/requests/DescribeConfigsRequest.java    |   7 +-
 .../common/requests/DescribeConfigsResponse.java   |  14 +-
 .../requests/DescribeDelegationTokenRequest.java   |   7 +-
 .../requests/DescribeDelegationTokenResponse.java  |  13 +-
 .../common/requests/DescribeGroupsRequest.java     |   8 +-
 .../common/requests/DescribeGroupsResponse.java    |  13 +-
 .../common/requests/DescribeLogDirsRequest.java    |   8 +-
 .../common/requests/DescribeLogDirsResponse.java   |  13 +-
 .../kafka/common/requests/EndTxnRequest.java       |   7 +-
 .../kafka/common/requests/EndTxnResponse.java      |  13 +-
 .../requests/ExpireDelegationTokenRequest.java     |   7 +-
 .../requests/ExpireDelegationTokenResponse.java    |  13 +-
 .../apache/kafka/common/requests/FetchRequest.java |   7 +-
 .../kafka/common/requests/FetchResponse.java       |  12 +-
 .../common/requests/FindCoordinatorRequest.java    |   8 +-
 .../common/requests/FindCoordinatorResponse.java   |  13 +-
 .../kafka/common/requests/HeartbeatRequest.java    |   9 +-
 .../kafka/common/requests/HeartbeatResponse.java   |  14 +-
 .../common/requests/InitProducerIdRequest.java     |   7 +-
 .../common/requests/InitProducerIdResponse.java    |  13 +-
 .../kafka/common/requests/JoinGroupRequest.java    |   9 +-
 .../kafka/common/requests/JoinGroupResponse.java   |  13 +-
 .../kafka/common/requests/LeaveGroupRequest.java   |   9 +-
 .../kafka/common/requests/LeaveGroupResponse.java  |  13 +-
 .../kafka/common/requests/ListGroupsRequest.java   |   9 +-
 .../kafka/common/requests/ListGroupsResponse.java  |  13 +-
 .../kafka/common/requests/ListOffsetRequest.java   |   8 +-
 .../kafka/common/requests/ListOffsetResponse.java  |  14 +-
 .../kafka/common/requests/MetadataRequest.java     |   8 +-
 .../kafka/common/requests/MetadataResponse.java    |  13 +-
 .../kafka/common/requests/OffsetCommitRequest.java |   9 +-
 .../common/requests/OffsetCommitResponse.java      |  12 +-
 .../kafka/common/requests/OffsetFetchRequest.java  |   5 +-
 .../kafka/common/requests/OffsetFetchResponse.java |  13 +-
 .../requests/OffsetsForLeaderEpochResponse.java    |   1 -
 .../kafka/common/requests/ProduceRequest.java      |   8 +-
 .../kafka/common/requests/ProduceResponse.java     |  27 ++-
 .../requests/RenewDelegationTokenRequest.java      |   7 +-
 .../requests/RenewDelegationTokenResponse.java     |  13 +-
 .../common/requests/SaslAuthenticateResponse.java  |   3 +-
 .../common/requests/SaslHandshakeResponse.java     |   3 +-
 .../kafka/common/requests/SyncGroupRequest.java    |   9 +-
 .../kafka/common/requests/SyncGroupResponse.java   |  13 +-
 .../common/requests/TxnOffsetCommitRequest.java    |   7 +-
 .../common/requests/TxnOffsetCommitResponse.java   |  12 +-
 .../common/requests/UpdateMetadataRequest.java     |   2 +-
 .../common/requests/UpdateMetadataResponse.java    |   2 +-
 .../common/requests/WriteTxnMarkersResponse.java   |   1 -
 .../kafka/clients/ClusterConnectionStatesTest.java |  52 ++++--
 .../java/org/apache/kafka/clients/MockClient.java  |   7 +-
 .../apache/kafka/clients/NetworkClientTest.java    | 104 +++++++++++-
 .../clients/consumer/internals/FetcherTest.java    |   7 +-
 .../producer/internals/RecordAccumulatorTest.java  |  33 +++-
 .../clients/producer/internals/SenderTest.java     |  11 +-
 .../apache/kafka/common/protocol/ApiKeysTest.java  |   2 +-
 .../kafka/common/requests/RequestResponseTest.java |   8 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |  11 +-
 .../main/scala/kafka/network/RequestChannel.scala  |   8 +-
 .../main/scala/kafka/network/SocketServer.scala    |  43 ++++-
 .../scala/kafka/server/ClientQuotaManager.scala    | 110 ++++++++-----
 .../kafka/server/ClientRequestQuotaManager.scala   |  21 ++-
 .../src/main/scala/kafka/server/FetchSession.scala | 115 ++++++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 182 +++++++++++++--------
 .../main/scala/kafka/server/ThrottledChannel.scala |  56 +++++++
 .../scala/kafka/server/ThrottledResponse.scala     |  46 ------
 .../kafka/api/LegacyAdminClientTest.scala          |   2 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |   6 +
 .../unit/kafka/network/SocketServerTest.scala      |  89 +++++++++-
 .../unit/kafka/server/ClientQuotaManagerTest.scala |  83 ++++++----
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  36 ++--
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  62 ++++++-
 ....scala => ThrottledChannelExpirationTest.scala} |  54 +++---
 docs/upgrade.html                                  |   1 +
 113 files changed, 1792 insertions(+), 471 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 7abcf12..c3a2856 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -129,6 +129,49 @@ final class ClusterConnectionStates {
     }
 
     /**
+     * Indicate that the connection is throttled until the specified deadline.
+     * @param id the connection to be throttled
+     * @param throttleUntilTimeMs the throttle deadline in milliseconds
+     */
+    public void throttle(String id, long throttleUntilTimeMs) {
+        NodeConnectionState state = nodeState.get(id);
+        // The throttle deadline should never regress.
+        if (state != null && state.throttleUntilTimeMs < throttleUntilTimeMs) {
+            state.throttleUntilTimeMs = throttleUntilTimeMs;
+        }
+    }
+
+    /**
+     * Return the remaining throttling delay in milliseconds if throttling is in progress. Return 0, otherwise.
+     * @param id the connection to check
+     * @param now the current time in ms
+     */
+    public long throttleDelayMs(String id, long now) {
+        NodeConnectionState state = nodeState.get(id);
+        if (state != null && state.throttleUntilTimeMs > now) {
+            return state.throttleUntilTimeMs - now;
+        } else {
+            return 0;
+        }
+    }
+
+    /**
+     * Return the number of milliseconds to wait, based on the connection state and the throttle time, before
+     * attempting to send data. If the connection has been established but is being throttled, return the
+     * throttle delay. Otherwise, return the connection delay.
+     * @param id the connection to check
+     * @param now the current time in ms
+     */
+    public long pollDelayMs(String id, long now) {
+        long throttleDelayMs = throttleDelayMs(id, now);
+        if (isConnected(id) && throttleDelayMs > 0) {
+            return throttleDelayMs;
+        } else {
+            return connectionDelay(id, now);
+        }
+    }
+
+    /**
      * Enter the checking_api_versions state for the given node.
      * @param id the connection identifier
      */
@@ -163,27 +206,44 @@ final class ClusterConnectionStates {
     }
 
     /**
-     * Return true if the connection is ready.
+     * Return true if the connection is in the READY state and currently not throttled.
+     *
      * @param id the connection identifier
+     * @param now the current time
      */
-    public boolean isReady(String id) {
-        NodeConnectionState state = nodeState.get(id);
-        return state != null && state.state == ConnectionState.READY;
+    public boolean isReady(String id, long now) {
+        return isReady(nodeState.get(id), now);
+    }
+
+    private boolean isReady(NodeConnectionState state, long now) {
+        return state != null && state.state == ConnectionState.READY && state.throttleUntilTimeMs <= now;
     }
 
     /**
-     * Return true if there is at least one node with connection in ready state and false otherwise.
+     * Return true if there is at least one node with connection in the READY state and not throttled. Returns false
+     * otherwise.
+     *
+     * @param now the current time
      */
-    public boolean hasReadyNodes() {
+    public boolean hasReadyNodes(long now) {
         for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) {
-            NodeConnectionState state = entry.getValue();
-            if (state != null && state.state == ConnectionState.READY)
+            if (isReady(entry.getValue(), now)) {
                 return true;
+            }
         }
         return false;
     }
 
     /**
+     * Return true if the connection has been established
+     * @param id The id of the node to check
+     */
+    public boolean isConnected(String id) {
+        NodeConnectionState state = nodeState.get(id);
+        return state != null && state.state.isConnected();
+    }
+
+    /**
      * Return true if the connection has been disconnected
      * @param id The id of the node to check
      */
@@ -272,6 +332,8 @@ final class ClusterConnectionStates {
         long lastConnectAttemptMs;
         long failedAttempts;
         long reconnectBackoffMs;
+        // Connection is being throttled if current time < throttleUntilTimeMs.
+        long throttleUntilTimeMs;
 
         public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs) {
             this.state = state;
@@ -279,10 +341,11 @@ final class ClusterConnectionStates {
             this.lastConnectAttemptMs = lastConnectAttempt;
             this.failedAttempts = 0;
             this.reconnectBackoffMs = reconnectBackoffMs;
+            this.throttleUntilTimeMs = 0;
         }
 
         public String toString() {
-            return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ")";
+            return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
         }
     }
 }
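
The effect of the new pollDelayMs() can be summarised with a small standalone example. The helper below is a hypothetical simplification: it collapses connectionDelay() into a plain reconnect backoff value, while the real method also distinguishes connecting and disconnected states.

/**
 * Standalone sketch of the pollDelayMs() decision (hypothetical helper, not the
 * ClusterConnectionStates implementation): a connected-but-throttled node should
 * be retried exactly when its throttle expires, while any other node simply
 * waits for its reconnect backoff.
 */
public class PollDelayDemo {
    static long pollDelayMs(boolean connected, long throttleUntilMs, long reconnectBackoffMs, long now) {
        long throttleDelayMs = Math.max(0L, throttleUntilMs - now);
        if (connected && throttleDelayMs > 0)
            return throttleDelayMs;     // wake up exactly when throttling ends
        return reconnectBackoffMs;      // stand-in for connectionDelay(id, now)
    }

    public static void main(String[] args) {
        long now = 10_000L;
        System.out.println(pollDelayMs(true, now + 250, 1_000L, now));   // 250: connected and throttled
        System.out.println(pollDelayMs(true, 0L, 1_000L, now));          // 1000: not throttled
        System.out.println(pollDelayMs(false, now + 250, 1_000L, now));  // 1000: not connected, backoff wins
    }
}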
diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
index 28b43d6..f92c7fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
@@ -31,4 +31,8 @@ public enum ConnectionState {
     public boolean isDisconnected() {
         return this == AUTHENTICATION_FAILED || this == DISCONNECTED;
     }
+
+    public boolean isConnected() {
+        return this == CHECKING_API_VERSIONS || this == READY;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 0a9b519..49bf3a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -48,7 +48,7 @@ public interface KafkaClient extends Closeable {
     boolean ready(Node node, long now);
 
     /**
-     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+     * Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
      * connections.
      *
@@ -59,6 +59,16 @@ public interface KafkaClient extends Closeable {
     long connectionDelay(Node node, long now);
 
     /**
+     * Return the number of milliseconds to wait, based on the connection state and the throttle time, before
+     * attempting to send data. If the connection has been established but is being throttled, return the
+     * throttle delay. Otherwise, return the connection delay.
+     *
+     * @param node the connection to check
+     * @param now the current time in ms
+     */
+    long pollDelayMs(Node node, long now);
+
+    /**
      * Check if the connection of the node has failed, based on the connection state. Such connection failure are
      * usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)} }
      * call, but there are cases where transient failures needs to be caught and re-acted upon.
@@ -145,9 +155,12 @@ public interface KafkaClient extends Closeable {
     boolean hasInFlightRequests(String nodeId);
 
     /**
-     * Return true if there is at least one node with connection in ready state and false otherwise.
+     * Return true if there is at least one node with connection in the READY state and not throttled. Returns false
+     * otherwise.
+     *
+     * @param now the current time
      */
-    boolean hasReadyNodes();
+    boolean hasReadyNodes(long now);
 
     /**
      * Wake up the client if it is currently blocked waiting for I/O
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index fc4745d..26fa467 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -300,6 +300,22 @@ public class NetworkClient implements KafkaClient {
         return connectionStates.connectionDelay(node.idString(), now);
     }
 
+    // Return the remaining throttling delay in milliseconds if throttling is in progress. Return 0, otherwise.
+    // This is for testing.
+    public long throttleDelayMs(Node node, long now) {
+        return connectionStates.throttleDelayMs(node.idString(), now);
+    }
+
+    /**
+     * Return the poll delay in milliseconds based on both connection and throttle delay.
+     * @param node the connection to check
+     * @param now the current time in ms
+     */
+    @Override
+    public long pollDelayMs(Node node, long now) {
+        return connectionStates.pollDelayMs(node.idString(), now);
+    }
+
     /**
      * Check if the connection of the node has failed, based on the connection state. Such connection failure are
      * usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)} }
@@ -336,16 +352,18 @@ public class NetworkClient implements KafkaClient {
     public boolean isReady(Node node, long now) {
         // if we need to update our metadata now declare all requests unready to make metadata requests first
         // priority
-        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
+        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
     }
 
     /**
      * Are we connected and ready and able to send more requests to the given connection?
      *
      * @param node The node
+     * @param now the current timestamp
      */
-    private boolean canSendRequest(String node) {
-        return connectionStates.isReady(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
+    private boolean canSendRequest(String node, long now) {
+        return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
+            inFlightRequests.canSendMore(node);
     }
 
     /**
@@ -373,7 +391,7 @@ public class NetworkClient implements KafkaClient {
             // will be slightly different for some internal requests (for
             // example, ApiVersionsRequests can be sent prior to being in
             // READY state.)
-            if (!canSendRequest(nodeId))
+            if (!canSendRequest(nodeId, now))
                 throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
         }
         AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
@@ -513,8 +531,8 @@ public class NetworkClient implements KafkaClient {
     }
 
     @Override
-    public boolean hasReadyNodes() {
-        return connectionStates.hasReadyNodes();
+    public boolean hasReadyNodes(long now) {
+        return connectionStates.hasReadyNodes(now);
     }
 
     /**
@@ -673,6 +691,24 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * If a response from a node includes a non-zero throttle delay and client-side throttling has been enabled for
+     * the connection to the node, throttle the connection for the specified delay.
+     *
+     * @param response the response
+     * @param apiVersion the API version of the response
+     * @param nodeId the id of the node
+     * @param now The current time
+     */
+    private void maybeThrottle(AbstractResponse response, short apiVersion, String nodeId, long now) {
+        int throttleTimeMs = response.throttleTimeMs();
+        if (throttleTimeMs > 0 && response.shouldClientThrottle(apiVersion)) {
+            connectionStates.throttle(nodeId, now + throttleTimeMs);
+            log.trace("Connection to node {} is throttled for {} ms until timestamp {}", nodeId, throttleTimeMs,
+                      now + throttleTimeMs);
+        }
+    }
+
+    /**
      * Handle any completed receives and update the response list with the responses received.
      *
      * @param responses The list of responses to update
@@ -688,7 +724,9 @@ public class NetworkClient implements KafkaClient {
                 log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                     req.header.apiKey(), req.header.correlationId(), responseStruct);
             }
+            // If the received response includes a throttle delay, throttle the connection.
             AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
+            maybeThrottle(body, req.header.apiVersion(), req.destination, now);
             if (req.isInternalRequest && body instanceof MetadataResponse)
                 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
             else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
@@ -715,9 +753,7 @@ public class NetworkClient implements KafkaClient {
         NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
         apiVersions.update(node, nodeVersionInfo);
         this.connectionStates.ready(node);
-        if (log.isDebugEnabled()) {
-            log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);
-        }
+        log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);
     }
 
     /**
@@ -914,7 +950,7 @@ public class NetworkClient implements KafkaClient {
         private long maybeUpdate(long now, Node node) {
             String nodeConnectionId = node.idString();
 
-            if (canSendRequest(nodeConnectionId)) {
+            if (canSendRequest(nodeConnectionId, now)) {
                 this.metadataFetchInProgress = true;
                 MetadataRequest.Builder metadataRequest;
                 if (metadata.needMetadataForAllTopics())
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 793e1da..6b02e11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -872,7 +872,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
                 Node node = entry.getKey();
                 if (!client.ready(node, now)) {
-                    long nodeTimeout = client.connectionDelay(node, now);
+                    long nodeTimeout = client.pollDelayMs(node, now);
                     pollTimeout = Math.min(pollTimeout, nodeTimeout);
                     log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                     continue;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index e8c5bc6..93d4081 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -340,9 +340,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
             // When group management is used, metadata wait is already performed for this scenario as
             // coordinator is unknown, hence this check is not required.
-            if (metadata.updateRequested() && !client.hasReadyNodes()) {
+            if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) {
                 final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
-                if (!metadataUpdated && !client.hasReadyNodes()) {
+                if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
                     return false;
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 7a9f717..b718d63 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -127,10 +127,10 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    public boolean hasReadyNodes() {
+    public boolean hasReadyNodes(long now) {
         lock.lock();
         try {
-            return client.hasReadyNodes();
+            return client.hasReadyNodes(now);
         } finally {
             lock.unlock();
         }
@@ -252,7 +252,8 @@ public class ConsumerNetworkClient implements Closeable {
             handlePendingDisconnects();
 
             // send all the requests we can send now
-            trySend(now);
+            long pollDelayMs = trySend(now);
+            timeout = Math.min(timeout, pollDelayMs);
 
             // check whether the poll is still needed by the caller. Note that if the expected completion
             // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
@@ -467,22 +468,24 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    private boolean trySend(long now) {
-        // send any requests that can be sent now
-        boolean requestsSent = false;
+    private long trySend(long now) {
+        long pollDelayMs = Long.MAX_VALUE;
 
+        // send any requests that can be sent now
         for (Node node : unsent.nodes()) {
             Iterator<ClientRequest> iterator = unsent.requestIterator(node);
+            if (iterator.hasNext())
+                pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
+
             while (iterator.hasNext()) {
                 ClientRequest request = iterator.next();
                 if (client.ready(node, now)) {
                     client.send(request, now);
                     iterator.remove();
-                    requestsSent = true;
                 }
             }
         }
-        return requestsSent;
+        return pollDelayMs;
     }
 
     public void maybeTriggerWakeup() {
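
The change to trySend() above now reports the smallest per-node poll delay so that poll() does not sleep past the point where a throttled node becomes sendable again. Below is a standalone sketch of that bounding step; the helper name is hypothetical and the per-node delays stand in for client.pollDelayMs(node, now).

import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch (not ConsumerNetworkClient itself): the requested poll
 * timeout is capped by the smallest per-node poll delay, so the client wakes up
 * as soon as any throttled or backing-off node is sendable again.
 */
public class PollTimeoutDemo {
    static long boundedPollTimeout(long requestedTimeoutMs, List<Long> perNodePollDelaysMs) {
        long timeout = requestedTimeoutMs;
        for (long delay : perNodePollDelaysMs)
            timeout = Math.min(timeout, delay);
        return timeout;
    }

    public static void main(String[] args) {
        // Requested 5 s, but one node is throttled for 300 ms and another backs off for 1 s.
        System.out.println(boundedPollTimeout(5_000L, Arrays.asList(300L, 1_000L))); // 300
    }
}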
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ba8c28e..e2b5844 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -81,7 +81,7 @@ public final class RecordAccumulator {
     private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
     private final IncompleteBatches incomplete;
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
-    private final Set<TopicPartition> muted;
+    private final Map<TopicPartition, Long> muted;
     private int drainIndex;
     private final TransactionManager transactionManager;
 
@@ -126,7 +126,7 @@ public final class RecordAccumulator {
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
         this.incomplete = new IncompleteBatches();
-        this.muted = new HashSet<>();
+        this.muted = new HashMap<>();
         this.time = time;
         this.apiVersions = apiVersions;
         this.transactionManager = transactionManager;
@@ -265,6 +265,13 @@ public final class RecordAccumulator {
         return null;
     }
 
+    private boolean isMuted(TopicPartition tp, long now) {
+        boolean result = muted.containsKey(tp) && muted.get(tp) > now;
+        if (!result)
+            muted.remove(tp);
+        return result;
+    }
+
     /**
      * Get a list of batches which have been sitting in the accumulator too long and need to be expired.
      */
@@ -277,7 +284,7 @@ public final class RecordAccumulator {
             // This is to prevent later batches from being expired while an earlier batch is still in progress.
             // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection
             // is only active in this case. Otherwise the expiration order is not guaranteed.
-            if (!muted.contains(tp)) {
+            if (!isMuted(tp, now)) {
                 synchronized (dq) {
                     // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
                     ProducerBatch lastBatch = dq.peekLast();
@@ -436,7 +443,7 @@ public final class RecordAccumulator {
                     // This is a partition for which leader is not known, but messages are available to send.
                     // Note that entries are currently not removed from batches when deque is empty.
                     unknownLeaderTopics.add(part.topic());
-                } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
+                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                     ProducerBatch batch = deque.peekFirst();
                     if (batch != null) {
                         long waitedTimeMs = batch.waitedTimeMs(nowMs);
@@ -504,7 +511,7 @@ public final class RecordAccumulator {
                 PartitionInfo part = parts.get(drainIndex);
                 TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                 // Only proceed if the partition has no in-flight batches.
-                if (!muted.contains(tp)) {
+                if (!isMuted(tp, now)) {
                     Deque<ProducerBatch> deque = getDeque(tp);
                     if (deque != null) {
                         synchronized (deque) {
@@ -733,11 +740,11 @@ public final class RecordAccumulator {
     }
 
     public void mutePartition(TopicPartition tp) {
-        muted.add(tp);
+        muted.put(tp, Long.MAX_VALUE);
     }
 
-    public void unmutePartition(TopicPartition tp) {
-        muted.remove(tp);
+    public void unmutePartition(TopicPartition tp, long throttleUntilTimeMs) {
+        muted.put(tp, throttleUntilTimeMs);
     }
 
     /**
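
The RecordAccumulator change above replaces the muted Set with a Map of mute deadlines: mutePartition() stores Long.MAX_VALUE (muted until explicitly unmuted) and unmutePartition() replaces it with the throttle deadline, so the partition stays effectively muted until the broker-imposed throttle has elapsed. A hypothetical, simplified sketch of that idea, using String keys instead of TopicPartition:

import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only, not RecordAccumulator itself. */
public class PartitionMuteTracker {
    private final Map<String, Long> muted = new HashMap<>();

    public void mutePartition(String tp) {
        muted.put(tp, Long.MAX_VALUE);
    }

    public void unmutePartition(String tp, long throttleUntilTimeMs) {
        muted.put(tp, throttleUntilTimeMs);
    }

    public boolean isMuted(String tp, long now) {
        Long deadline = muted.get(tp);
        boolean result = deadline != null && deadline > now;
        if (!result)
            muted.remove(tp);   // expired entries are cleaned up lazily
        return result;
    }

    public static void main(String[] args) {
        PartitionMuteTracker tracker = new PartitionMuteTracker();
        long now = 1_000L;
        tracker.mutePartition("topic-0");
        System.out.println(tracker.isMuted("topic-0", now));        // true: request in flight
        tracker.unmutePartition("topic-0", now + 300);              // response carried a 300 ms throttle
        System.out.println(tracker.isMuted("topic-0", now + 100));  // true: still throttled
        System.out.println(tracker.isMuted("topic-0", now + 400));  // false: throttle elapsed
    }
}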
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6a10826..7c94179 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -265,7 +265,7 @@ public class Sender implements Runnable {
             Node node = iter.next();
             if (!this.client.ready(node, now)) {
                 iter.remove();
-                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
+                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
             }
         }
 
@@ -461,17 +461,18 @@ public class Sender implements Runnable {
      */
     private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
         RequestHeader requestHeader = response.requestHeader();
+        long receivedTimeMs = response.receivedTimeMs();
         int correlationId = requestHeader.correlationId();
         if (response.wasDisconnected()) {
             log.trace("Cancelled request with header {} due to node {} being disconnected",
                     requestHeader, response.destination());
             for (ProducerBatch batch : batches.values())
-                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
+                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
         } else if (response.versionMismatch() != null) {
             log.warn("Cancelled request {} due to a version mismatch with node {}",
                     response, response.destination(), response.versionMismatch());
             for (ProducerBatch batch : batches.values())
-                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now);
+                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);
         } else {
             log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
             // if we have a response, parse it
@@ -481,13 +482,13 @@ public class Sender implements Runnable {
                     TopicPartition tp = entry.getKey();
                     ProduceResponse.PartitionResponse partResp = entry.getValue();
                     ProducerBatch batch = batches.get(tp);
-                    completeBatch(batch, partResp, correlationId, now);
+                    completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
                 }
                 this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
             } else {
                 // this is the acks = 0 case, just complete all requests
                 for (ProducerBatch batch : batches.values()) {
-                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
+                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
                 }
             }
         }
@@ -502,7 +503,7 @@ public class Sender implements Runnable {
      * @param now The current POSIX timestamp in milliseconds
      */
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
-                               long now) {
+                               long now, long throttleUntilTimeMs) {
         Errors error = response.error;
 
         if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
@@ -576,7 +577,7 @@ public class Sender implements Runnable {
 
         // Unmute the completed partition.
         if (guaranteeMessageOrder)
-            this.accumulator.unmutePartition(batch.topicPartition);
+            this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
     }
 
     private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index e125bbc..1839729 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -28,6 +28,52 @@ import java.nio.channels.SelectionKey;
 import java.util.Objects;
 
 public class KafkaChannel {
+    /**
+     * Mute States for KafkaChannel:
+     * <ul>
+     *   <li> NOT_MUTED: Channel is not muted. This is the default state. </li>
+     *   <li> MUTED: Channel is muted. Channel must be in this state to be unmuted. </li>
+     *   <li> MUTED_AND_RESPONSE_PENDING: (SocketServer only) Channel is muted and SocketServer has not sent a response
+     *                                    back to the client yet (acks != 0) or is currently waiting to receive a
+     *                                    response from the API layer (acks == 0). </li>
+     *   <li> MUTED_AND_THROTTLED: (SocketServer only) Channel is muted and throttling is in progress due to quota
+     *                             violation. </li>
+     *   <li> MUTED_AND_THROTTLED_AND_RESPONSE_PENDING: (SocketServer only) Channel is muted, throttling is in progress,
+     *                                                  and a response is currently pending. </li>
+     * </ul>
+     */
+    public enum ChannelMuteState {
+        NOT_MUTED,
+        MUTED,
+        MUTED_AND_RESPONSE_PENDING,
+        MUTED_AND_THROTTLED,
+        MUTED_AND_THROTTLED_AND_RESPONSE_PENDING
+    };
+
+    /** Socket server events that will change the mute state:
+     * <ul>
+     *   <li> REQUEST_RECEIVED: A request has been received from the client. </li>
+     *   <li> RESPONSE_SENT: A response has been sent out to the client (ack != 0) or SocketServer has heard back from
+     *                       the API layer (acks = 0) </li>
+     *   <li> THROTTLE_STARTED: Throttling started due to quota violation. </li>
+     *   <li> THROTTLE_ENDED: Throttling ended. </li>
+     * </ul>
+     *
+     * Valid transitions on each event are:
+     * <ul>
+     *   <li> REQUEST_RECEIVED: MUTED => MUTED_AND_RESPONSE_PENDING </li>
+     *   <li> RESPONSE_SENT:    MUTED_AND_RESPONSE_PENDING => MUTED, MUTED_AND_THROTTLED_AND_RESPONSE_PENDING => MUTED_AND_THROTTLED </li>
+     *   <li> THROTTLE_STARTED: MUTED_AND_RESPONSE_PENDING => MUTED_AND_THROTTLED_AND_RESPONSE_PENDING </li>
+     *   <li> THROTTLE_ENDED:   MUTED_AND_THROTTLED => MUTED, MUTED_AND_THROTTLED_AND_RESPONSE_PENDING => MUTED_AND_RESPONSE_PENDING </li>
+     * </ul>
+     */
+    public enum ChannelMuteEvent {
+        REQUEST_RECEIVED,
+        RESPONSE_SENT,
+        THROTTLE_STARTED,
+        THROTTLE_ENDED
+    };
+
     private final String id;
     private final TransportLayer transportLayer;
     private final Authenticator authenticator;
@@ -41,7 +87,7 @@ public class KafkaChannel {
     // Track connection and mute state of channels to enable outstanding requests on channels to be
     // processed after the channel is disconnected.
     private boolean disconnected;
-    private boolean muted;
+    private ChannelMuteState muteState;
     private ChannelState state;
 
     public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) throws IOException {
@@ -52,7 +98,7 @@ public class KafkaChannel {
         this.maxReceiveSize = maxReceiveSize;
         this.memoryPool = memoryPool;
         this.disconnected = false;
-        this.muted = false;
+        this.muteState = ChannelMuteState.NOT_MUTED;
         this.state = ChannelState.NOT_CONNECTED;
     }
 
@@ -125,22 +171,76 @@ public class KafkaChannel {
      * externally muting a channel should be done via selector to ensure proper state handling
      */
     void mute() {
-        if (!disconnected)
-            transportLayer.removeInterestOps(SelectionKey.OP_READ);
-        muted = true;
+        if (muteState == ChannelMuteState.NOT_MUTED) {
+            if (!disconnected) transportLayer.removeInterestOps(SelectionKey.OP_READ);
+            muteState = ChannelMuteState.MUTED;
+        }
+    }
+
+    /**
+     * Unmute the channel. The channel can be unmuted only if it is in the MUTED state. For other muted states
+     * (MUTED_AND_*), this is a no-op.
+     *
+     * @return Whether or not the channel is in the NOT_MUTED state after the call
+     */
+    boolean maybeUnmute() {
+        if (muteState == ChannelMuteState.MUTED) {
+            if (!disconnected) transportLayer.addInterestOps(SelectionKey.OP_READ);
+            muteState = ChannelMuteState.NOT_MUTED;
+        }
+        return muteState == ChannelMuteState.NOT_MUTED;
+    }
+
+    // Handle the specified channel mute-related event and transition the mute state according to the state machine.
+    public void handleChannelMuteEvent(ChannelMuteEvent event) {
+        boolean stateChanged = false;
+        switch (event) {
+            case REQUEST_RECEIVED:
+                if (muteState == ChannelMuteState.MUTED) {
+                    muteState = ChannelMuteState.MUTED_AND_RESPONSE_PENDING;
+                    stateChanged = true;
+                }
+                break;
+            case RESPONSE_SENT:
+                if (muteState == ChannelMuteState.MUTED_AND_RESPONSE_PENDING) {
+                    muteState = ChannelMuteState.MUTED;
+                    stateChanged = true;
+                }
+                if (muteState == ChannelMuteState.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING) {
+                    muteState = ChannelMuteState.MUTED_AND_THROTTLED;
+                    stateChanged = true;
+                }
+                break;
+            case THROTTLE_STARTED:
+                if (muteState == ChannelMuteState.MUTED_AND_RESPONSE_PENDING) {
+                    muteState = ChannelMuteState.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING;
+                    stateChanged = true;
+                }
+                break;
+            case THROTTLE_ENDED:
+                if (muteState == ChannelMuteState.MUTED_AND_THROTTLED) {
+                    muteState = ChannelMuteState.MUTED;
+                    stateChanged = true;
+                }
+                if (muteState == ChannelMuteState.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING) {
+                    muteState = ChannelMuteState.MUTED_AND_RESPONSE_PENDING;
+                    stateChanged = true;
+                }
+        }
+        if (!stateChanged) {
+            throw new IllegalStateException("Cannot transition from " + muteState.name() + " for " + event.name());
+        }
     }
 
-    void unmute() {
-        if (!disconnected)
-            transportLayer.addInterestOps(SelectionKey.OP_READ);
-        muted = false;
+    public ChannelMuteState muteState() {
+        return muteState;
     }
 
     /**
      * Returns true if this channel has been explicitly muted using {@link KafkaChannel#mute()}
      */
     public boolean isMute() {
-        return muted;
+        return muteState != ChannelMuteState.NOT_MUTED;
     }
 
     public boolean isInMutableState() {
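
The mute state machine added to KafkaChannel above can be exercised end to end for the throttled case: the response is sent first, then the channel stays muted until the throttle ends. The sketch below is a standalone copy of the transitions documented in the diff, not the KafkaChannel implementation itself.

/** Standalone sketch of the documented mute-state transitions. */
public class MuteStateMachineDemo {
    enum State { NOT_MUTED, MUTED, MUTED_AND_RESPONSE_PENDING,
                 MUTED_AND_THROTTLED, MUTED_AND_THROTTLED_AND_RESPONSE_PENDING }
    enum Event { REQUEST_RECEIVED, RESPONSE_SENT, THROTTLE_STARTED, THROTTLE_ENDED }

    static State transition(State s, Event e) {
        switch (e) {
            case REQUEST_RECEIVED:
                if (s == State.MUTED) return State.MUTED_AND_RESPONSE_PENDING;
                break;
            case RESPONSE_SENT:
                if (s == State.MUTED_AND_RESPONSE_PENDING) return State.MUTED;
                if (s == State.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING) return State.MUTED_AND_THROTTLED;
                break;
            case THROTTLE_STARTED:
                if (s == State.MUTED_AND_RESPONSE_PENDING) return State.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING;
                break;
            case THROTTLE_ENDED:
                if (s == State.MUTED_AND_THROTTLED) return State.MUTED;
                if (s == State.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING) return State.MUTED_AND_RESPONSE_PENDING;
                break;
        }
        throw new IllegalStateException("Cannot transition from " + s + " for " + e);
    }

    public static void main(String[] args) {
        State s = State.MUTED;                      // channel muted once the request is read
        s = transition(s, Event.REQUEST_RECEIVED);  // -> MUTED_AND_RESPONSE_PENDING
        s = transition(s, Event.THROTTLE_STARTED);  // quota violated -> MUTED_AND_THROTTLED_AND_RESPONSE_PENDING
        s = transition(s, Event.RESPONSE_SENT);     // response sent immediately -> MUTED_AND_THROTTLED
        s = transition(s, Event.THROTTLE_ENDED);    // throttle time elapsed -> MUTED (ready to unmute)
        System.out.println(s);
    }
}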
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1b2d1a2..334ca79 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -397,7 +397,7 @@ public class Selector implements Selectable, AutoCloseable {
             log.trace("Broker no longer low on memory - unmuting incoming sockets");
             for (KafkaChannel channel : channels.values()) {
                 if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
-                    channel.unmute();
+                    channel.maybeUnmute();
                 }
             }
             outOfMemory = false;
@@ -610,8 +610,10 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     private void unmute(KafkaChannel channel) {
-        explicitlyMutedChannels.remove(channel);
-        channel.unmute();
+        // Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted.
+        if (channel.maybeUnmute()) {
+            explicitlyMutedChannels.remove(channel);
+        }
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 608f6c3..8d28521 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -162,6 +162,19 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
         }
     }
 
+    /**
+     * Returns whether or not the client should throttle upon receiving a response of the specified version with a
+     * non-zero throttle time. Client-side throttling is needed when communicating with a newer broker version that, on
+     * quota violation, sends out responses before throttling.
+     */
+    public boolean shouldClientThrottle(short version) {
+        return false;
+    }
+
+    public int throttleTimeMs() {
+        return DEFAULT_THROTTLE_TIME;
+    }
+
     public String toString(short version) {
         return toStruct(version).toString();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index f6a1722..6fb9441 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -35,8 +35,13 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
             PRODUCER_EPOCH,
             GROUP_ID);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V1 = ADD_OFFSETS_TO_TXN_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0};
+        return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0, ADD_OFFSETS_TO_TXN_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 69b8ad4..867ca6a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -32,8 +32,13 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             ERROR_CODE);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V1 = ADD_OFFSETS_TO_TXN_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0};
+        return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0, ADD_OFFSETS_TO_TXN_RESPONSE_V1};
     }
 
     // Possible error codes:
@@ -59,6 +64,7 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
         this.error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -92,4 +98,8 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
                 ')';
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 0ca32be..4a87289 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -50,8 +50,13 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
                     new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
                     "The partitions to add to the transaction."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0};
+        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 4472449..977bd59 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -48,8 +48,13 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
                             PARTITION_ID,
                             ERROR_CODE)))))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V1 = ADD_PARTITIONS_TO_TXN_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
+        return new Schema[]{ADD_PARTITIONS_TO_TXN_RESPONSE_V0, ADD_PARTITIONS_TO_TXN_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
@@ -86,6 +91,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
         }
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -136,4 +142,8 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
                 ')';
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index 14b39ae..ed93b15 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -60,8 +60,13 @@ public class AlterConfigsRequest extends AbstractRequest {
                     "An array of resources to update with the provided configs."),
             new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ALTER_CONFIGS_REQUEST_V1 = ALTER_CONFIGS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {ALTER_CONFIGS_REQUEST_V0};
+        return new Schema[] {ALTER_CONFIGS_REQUEST_V0, ALTER_CONFIGS_REQUEST_V1};
     }
 
     public static class Config {
@@ -187,6 +192,7 @@ public class AlterConfigsRequest extends AbstractRequest {
         short version = version();
         switch (version) {
             case 0:
+            case 1:
                 ApiError error = ApiError.fromThrowable(e);
                 Map<Resource, ApiError> errors = new HashMap<>(configs.size());
                 for (Resource resource : configs.keySet())
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index f292ef6..feb694b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -52,8 +52,13 @@ public class AlterConfigsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ALTER_CONFIGS_RESPONSE_V1 = ALTER_CONFIGS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_CONFIGS_RESPONSE_V0};
+        return new Schema[]{ALTER_CONFIGS_RESPONSE_V0, ALTER_CONFIGS_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
@@ -87,6 +92,7 @@ public class AlterConfigsResponse extends AbstractResponse {
         return apiErrorCounts(errors);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -112,4 +118,8 @@ public class AlterConfigsResponse extends AbstractResponse {
         return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
index ba21759..0bc7cd0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
@@ -55,8 +55,13 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest {
                             TOPIC_NAME,
                             new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V1 = ALTER_REPLICA_LOG_DIRS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0};
+        return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0, ALTER_REPLICA_LOG_DIRS_REQUEST_V1};
     }
 
     private final Map<TopicPartition, String> partitionDirs;
@@ -147,6 +152,7 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap);
             default:
                 throw new IllegalArgumentException(
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
index f8d1546..c8f6e4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
@@ -54,8 +54,13 @@ public class AlterReplicaLogDirsResponse extends AbstractResponse {
                             PARTITION_ID,
                             ERROR_CODE)))))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V1 = ALTER_REPLICA_LOG_DIRS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0};
+        return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0, ALTER_REPLICA_LOG_DIRS_RESPONSE_V1};
     }
 
     /**
@@ -116,6 +121,7 @@ public class AlterReplicaLogDirsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -132,4 +138,9 @@ public class AlterReplicaLogDirsResponse extends AbstractResponse {
     public static AlterReplicaLogDirsResponse parse(ByteBuffer buffer, short version) {
         return new AlterReplicaLogDirsResponse(ApiKeys.ALTER_REPLICA_LOG_DIRS.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 22daf6c..347e355 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -30,8 +30,13 @@ public class ApiVersionsRequest extends AbstractRequest {
     /* v1 request is the same as v0. Throttle time has been added to response */
     private static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema API_VERSIONS_REQUEST_V2 = API_VERSIONS_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
+        return new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1, API_VERSIONS_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder<ApiVersionsRequest> {
@@ -92,6 +97,7 @@ public class ApiVersionsRequest extends AbstractRequest {
             case 0:
                 return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
             case 1:
+            case 2:
                 return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 753db9e..a37b6da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -54,11 +54,16 @@ public class ApiVersionsResponse extends AbstractResponse {
             new Field(API_VERSIONS_KEY_NAME, new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
             THROTTLE_TIME_MS);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema API_VERSIONS_RESPONSE_V2 = API_VERSIONS_RESPONSE_V1;
+
     // initialized lazily to avoid circular initialization dependence with ApiKeys
     private static volatile ApiVersionsResponse defaultApiVersionsResponse;
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
+        return new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1, API_VERSIONS_RESPONSE_V2};
     }
 
     /**
@@ -143,6 +148,7 @@ public class ApiVersionsResponse extends AbstractResponse {
         return createApiVersionsResponse(throttleTimeMs, maxMagic);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -192,4 +198,8 @@ public class ApiVersionsResponse extends AbstractResponse {
         return defaultApiVersionsResponse;
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
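
Because these version bumps change only the throttling semantics and not the wire format, each new schema constant simply aliases the previous one and is appended to schemaVersions(), where its position corresponds to the protocol version. A toy illustration of that idiom (the Schema stand-in below is a placeholder, not Kafka's protocol type):

    final class SchemaAliasSketch {
        // Placeholder for the protocol Schema type; fields omitted.
        static final class Schema { }

        private static final Schema RESPONSE_V0 = new Schema();
        private static final Schema RESPONSE_V1 = new Schema();
        // v2 carries exactly the same fields as v1; the bump only signals that
        // the broker now responds before throttling on quota violation.
        private static final Schema RESPONSE_V2 = RESPONSE_V1;

        static Schema[] schemaVersions() {
            return new Schema[]{RESPONSE_V0, RESPONSE_V1, RESPONSE_V2};
        }
    }
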
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index d281b3b..ef4cba2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -50,8 +50,13 @@ public class CreateAclsRequest extends AbstractRequest {
                     OPERATION,
                     PERMISSION_TYPE))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema CREATE_ACLS_REQUEST_V1 = CREATE_ACLS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_ACLS_REQUEST_V0};
+        return new Schema[]{CREATE_ACLS_REQUEST_V0, CREATE_ACLS_REQUEST_V1};
     }
 
     public static class AclCreation {
@@ -139,6 +144,7 @@ public class CreateAclsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
                 for (int i = 0; i < aclCreations.size(); i++)
                     responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 6c35798..787ad7a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -42,8 +42,13 @@ public class CreateAclsResponse extends AbstractResponse {
                     ERROR_CODE,
                     ERROR_MESSAGE))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema CREATE_ACLS_RESPONSE_V1 = CREATE_ACLS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_ACLS_RESPONSE_V0};
+        return new Schema[]{CREATE_ACLS_RESPONSE_V0, CREATE_ACLS_RESPONSE_V1};
     }
 
     public static class AclCreationResponse {
@@ -96,6 +101,7 @@ public class CreateAclsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -115,4 +121,9 @@ public class CreateAclsResponse extends AbstractResponse {
     public static CreateAclsResponse parse(ByteBuffer buffer, short version) {
         return new CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
index 7c48205..68e480f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
@@ -44,6 +44,11 @@ public class CreateDelegationTokenRequest extends AbstractRequest {
                     "Max lifetime period for token in milli seconds. if value is -1, then max lifetime" +
                         "  will default to a server side config value."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TOKEN_CREATE_REQUEST_V1 = TOKEN_CREATE_REQUEST_V0;
+
     private final List<KafkaPrincipal> renewers;
     private final long maxLifeTime;
 
@@ -73,7 +78,7 @@ public class CreateDelegationTokenRequest extends AbstractRequest {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{TOKEN_CREATE_REQUEST_V0};
+        return new Schema[]{TOKEN_CREATE_REQUEST_V0, TOKEN_CREATE_REQUEST_V1};
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
index c718cf0..8bb6631 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -62,6 +62,11 @@ public class CreateDelegationTokenResponse extends AbstractResponse {
         new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token."),
         THROTTLE_TIME_MS);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TOKEN_CREATE_RESPONSE_V1 = TOKEN_CREATE_RESPONSE_V0;
+
     public CreateDelegationTokenResponse(int throttleTimeMs,
                                          Errors error,
                                          KafkaPrincipal owner,
@@ -103,7 +108,7 @@ public class CreateDelegationTokenResponse extends AbstractResponse {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_CREATE_RESPONSE_V0};
+        return new Schema[] {TOKEN_CREATE_RESPONSE_V0, TOKEN_CREATE_RESPONSE_V1};
     }
 
     @Override
@@ -158,6 +163,7 @@ public class CreateDelegationTokenResponse extends AbstractResponse {
         return byteArray;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -165,4 +171,9 @@ public class CreateDelegationTokenResponse extends AbstractResponse {
     public boolean hasError() {
         return this.error != Errors.NONE;
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 41c5327..5e776bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -60,8 +60,13 @@ public class CreatePartitionsRequest extends AbstractRequest {
             new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN,
                     "If true then validate the request, but don't actually increase the number of partitions."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema CREATE_PARTITIONS_REQUEST_V1 = CREATE_PARTITIONS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_PARTITIONS_REQUEST_V0};
+        return new Schema[]{CREATE_PARTITIONS_REQUEST_V0, CREATE_PARTITIONS_REQUEST_V1};
     }
 
     // It is an error for duplicate topics to be present in the request,
@@ -201,6 +206,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 return new CreatePartitionsResponse(throttleTimeMs, topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
index 73a6b56..744cc32 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
@@ -50,8 +50,13 @@ public class CreatePartitionsResponse extends AbstractResponse {
                     )), "Per topic results for the create partitions request")
     );
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema CREATE_PARTITIONS_RESPONSE_V1 = CREATE_PARTITIONS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_PARTITIONS_RESPONSE_V0};
+        return new Schema[]{CREATE_PARTITIONS_RESPONSE_V0, CREATE_PARTITIONS_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
@@ -100,6 +105,7 @@ public class CreatePartitionsResponse extends AbstractResponse {
         return apiErrorCounts(errors);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -108,4 +114,8 @@ public class CreatePartitionsResponse extends AbstractResponse {
         return new CreatePartitionsResponse(ApiKeys.CREATE_PARTITIONS.parseResponse(version, buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 3a1de51..aa346f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -91,8 +91,14 @@ public class CreateTopicsRequest extends AbstractRequest {
     /* v2 request is the same as v1. Throttle time has been added to the response */
     private static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema CREATE_TOPICS_REQUEST_V3 = CREATE_TOPICS_REQUEST_V2;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
+        return new Schema[]{CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2,
+            CREATE_TOPICS_REQUEST_V3};
     }
 
     public static final class TopicDetails {
@@ -270,6 +276,7 @@ public class CreateTopicsRequest extends AbstractRequest {
             case 1:
                 return new CreateTopicsResponse(topicErrors);
             case 2:
+            case 3:
                 return new CreateTopicsResponse(throttleTimeMs, topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 1f73eda..b1504b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -58,8 +58,14 @@ public class CreateTopicsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An array of per topic errors."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema CREATE_TOPICS_RESPONSE_V3 = CREATE_TOPICS_RESPONSE_V2;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
+        return new Schema[]{CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2,
+            CREATE_TOPICS_RESPONSE_V3};
     }
 
     /**
@@ -118,6 +124,7 @@ public class CreateTopicsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -134,4 +141,9 @@ public class CreateTopicsResponse extends AbstractResponse {
     public static CreateTopicsResponse parse(ByteBuffer buffer, short version) {
         return new CreateTopicsResponse(ApiKeys.CREATE_TOPICS.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 3;
+    }
 }
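
On the client side, the two hooks combine into a single decision: if a response carries a non-zero throttle time and its version says the broker responded before throttling, the client pauses sends to that broker for the reported duration; on older versions the broker has already delayed the response itself, so the client does nothing extra. A hypothetical, self-contained sketch of that decision logic (class and method names are illustrative, not the NetworkClient changes in this commit):

    final class ClientThrottleSketch {
        private final java.util.Map<String, Long> throttledUntilMs = new java.util.HashMap<>();

        // Called with values taken from a parsed response.
        void onResponse(String nodeId, int throttleTimeMs,
                        boolean shouldClientThrottle, long nowMs) {
            if (throttleTimeMs > 0 && shouldClientThrottle) {
                // New versions: the broker replied immediately, so the client
                // backs off on its side for the reported throttle time.
                throttledUntilMs.put(nodeId, nowMs + throttleTimeMs);
            }
        }

        boolean canSendTo(String nodeId, long nowMs) {
            return throttledUntilMs.getOrDefault(nodeId, 0L) <= nowMs;
        }
    }
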
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 2d50ea6..7f53ab5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -51,8 +51,13 @@ public class DeleteAclsRequest extends AbstractRequest {
                     OPERATION,
                     PERMISSION_TYPE))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_ACLS_REQUEST_V1 = DELETE_ACLS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_ACLS_REQUEST_V0};
+        return new Schema[]{DELETE_ACLS_REQUEST_V0, DELETE_ACLS_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<DeleteAclsRequest> {
@@ -115,6 +120,7 @@ public class DeleteAclsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
                 for (int i = 0; i < filters.size(); i++) {
                     responses.add(new DeleteAclsResponse.AclFilterResponse(
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 7ae25da..58dbb93 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -69,8 +69,13 @@ public class DeleteAclsResponse extends AbstractResponse {
                             ERROR_MESSAGE,
                             new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL), "The matching ACLs")))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_ACLS_RESPONSE_V1 = DELETE_ACLS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_ACLS_RESPONSE_V0};
+        return new Schema[]{DELETE_ACLS_RESPONSE_V0, DELETE_ACLS_RESPONSE_V1};
     }
 
     public static class AclDeletionResult {
@@ -177,6 +182,7 @@ public class DeleteAclsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -201,4 +207,8 @@ public class DeleteAclsResponse extends AbstractResponse {
         return "(responses=" + Utils.join(responses, ",") + ")";
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
index bda6b33..29604a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
@@ -39,8 +39,13 @@ public class DeleteGroupsRequest extends AbstractRequest {
     private static final Schema DELETE_GROUPS_REQUEST_V0 = new Schema(
             new Field(GROUPS_KEY_NAME, new ArrayOf(STRING), "An array of groups to be deleted."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_GROUPS_REQUEST_V1 = DELETE_GROUPS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_GROUPS_REQUEST_V0};
+        return new Schema[]{DELETE_GROUPS_REQUEST_V0, DELETE_GROUPS_REQUEST_V1};
     }
 
     private final Set<String> groups;
@@ -99,6 +104,7 @@ public class DeleteGroupsRequest extends AbstractRequest {
 
         switch (version()) {
             case 0:
+            case 1:
                 return new DeleteGroupsResponse(throttleTimeMs, groupErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
index d97bb0d..032f15d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
@@ -44,8 +44,13 @@ public class DeleteGroupsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(GROUP_ERROR_CODES_KEY_NAME, new ArrayOf(GROUP_ERROR_CODE), "An array of per group error codes."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_GROUPS_RESPONSE_V1 = DELETE_GROUPS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_GROUPS_RESPONSE_V0};
+        return new Schema[]{DELETE_GROUPS_RESPONSE_V0, DELETE_GROUPS_RESPONSE_V1};
     }
 
 
@@ -102,6 +107,7 @@ public class DeleteGroupsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -126,4 +132,9 @@ public class DeleteGroupsResponse extends AbstractResponse {
     public static DeleteGroupsResponse parse(ByteBuffer buffer, short version) {
         return new DeleteGroupsResponse(ApiKeys.DELETE_GROUPS.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
index 6467e06..ad3db60 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -64,8 +64,13 @@ public class DeleteRecordsRequest extends AbstractRequest {
             new Field(TOPICS_KEY_NAME, new ArrayOf(DELETE_RECORDS_REQUEST_TOPIC_V0)),
             new Field(TIMEOUT_KEY_NAME, INT32, "The maximum time to await a response in ms."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_RECORDS_REQUEST_V1 = DELETE_RECORDS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_RECORDS_REQUEST_V0};
+        return new Schema[]{DELETE_RECORDS_REQUEST_V0, DELETE_RECORDS_REQUEST_V1};
     }
 
     private final int timeout;
@@ -153,6 +158,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 return new DeleteRecordsResponse(throttleTimeMs, responseMap);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 4b45517..0b494ba 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -64,8 +64,13 @@ public class DeleteRecordsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(TOPICS_KEY_NAME, new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_RECORDS_RESPONSE_V1 = DELETE_RECORDS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_RECORDS_RESPONSE_V0};
+        return new Schema[]{DELETE_RECORDS_RESPONSE_V0, DELETE_RECORDS_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
@@ -152,6 +157,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -171,4 +177,9 @@ public class DeleteRecordsResponse extends AbstractResponse {
     public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
         return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index 4696b50..dbcc25d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -46,8 +46,14 @@ public class DeleteTopicsRequest extends AbstractRequest {
     /* v1 request is the same as v0. Throttle time has been added to the response */
     private static final Schema DELETE_TOPICS_REQUEST_V1 = DELETE_TOPICS_REQUEST_V0;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_TOPICS_REQUEST_V2 = DELETE_TOPICS_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
+        return new Schema[]{DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1,
+            DELETE_TOPICS_REQUEST_V2};
     }
 
     private final Set<String> topics;
@@ -114,6 +120,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
             case 0:
                 return new DeleteTopicsResponse(topicErrors);
             case 1:
+            case 2:
                 return new DeleteTopicsResponse(throttleTimeMs, topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index d13e441..db1c434 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -48,8 +48,13 @@ public class DeleteTopicsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(TOPIC_ERROR_CODES_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DELETE_TOPICS_RESPONSE_V2 = DELETE_TOPICS_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
+        return new Schema[]{DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1, DELETE_TOPICS_RESPONSE_V2};
     }
 
 
@@ -102,6 +107,7 @@ public class DeleteTopicsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -118,4 +124,9 @@ public class DeleteTopicsResponse extends AbstractResponse {
     public static DeleteTopicsResponse parse(ByteBuffer buffer, short version) {
         return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
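
The request-side changes are equally mechanical: since each bumped version has the same fields as its predecessor, its case in getErrorResponse simply falls through to the existing one, as in the DeleteTopicsRequest hunk above. A stripped-down sketch of that switch (placeholder types; not the actual request classes):

    final class ErrorResponseSketch {
        static final class SketchResponse {
            final int throttleTimeMs;
            final String error;

            SketchResponse(int throttleTimeMs, String error) {
                this.throttleTimeMs = throttleTimeMs;
                this.error = error;
            }
        }

        static SketchResponse getErrorResponse(short version, int throttleTimeMs, Throwable e) {
            switch (version) {
                case 0:
                case 1: // identical layout to v0; only the throttling semantics differ
                    return new SketchResponse(throttleTimeMs, e.getMessage());
                default:
                    throw new IllegalArgumentException("Version " + version + " is not valid.");
            }
        }
    }
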
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 1bacac7..4d6ec60 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -43,8 +43,13 @@ public class DescribeAclsRequest extends AbstractRequest {
             OPERATION,
             PERMISSION_TYPE);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_ACLS_REQUEST_V1 = DESCRIBE_ACLS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_ACLS_REQUEST_V0};
+        return new Schema[]{DESCRIBE_ACLS_REQUEST_V0, DESCRIBE_ACLS_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<DescribeAclsRequest> {
@@ -93,6 +98,7 @@ public class DescribeAclsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable),
                         Collections.<AclBinding>emptySet());
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index a21230b..c705c71 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -63,8 +63,13 @@ public class DescribeAclsResponse extends AbstractResponse {
             ERROR_MESSAGE,
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE), "The resources and their associated ACLs."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_ACLS_RESPONSE_V1 = DESCRIBE_ACLS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_ACLS_RESPONSE_V0};
+        return new Schema[]{DESCRIBE_ACLS_RESPONSE_V0, DESCRIBE_ACLS_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
@@ -126,6 +131,7 @@ public class DescribeAclsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -146,4 +152,9 @@ public class DescribeAclsResponse extends AbstractResponse {
     public static DescribeAclsResponse parse(ByteBuffer buffer, short version) {
         return new DescribeAclsResponse(ApiKeys.DESCRIBE_ACLS.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 4156338..72bb112 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -54,9 +54,13 @@ public class DescribeConfigsRequest extends AbstractRequest {
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."),
             new Field(INCLUDE_SYNONYMS, BOOLEAN));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_CONFIGS_REQUEST_V2 = DESCRIBE_CONFIGS_REQUEST_V1;
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1};
+        return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder {
@@ -162,6 +166,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
         switch (version) {
             case 0:
             case 1:
+            case 2:
                 ApiError error = ApiError.fromThrowable(e);
                 Map<Resource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
                 DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index e463618..9ae1b5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -100,8 +100,13 @@ public class DescribeConfigsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_V2 = DESCRIBE_CONFIGS_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0, DESCRIBE_CONFIGS_RESPONSE_V1};
+        return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0, DESCRIBE_CONFIGS_RESPONSE_V1, DESCRIBE_CONFIGS_RESPONSE_V2};
     }
 
     public static class Config {
@@ -290,6 +295,7 @@ public class DescribeConfigsResponse extends AbstractResponse {
         return configs.get(resource);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -340,7 +346,7 @@ public class DescribeConfigsResponse extends AbstractResponse {
                 }
             }
             resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
-            
+
             resourceStructs.add(resourceStruct);
         }
         struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
@@ -351,4 +357,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
         return new DescribeConfigsResponse(ApiKeys.DESCRIBE_CONFIGS.parseResponse(version, buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java
index 61c0055..574bbcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java
@@ -39,6 +39,11 @@ public class DescribeDelegationTokenRequest extends AbstractRequest {
     public static final Schema TOKEN_DESCRIBE_REQUEST_V0 = new Schema(
         new Field(OWNER_KEY_NAME, ArrayOf.nullable(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)), "An array of token owners."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    public static final Schema TOKEN_DESCRIBE_REQUEST_V1 = TOKEN_DESCRIBE_REQUEST_V0;
+
     public static class Builder extends AbstractRequest.Builder<DescribeDelegationTokenRequest> {
         // describe token for the given list of owners, or null if we want to describe all tokens.
         private final List<KafkaPrincipal> owners;
@@ -86,7 +91,7 @@ public class DescribeDelegationTokenRequest extends AbstractRequest {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{TOKEN_DESCRIBE_REQUEST_V0};
+        return new Schema[]{TOKEN_DESCRIBE_REQUEST_V0, TOKEN_DESCRIBE_REQUEST_V1};
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index 7ba270a..c86b141 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -70,6 +70,11 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
         new Field(TOKEN_DETAILS_KEY_NAME, new ArrayOf(TOKEN_DETAILS_V0)),
         THROTTLE_TIME_MS);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TOKEN_DESCRIBE_RESPONSE_V1 = TOKEN_DESCRIBE_RESPONSE_V0;
+
     public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error, List<DelegationToken> tokens) {
         this.throttleTimeMs = throttleTimeMs;
         this.error = error;
@@ -170,9 +175,10 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{TOKEN_DESCRIBE_RESPONSE_V0};
+        return new Schema[]{TOKEN_DESCRIBE_RESPONSE_V0, TOKEN_DESCRIBE_RESPONSE_V1};
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -188,4 +194,9 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
     public boolean hasError() {
         return this.error != Errors.NONE;
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 56117da..8ea4a8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -41,8 +41,13 @@ public class DescribeGroupsRequest extends AbstractRequest {
     /* v1 request is the same as v0. Throttle time has been added to response */
     private static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_GROUPS_REQUEST_V2 = DESCRIBE_GROUPS_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
+        return new Schema[]{DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1, DESCRIBE_GROUPS_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder<DescribeGroupsRequest> {
@@ -96,6 +101,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
             case 0:
                 return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
             case 1:
+            case 2:
                 return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds);
 
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 174b14b..fbdfaa3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -77,8 +77,13 @@ public class DescribeGroupsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_GROUPS_RESPONSE_V2 = DESCRIBE_GROUPS_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
+        return new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1, DESCRIBE_GROUPS_RESPONSE_V2};
     }
 
     public static final String UNKNOWN_STATE = "";
@@ -133,6 +138,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
         }
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -285,4 +291,9 @@ public class DescribeGroupsResponse extends AbstractResponse {
     public static DescribeGroupsResponse parse(ByteBuffer buffer, short version) {
         return new DescribeGroupsResponse(ApiKeys.DESCRIBE_GROUPS.parseResponse(version, buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
index 5f35c43..3728991 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
@@ -49,8 +49,13 @@ public class DescribeLogDirsRequest extends AbstractRequest {
                     TOPIC_NAME,
                     new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic.")))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V1 = DESCRIBE_LOG_DIRS_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0};
+        return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0, DESCRIBE_LOG_DIRS_REQUEST_V1};
     }
 
     private final Set<TopicPartition> topicPartitions;
@@ -137,6 +142,7 @@ public class DescribeLogDirsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 return new DescribeLogDirsResponse(throttleTimeMs, new HashMap<String, LogDirInfo>());
             default:
                 throw new IllegalArgumentException(
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index 9b68a9e..41c2617 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -77,8 +77,13 @@ public class DescribeLogDirsResponse extends AbstractResponse {
                                             "AlterReplicaLogDirsRequest and will replace the current log of the replica " +
                                             "in the future.")))))))))));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V1 = DESCRIBE_LOG_DIRS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_LOG_DIRS_RESPONSE_V0};
+        return new Schema[]{DESCRIBE_LOG_DIRS_RESPONSE_V0, DESCRIBE_LOG_DIRS_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
@@ -158,6 +163,7 @@ public class DescribeLogDirsResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -230,4 +236,9 @@ public class DescribeLogDirsResponse extends AbstractResponse {
             return builder.toString();
         }
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
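
The broker-side half of the behaviour these version bumps advertise: on quota violation the broker now sends the response immediately, with the throttle time filled in, and then keeps the channel muted until that time has elapsed. A hypothetical sketch of that sequence (the interfaces below are placeholders, not the KafkaChannel or Selector changes in this diff):

    final class BrokerThrottleSketch {
        interface ResponseSender { void send(); }
        interface ChannelMuter   { void muteFor(long ms); }

        void onQuotaViolation(ResponseSender sender, ChannelMuter muter, long throttleTimeMs) {
            sender.send();                  // respond right away, carrying throttleTimeMs
            muter.muteFor(throttleTimeMs);  // then keep the channel muted until the throttle time is over
        }
    }
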
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index 9118d6a..0ec22dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -38,8 +38,13 @@ public class EndTxnRequest extends AbstractRequest {
             PRODUCER_EPOCH,
             new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction (0 = ABORT, 1 = COMMIT)"));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema END_TXN_REQUEST_V1 = END_TXN_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{END_TXN_REQUEST_V0};
+        return new Schema[]{END_TXN_REQUEST_V0, END_TXN_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index a0a453d..cf49996 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -32,8 +32,13 @@ public class EndTxnResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             ERROR_CODE);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema END_TXN_RESPONSE_V1 = END_TXN_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{END_TXN_RESPONSE_V0};
+        return new Schema[]{END_TXN_RESPONSE_V0, END_TXN_RESPONSE_V1};
     }
 
     // Possible error codes:
@@ -58,6 +63,7 @@ public class EndTxnResponse extends AbstractResponse {
         this.error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -90,4 +96,9 @@ public class EndTxnResponse extends AbstractResponse {
                 ", throttleTimeMs=" + throttleTimeMs +
                 ')';
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 40f0aad..21edb5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -38,6 +38,11 @@ public class ExpireDelegationTokenRequest extends AbstractRequest {
         new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."),
         new Field(EXPIRY_TIME_PERIOD_KEY_NAME, INT64, "expiry time period in milli seconds."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TOKEN_EXPIRE_REQUEST_V1 = TOKEN_EXPIRE_REQUEST_V0;
+
     private ExpireDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
         super(version);
 
@@ -57,7 +62,7 @@ public class ExpireDelegationTokenRequest extends AbstractRequest {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_EXPIRE_REQUEST_V0};
+        return new Schema[] {TOKEN_EXPIRE_REQUEST_V0, TOKEN_EXPIRE_REQUEST_V1};
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index 1a673bc..9491a35 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -42,6 +42,11 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
         new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
         THROTTLE_TIME_MS);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TOKEN_EXPIRE_RESPONSE_V1 = TOKEN_EXPIRE_RESPONSE_V0;
+
     public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
         this.throttleTimeMs = throttleTimeMs;
         this.error = error;
@@ -63,7 +68,7 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0};
+        return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0, TOKEN_EXPIRE_RESPONSE_V1};
     }
 
     public Errors error() {
@@ -90,6 +95,7 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -97,4 +103,9 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
     public boolean hasError() {
         return this.error != Errors.NONE;
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index ad84905..6e25f7c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -175,9 +175,14 @@ public class FetchRequest extends AbstractRequest {
         new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V5), "Topics to fetch in the order provided."),
         new Field(FORGOTTEN_TOPICS_DATA, new ArrayOf(FORGOTTEN_TOPIC_DATA), "Topics to remove from the fetch session."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema FETCH_REQUEST_V8 = FETCH_REQUEST_V7;
+
     public static Schema[] schemaVersions() {
         return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
-            FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7};
+            FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8};
     };
 
     // default values for older versions where a request level limit did not exist
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index d0380f1..103821b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -161,10 +161,15 @@ public class FetchResponse extends AbstractResponse {
         SESSION_ID,
         new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema FETCH_RESPONSE_V8 = FETCH_RESPONSE_V7;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
             FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
-            FETCH_RESPONSE_V7};
+            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8};
     }
 
 
@@ -375,6 +380,7 @@ public class FetchResponse extends AbstractResponse {
         return responseData;
     }
 
+    @Override
     public int throttleTimeMs() {
         return this.throttleTimeMs;
     }
@@ -524,4 +530,8 @@ public class FetchResponse extends AbstractResponse {
         return 4 + toStruct(version, 0, Errors.NONE, partIterator, INVALID_SESSION_ID).sizeOf();
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 8;
+    }
 }
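
Note that the cut-over version differs per API only because each API bumps from whatever its latest version happened to be: Fetch moves from v7 to v8 here, while most of the APIs above move from v0 to v1, which is why the shouldClientThrottle thresholds vary. A one-method sketch mirroring the FetchResponse hunk above (placeholder class name):

    final class FetchThrottleSketch {
        // v8 is the first Fetch version for which the broker responds before throttling.
        static boolean shouldClientThrottle(short version) {
            return version >= 8;
        }
    }
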
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index 9bfc968..8fb71a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -41,8 +41,13 @@ public class FindCoordinatorRequest extends AbstractRequest {
                             "for transactional producers, this is the transactional id)"),
             new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
+        return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
@@ -114,6 +119,7 @@ public class FindCoordinatorRequest extends AbstractRequest {
             case 0:
                 return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
             case 1:
+            case 2:
                 return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode());
 
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index bddc41f..39726da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -56,8 +56,13 @@ public class FindCoordinatorResponse extends AbstractResponse {
             ERROR_MESSAGE,
             new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
+        return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2};
     }
 
     /**
@@ -97,6 +102,7 @@ public class FindCoordinatorResponse extends AbstractResponse {
         node = new Node(nodeId, host, port);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -142,4 +148,9 @@ public class FindCoordinatorResponse extends AbstractResponse {
                 ", node=" + node +
                 ')';
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 7d84918..5a70ed8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -36,8 +36,14 @@ public class HeartbeatRequest extends AbstractRequest {
     /* v1 request is the same as v0. Throttle time has been added to response */
     private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema HEARTBEAT_REQUEST_V2 = HEARTBEAT_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
+        return new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1,
+            HEARTBEAT_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder<HeartbeatRequest> {
@@ -94,6 +100,7 @@ public class HeartbeatRequest extends AbstractRequest {
             case 0:
                 return new HeartbeatResponse(Errors.forException(e));
             case 1:
+            case 2:
                 return new HeartbeatResponse(throttleTimeMs, Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index b52a993..efe2ed8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -35,8 +35,14 @@ public class HeartbeatResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             ERROR_CODE);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema HEARTBEAT_RESPONSE_V2 = HEARTBEAT_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
+        return new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1,
+            HEARTBEAT_RESPONSE_V2};
     }
 
     /**
@@ -66,6 +72,7 @@ public class HeartbeatResponse extends AbstractResponse {
         error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -90,4 +97,9 @@ public class HeartbeatResponse extends AbstractResponse {
     public static HeartbeatResponse parse(ByteBuffer buffer, short version) {
         return new HeartbeatResponse(ApiKeys.HEARTBEAT.parseResponse(version, buffer));
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
index 6c659ff..c350599 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -36,8 +36,13 @@ public class InitProducerIdRequest extends AbstractRequest {
             NULLABLE_TRANSACTIONAL_ID,
             new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema INIT_PRODUCER_ID_REQUEST_V1 = INIT_PRODUCER_ID_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0};
+        return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0, INIT_PRODUCER_ID_REQUEST_V1};
     }
 
     private final String transactionalId;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 7a988ca..9a1e0f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -44,8 +44,13 @@ public class InitProducerIdResponse extends AbstractResponse {
             PRODUCER_ID,
             PRODUCER_EPOCH);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema INIT_PRODUCER_ID_RESPONSE_V1 = INIT_PRODUCER_ID_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0};
+        return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0, INIT_PRODUCER_ID_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
@@ -71,6 +76,7 @@ public class InitProducerIdResponse extends AbstractResponse {
         this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -115,4 +121,9 @@ public class InitProducerIdResponse extends AbstractResponse {
                 ", throttleTimeMs=" + throttleTimeMs +
                 ')';
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
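
Each request class touched here receives the same mechanical bump: a new schema constant that simply aliases the previous version (the wire format is unchanged), an extra entry in schemaVersions(), and, where applicable, an extra case label in getErrorResponse(). The sketch below illustrates why an alias is sufficient, assuming schemas are looked up by indexing the versions array with the version number; FooRequest and the local Schema class are illustrative stand-ins, not Kafka classes.

    // Hypothetical FooRequest schemas; the local Schema class is a toy stand-in for
    // org.apache.kafka.common.protocol.types.Schema.
    public class SchemaBumpSketch {

        static final class Schema {
            private final String description;

            Schema(String description) {
                this.description = description;
            }

            @Override
            public String toString() {
                return description;
            }
        }

        private static final Schema FOO_REQUEST_V0 = new Schema("original foo fields");

        // v1 is wire-compatible with v0; the bump only signals that on quota violation the
        // broker now sends the response before throttling instead of after.
        private static final Schema FOO_REQUEST_V1 = FOO_REQUEST_V0;

        static Schema[] schemaVersions() {
            return new Schema[] {FOO_REQUEST_V0, FOO_REQUEST_V1};
        }

        public static void main(String[] args) {
            short latest = (short) (schemaVersions().length - 1);
            // Looking the schema up by version lands on the same object for v0 and v1,
            // because the new constant is just an alias.
            System.out.println("latest version: " + latest);
            System.out.println("same schema object as v0: "
                    + (schemaVersions()[latest] == schemaVersions()[0]));
        }
    }
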
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index a7b62a9..ba009a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -71,8 +71,14 @@ public class JoinGroupRequest extends AbstractRequest {
     /* v2 request is the same as v1. Throttle time has been added to response */
     private static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema JOIN_GROUP_REQUEST_V3 = JOIN_GROUP_REQUEST_V2;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
+        return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2,
+            JOIN_GROUP_REQUEST_V3};
     }
 
     public static final String UNKNOWN_MEMBER_ID = "";
@@ -202,6 +208,7 @@ public class JoinGroupRequest extends AbstractRequest {
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
                         Collections.<String, ByteBuffer>emptyMap());
             case 2:
+            case 3:
                 return new JoinGroupResponse(
                         throttleTimeMs,
                         Errors.forException(e),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 4bcd6e6..e3efe86 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -68,9 +68,14 @@ public class JoinGroupResponse extends AbstractResponse {
             MEMBER_ID,
             new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema JOIN_GROUP_RESPONSE_V3 = JOIN_GROUP_RESPONSE_V2;
 
     public static Schema[] schemaVersions() {
-        return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
+        return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2,
+            JOIN_GROUP_RESPONSE_V3};
     }
 
     public static final String UNKNOWN_PROTOCOL = "";
@@ -139,6 +144,7 @@ public class JoinGroupResponse extends AbstractResponse {
         leaderId = struct.getString(LEADER_ID_KEY_NAME);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -215,4 +221,9 @@ public class JoinGroupResponse extends AbstractResponse {
             ", members=" + ((members == null) ? "null" :
                 Utils.join(members.keySet(), ",")) + ")";
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 3;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index b0d0ad6..2b4acf8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -34,8 +34,14 @@ public class LeaveGroupRequest extends AbstractRequest {
     /* v1 request is the same as v0. Throttle time has been added to response */
     private static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema LEAVE_GROUP_REQUEST_V2 = LEAVE_GROUP_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
+        return new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1,
+            LEAVE_GROUP_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder<LeaveGroupRequest> {
@@ -86,6 +92,7 @@ public class LeaveGroupRequest extends AbstractRequest {
             case 0:
                 return new LeaveGroupResponse(Errors.forException(e));
             case 1:
+            case 2:
                 return new LeaveGroupResponse(throttleTimeMs, Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index f8682ec..c2656f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -35,8 +35,14 @@ public class LeaveGroupResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             ERROR_CODE);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema LEAVE_GROUP_RESPONSE_V2 = LEAVE_GROUP_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
+        return new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1,
+            LEAVE_GROUP_RESPONSE_V2};
     }
 
     /**
@@ -65,6 +71,7 @@ public class LeaveGroupResponse extends AbstractResponse {
         this.error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -90,4 +97,8 @@ public class LeaveGroupResponse extends AbstractResponse {
         return new LeaveGroupResponse(ApiKeys.LEAVE_GROUP.parseResponse(versionId, buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index f279b4c..f64f71a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -32,8 +32,14 @@ public class ListGroupsRequest extends AbstractRequest {
     /* v1 request is the same as v0. Throttle time has been added to response */
     private static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema LIST_GROUPS_REQUEST_V2 = LIST_GROUPS_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
+        return new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1,
+            LIST_GROUPS_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder<ListGroupsRequest> {
@@ -67,6 +73,7 @@ public class ListGroupsRequest extends AbstractRequest {
             case 0:
                 return new ListGroupsResponse(Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
             case 1:
+            case 2:
                 return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 9c82ae0..b108803 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -49,8 +49,14 @@ public class ListGroupsResponse extends AbstractResponse {
             ERROR_CODE,
             new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema LIST_GROUPS_RESPONSE_V2 = LIST_GROUPS_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
+        return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1,
+            LIST_GROUPS_RESPONSE_V2};
     }
 
     /**
@@ -86,6 +92,7 @@ public class ListGroupsResponse extends AbstractResponse {
         }
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -142,4 +149,8 @@ public class ListGroupsResponse extends AbstractResponse {
         return new ListGroupsResponse(ApiKeys.LIST_GROUPS.parseResponse(version, buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 98f53bd..be094fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -90,9 +90,14 @@ public class ListOffsetRequest extends AbstractRequest {
                     "result, which allows consumers to discard ABORTED transactional records"),
             new Field(TOPICS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), "Topics to list offsets."));;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema LIST_OFFSET_REQUEST_V3 = LIST_OFFSET_REQUEST_V2;
 
     public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
+        return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2,
+            LIST_OFFSET_REQUEST_V3};
     }
 
     private final int replicaId;
@@ -272,6 +277,7 @@ public class ListOffsetRequest extends AbstractRequest {
             case 0:
             case 1:
             case 2:
+            case 3:
                 return new ListOffsetResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 13f2dfb..e719dbb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -90,8 +90,14 @@ public class ListOffsetResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(RESPONSES_KEY_NAME, new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema LIST_OFFSET_RESPONSE_V3 = LIST_OFFSET_RESPONSE_V2;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
+        return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2,
+            LIST_OFFSET_RESPONSE_V3};
     }
 
     public static final class PartitionData {
@@ -183,6 +189,7 @@ public class ListOffsetResponse extends AbstractResponse {
         }
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -244,4 +251,9 @@ public class ListOffsetResponse extends AbstractResponse {
             .append(")");
         return bld.toString();
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 3;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 934b0ed..67dbe94 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -62,9 +62,14 @@ public class MetadataRequest extends AbstractRequest {
     /* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response */
     private static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema METADATA_REQUEST_V6 = METADATA_REQUEST_V5;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3,
-            METADATA_REQUEST_V4, METADATA_REQUEST_V5};
+            METADATA_REQUEST_V4, METADATA_REQUEST_V5, METADATA_REQUEST_V6};
     }
 
     public static class Builder extends AbstractRequest.Builder<MetadataRequest> {
@@ -170,6 +175,7 @@ public class MetadataRequest extends AbstractRequest {
             case 3:
             case 4:
             case 5:
+            case 6:
                 return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index de7c8f6..28a412d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -164,9 +164,14 @@ public class MetadataResponse extends AbstractResponse {
             new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."),
             new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V2)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema METADATA_RESPONSE_V6 = METADATA_RESPONSE_V5;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3,
-            METADATA_RESPONSE_V4, METADATA_RESPONSE_V5};
+            METADATA_RESPONSE_V4, METADATA_RESPONSE_V5, METADATA_RESPONSE_V6};
     }
 
     private final int throttleTimeMs;
@@ -277,6 +282,7 @@ public class MetadataResponse extends AbstractResponse {
         return null;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -567,4 +573,9 @@ public class MetadataResponse extends AbstractResponse {
         struct.set(TOPIC_METADATA_KEY_NAME, topicMetadataArray.toArray());
         return struct;
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 6;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 4686c3b..570c4d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -106,9 +106,14 @@ public class OffsetCommitRequest extends AbstractRequest {
     /* v3 request is same as v2. Throttle time has been added to response */
     private static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema OFFSET_COMMIT_REQUEST_V4 = OFFSET_COMMIT_REQUEST_V3;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
-            OFFSET_COMMIT_REQUEST_V3};
+            OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4};
     }
 
     // default values for the current version
@@ -193,6 +198,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 case 1:
                 case 2:
                 case 3:
+                case 4:
                     long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
                     return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
                 default:
@@ -309,6 +315,7 @@ public class OffsetCommitRequest extends AbstractRequest {
             case 2:
                 return new OffsetCommitResponse(responseData);
             case 3:
+            case 4:
                 return new OffsetCommitResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index b439034..0b0b228 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -80,9 +80,14 @@ public class OffsetCommitResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema OFFSET_COMMIT_RESPONSE_V4 = OFFSET_COMMIT_RESPONSE_V3;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2,
-            OFFSET_COMMIT_RESPONSE_V3};
+            OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4};
     }
 
     private final Map<TopicPartition, Errors> responseData;
@@ -137,6 +142,7 @@ public class OffsetCommitResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -154,4 +160,8 @@ public class OffsetCommitResponse extends AbstractResponse {
         return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 4;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 0db1c50..e90c6eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -75,9 +75,11 @@ public class OffsetFetchRequest extends AbstractRequest {
     /* v3 request is the same as v2. Throttle time has been added to v3 response */
     private static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
 
+    private static final Schema OFFSET_FETCH_REQUEST_V4 = OFFSET_FETCH_REQUEST_V3;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2,
-            OFFSET_FETCH_REQUEST_V3};
+            OFFSET_FETCH_REQUEST_V3, OFFSET_FETCH_REQUEST_V4};
     }
 
     public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
@@ -178,6 +180,7 @@ public class OffsetFetchRequest extends AbstractRequest {
             case 2:
                 return new OffsetFetchResponse(error, responsePartitions);
             case 3:
+            case 4:
                 return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index e398442..613695b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -76,9 +76,14 @@ public class OffsetFetchResponse extends AbstractResponse {
             new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
             ERROR_CODE);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema OFFSET_FETCH_RESPONSE_V4 = OFFSET_FETCH_RESPONSE_V3;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2,
-            OFFSET_FETCH_RESPONSE_V3};
+            OFFSET_FETCH_RESPONSE_V3, OFFSET_FETCH_RESPONSE_V4};
     }
 
     public static final long INVALID_OFFSET = -1L;
@@ -179,6 +184,7 @@ public class OffsetFetchResponse extends AbstractResponse {
         }
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -234,4 +240,9 @@ public class OffsetFetchResponse extends AbstractResponse {
 
         return struct;
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 4;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 3da49b5..4da8767 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -54,7 +54,6 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
             new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
                     "An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
 
-
     // OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 added a per-partition leader epoch field,
     // which specifies which leader epoch the end offset belongs to
     private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 = new Schema(
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 91e3aeb..67745cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -108,10 +108,14 @@ public class ProduceRequest extends AbstractRequest {
      */
     private static final Schema PRODUCE_REQUEST_V5 = PRODUCE_REQUEST_V4;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema PRODUCE_REQUEST_V6 = PRODUCE_REQUEST_V5;
 
     public static Schema[] schemaVersions() {
         return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
-            PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5};
+            PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
     }
 
     public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
@@ -325,6 +329,7 @@ public class ProduceRequest extends AbstractRequest {
             case 3:
             case 4:
             case 5:
+            case 6:
                 return new ProduceResponse(responseMap, throttleTimeMs);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -394,6 +399,7 @@ public class ProduceRequest extends AbstractRequest {
             case 3:
             case 4:
             case 5:
+            case 6:
                 return RecordBatch.MAGIC_VALUE_V2;
 
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index afedc9d..467c980 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -139,14 +139,18 @@ public class ProduceResponse extends AbstractResponse {
                             LOG_START_OFFSET_FIELD)))))),
             THROTTLE_TIME_MS);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema PRODUCE_RESPONSE_V6 = PRODUCE_RESPONSE_V5;
 
     public static Schema[] schemaVersions() {
         return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
-            PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5};
+            PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6};
     }
 
     private final Map<TopicPartition, PartitionResponse> responses;
-    private final int throttleTime;
+    private final int throttleTimeMs;
 
     /**
      * Constructor for Version 0
@@ -159,11 +163,11 @@ public class ProduceResponse extends AbstractResponse {
     /**
      * Constructor for the latest version
      * @param responses Produced data grouped by topic-partition
-     * @param throttleTime Time in milliseconds the response was throttled
+     * @param throttleTimeMs Time in milliseconds the response was throttled
      */
-    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime) {
+    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
         this.responses = responses;
-        this.throttleTime = throttleTime;
+        this.throttleTimeMs = throttleTimeMs;
     }
 
     /**
@@ -185,7 +189,7 @@ public class ProduceResponse extends AbstractResponse {
                 responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset));
             }
         }
-        this.throttleTime = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
     }
 
     @Override
@@ -220,7 +224,7 @@ public class ProduceResponse extends AbstractResponse {
             topicDatas.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTime);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         return struct;
     }
@@ -229,8 +233,9 @@ public class ProduceResponse extends AbstractResponse {
         return this.responses;
     }
 
-    public int getThrottleTime() {
-        return this.throttleTime;
+    @Override
+    public int throttleTimeMs() {
+        return this.throttleTimeMs;
     }
 
     @Override
@@ -279,4 +284,8 @@ public class ProduceResponse extends AbstractResponse {
         return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 6;
+    }
 }
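
Besides adding PRODUCE v6, this hunk replaces getThrottleTime() with the common throttleTimeMs() accessor (now an @Override) and renames the backing field, while the constructor reads the field with struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME) so that responses parsed with a pre-throttle schema still yield a value. The toy sketch below shows only that fallback behaviour; the Map-based "struct" and the assumed default of 0 are stand-ins, not Kafka's Struct API.

    // Toy illustration of the getOrElse fallback; the Map-based "struct" and the assumed
    // DEFAULT_THROTTLE_TIME of 0 are stand-ins, not Kafka's Struct API.
    import java.util.HashMap;
    import java.util.Map;

    public class ThrottleFieldFallbackSketch {

        static final String THROTTLE_TIME_MS = "throttle_time_ms";
        static final int DEFAULT_THROTTLE_TIME = 0;

        /** Return the field value if the (old-version) struct carries it, otherwise the default. */
        static int getOrElse(Map<String, Integer> struct, String field, int defaultValue) {
            Integer value = struct.get(field);
            return value != null ? value : defaultValue;
        }

        public static void main(String[] args) {
            Map<String, Integer> oldVersionResponse = new HashMap<>(); // no throttle_time_ms field
            Map<String, Integer> newVersionResponse = new HashMap<>();
            newVersionResponse.put(THROTTLE_TIME_MS, 100);

            System.out.println(getOrElse(oldVersionResponse, THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME)); // 0
            System.out.println(getOrElse(newVersionResponse, THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME)); // 100
        }
    }
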
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index a65c705..57c3100 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -38,6 +38,11 @@ public class RenewDelegationTokenRequest extends AbstractRequest {
         new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be renewed."),
         new Field(RENEW_TIME_PERIOD_KEY_NAME, INT64, "Renew time period in milli seconds."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    public static final Schema TOKEN_RENEW_REQUEST_V1 = TOKEN_RENEW_REQUEST_V0;
+
     private RenewDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
         super(version);
 
@@ -57,7 +62,7 @@ public class RenewDelegationTokenRequest extends AbstractRequest {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_RENEW_REQUEST_V0};
+        return new Schema[] {TOKEN_RENEW_REQUEST_V0, TOKEN_RENEW_REQUEST_V1};
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
index 3233f5c..dc961e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -42,6 +42,11 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
             new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
             THROTTLE_TIME_MS);
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TOKEN_RENEW_RESPONSE_V1 = TOKEN_RENEW_RESPONSE_V0;
+
     public RenewDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
         this.throttleTimeMs = throttleTimeMs;
         this.error = error;
@@ -63,7 +68,7 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
     }
 
     public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_RENEW_RESPONSE_V0};
+        return new Schema[] {TOKEN_RENEW_RESPONSE_V0, TOKEN_RENEW_RESPONSE_V1};
     }
 
     @Override
@@ -82,6 +87,7 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -97,4 +103,9 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
     public boolean hasError() {
         return this.error != Errors.NONE;
     }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index e244182..21ce83e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -102,5 +102,4 @@ public class SaslAuthenticateResponse extends AbstractResponse {
     public static SaslAuthenticateResponse parse(ByteBuffer buffer, short version) {
         return new SaslAuthenticateResponse(ApiKeys.SASL_AUTHENTICATE.parseResponse(version, buffer));
     }
-}
-
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index 252c62d..9faa36c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -96,5 +96,4 @@ public class SaslHandshakeResponse extends AbstractResponse {
     public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) {
         return new SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer));
     }
-}
-
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 14ed262..962bc77 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -51,8 +51,14 @@ public class SyncGroupRequest extends AbstractRequest {
     /* v1 request is the same as v0. Throttle time has been added to response */
     private static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema SYNC_GROUP_REQUEST_V2 = SYNC_GROUP_REQUEST_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
+        return new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1,
+            SYNC_GROUP_REQUEST_V2};
     }
 
     public static class Builder extends AbstractRequest.Builder<SyncGroupRequest> {
@@ -127,6 +133,7 @@ public class SyncGroupRequest extends AbstractRequest {
                         Errors.forException(e),
                         ByteBuffer.wrap(new byte[]{}));
             case 1:
+            case 2:
                 return new SyncGroupResponse(
                         throttleTimeMs,
                         Errors.forException(e),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index e1e8083..2b2fc6f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -40,8 +40,14 @@ public class SyncGroupResponse extends AbstractResponse {
             ERROR_CODE,
             new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema SYNC_GROUP_RESPONSE_V2 = SYNC_GROUP_RESPONSE_V1;
+
     public static Schema[] schemaVersions() {
-        return new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
+        return new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1,
+            SYNC_GROUP_RESPONSE_V2};
     }
 
     /**
@@ -79,6 +85,7 @@ public class SyncGroupResponse extends AbstractResponse {
         this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -109,4 +116,8 @@ public class SyncGroupResponse extends AbstractResponse {
         return new SyncGroupResponse(ApiKeys.SYNC_GROUP.parseResponse(version, buffer));
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 2;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 9787c2d..25245be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -59,8 +59,13 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
                     new Field(PARTITIONS_KEY_NAME, new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0)))),
                     "The partitions to write markers for."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TXN_OFFSET_COMMIT_REQUEST_V1 = TXN_OFFSET_COMMIT_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{TXN_OFFSET_COMMIT_REQUEST_V0};
+        return new Schema[]{TXN_OFFSET_COMMIT_REQUEST_V0, TXN_OFFSET_COMMIT_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index ff0f8ce..c34fd40 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -49,8 +49,13 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
                     new Field(PARTITIONS_KEY_NAME, new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0)))),
                     "Errors per partition from writing markers."));
 
+    /**
+     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     */
+    private static final Schema TXN_OFFSET_COMMIT_RESPONSE_V1 = TXN_OFFSET_COMMIT_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{TXN_OFFSET_COMMIT_RESPONSE_V0};
+        return new Schema[]{TXN_OFFSET_COMMIT_RESPONSE_V0, TXN_OFFSET_COMMIT_RESPONSE_V1};
     }
 
     // Possible error codes:
@@ -117,6 +122,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
         return struct;
     }
 
+    @Override
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -142,4 +148,8 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
                 ')';
     }
 
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 1;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index da48e9f..a273765 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -435,7 +435,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
             return new UpdateMetadataResponse(Errors.forException(e));
         else
             throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                    versionId, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA.latestVersion()));
+                versionId, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA.latestVersion()));
     }
 
     public int controllerId() {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 4c21cde..bae816d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -72,4 +72,4 @@ public class UpdateMetadataResponse extends AbstractResponse {
         struct.set(ERROR_CODE, error.code());
         return struct;
     }
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index f4bf157..f307760 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -163,5 +163,4 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
     public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
         return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
     }
-
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 03f4d02..37155ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -51,17 +51,17 @@ public class ClusterConnectionStatesTest {
         connectionStates.connecting(nodeId1, time.milliseconds());
         assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.CONNECTING);
         assertTrue(connectionStates.isConnecting(nodeId1));
-        assertFalse(connectionStates.isReady(nodeId1));
+        assertFalse(connectionStates.isReady(nodeId1, time.milliseconds()));
         assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
-        assertFalse(connectionStates.hasReadyNodes());
+        assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
 
         time.sleep(100);
 
         // Successful connection
         connectionStates.ready(nodeId1);
         assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.READY);
-        assertTrue(connectionStates.isReady(nodeId1));
-        assertTrue(connectionStates.hasReadyNodes());
+        assertTrue(connectionStates.isReady(nodeId1, time.milliseconds()));
+        assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
         assertFalse(connectionStates.isConnecting(nodeId1));
         assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
         assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), Long.MAX_VALUE);
@@ -74,7 +74,7 @@ public class ClusterConnectionStatesTest {
         assertTrue(connectionStates.isDisconnected(nodeId1));
         assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
         assertFalse(connectionStates.isConnecting(nodeId1));
-        assertFalse(connectionStates.hasReadyNodes());
+        assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
         assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
 
         // After disconnecting we expect a backoff value equal to the reconnect.backoff.ms setting (plus minus 20% jitter)
@@ -92,29 +92,29 @@ public class ClusterConnectionStatesTest {
         // Check initial state, allowed to connect to all nodes, but no nodes shown as ready
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
         assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds()));
-        assertFalse(connectionStates.hasReadyNodes());
+        assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
 
         // Start connecting one node and check that the pool only shows ready nodes after
         // successful connect
         connectionStates.connecting(nodeId2, time.milliseconds());
-        assertFalse(connectionStates.hasReadyNodes());
+        assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
         time.sleep(1000);
         connectionStates.ready(nodeId2);
-        assertTrue(connectionStates.hasReadyNodes());
+        assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
 
         // Connect second node and check that both are shown as ready, pool should immediately
         // show ready nodes, since node2 is already connected
         connectionStates.connecting(nodeId1, time.milliseconds());
-        assertTrue(connectionStates.hasReadyNodes());
+        assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
         time.sleep(1000);
         connectionStates.ready(nodeId1);
-        assertTrue(connectionStates.hasReadyNodes());
+        assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
 
         time.sleep(12000);
 
         // disconnect nodes and check proper state of pool throughout
         connectionStates.disconnected(nodeId2, time.milliseconds());
-        assertTrue(connectionStates.hasReadyNodes());
+        assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
         assertTrue(connectionStates.isBlackedOut(nodeId2, time.milliseconds()));
         assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
         time.sleep(connectionStates.connectionDelay(nodeId2, time.milliseconds()));
@@ -122,7 +122,7 @@ public class ClusterConnectionStatesTest {
         connectionStates.disconnected(nodeId1, time.milliseconds() + 1);
         assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
         assertFalse(connectionStates.isBlackedOut(nodeId2, time.milliseconds()));
-        assertFalse(connectionStates.hasReadyNodes());
+        assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
     }
 
     @Test
@@ -136,7 +136,7 @@ public class ClusterConnectionStatesTest {
         time.sleep(1000);
         assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.AUTHENTICATION_FAILED);
         assertTrue(connectionStates.authenticationException(nodeId1) instanceof AuthenticationException);
-        assertFalse(connectionStates.hasReadyNodes());
+        assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
         assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
 
         time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
@@ -200,4 +200,30 @@ public class ClusterConnectionStatesTest {
             time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
         }
     }
+
+    @Test
+    public void testThrottled() {
+        connectionStates.connecting(nodeId1, time.milliseconds());
+        time.sleep(1000);
+        connectionStates.ready(nodeId1);
+        time.sleep(10000);
+
+        // Initially not throttled.
+        assertEquals(0, connectionStates.throttleDelayMs(nodeId1, time.milliseconds()));
+
+        // Throttle for 100ms from now.
+        connectionStates.throttle(nodeId1, time.milliseconds() + 100);
+        assertEquals(100, connectionStates.throttleDelayMs(nodeId1, time.milliseconds()));
+
+        // Still throttled after 50ms. The remaining delay is 50ms. The poll delay should be the same as the throttling delay.
+        time.sleep(50);
+        assertEquals(50, connectionStates.throttleDelayMs(nodeId1, time.milliseconds()));
+        assertEquals(50, connectionStates.pollDelayMs(nodeId1, time.milliseconds()));
+
+        // Not throttled anymore when the deadline is reached. The poll delay should be the same as the connection delay.
+        time.sleep(50);
+        assertEquals(0, connectionStates.throttleDelayMs(nodeId1, time.milliseconds()));
+        assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()),
+            connectionStates.pollDelayMs(nodeId1, time.milliseconds()));
+    }
 }
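
The new testThrottled test pins down the intended semantics: throttle(node, deadline) records an absolute deadline, throttleDelayMs(node, now) is the time remaining until that deadline (zero once it has passed), and pollDelayMs(node, now) equals the throttle delay while throttled and falls back to the ordinary connection delay afterwards. The sketch below reproduces just that bookkeeping for a single node; ThrottleState and its method shapes are illustrative simplifications, not the real ClusterConnectionStates implementation.

    // ThrottleState is an illustrative simplification for a single node; the real logic lives
    // in ClusterConnectionStates and may differ in detail.
    public class ThrottleStateSketch {

        static final class ThrottleState {
            private long throttleUntilMs = 0L; // absolute deadline; 0 means "not throttled"

            void throttle(long throttleUntilMs) {
                // Simplification: just record the deadline (a real implementation might keep
                // the later of successive deadlines).
                this.throttleUntilMs = throttleUntilMs;
            }

            long throttleDelayMs(long now) {
                return Math.max(0L, throttleUntilMs - now);
            }

            /**
             * While throttled, poll only needs to wake up when the throttle expires; afterwards
             * the usual connection/backoff delay applies again.
             */
            long pollDelayMs(long now, long connectionDelayMs) {
                long throttleDelay = throttleDelayMs(now);
                return throttleDelay > 0 ? throttleDelay : connectionDelayMs;
            }
        }

        public static void main(String[] args) {
            ThrottleState state = new ThrottleState();
            long now = 10_000L;

            state.throttle(now + 100);                                // throttle for 100ms from "now"
            System.out.println(state.throttleDelayMs(now));           // 100
            System.out.println(state.throttleDelayMs(now + 50));      // 50
            System.out.println(state.pollDelayMs(now + 50, 1234L));   // 50   (throttle delay wins)
            System.out.println(state.throttleDelayMs(now + 100));     // 0
            System.out.println(state.pollDelayMs(now + 100, 1234L));  // 1234 (back to connection delay)
        }
    }

The printed values mirror the assertions of testThrottled above, including the switch back to the connection delay once the deadline passes.
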
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 91905c2..7c2fc0b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -122,6 +122,11 @@ public class MockClient implements KafkaClient {
         return 0;
     }
 
+    @Override
+    public long pollDelayMs(Node node, long now) {
+        return connectionDelay(node, now);
+    }
+
     public void blackout(Node node, long duration) {
         blackedOut.put(node, time.milliseconds() + duration);
     }
@@ -478,7 +483,7 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public boolean hasReadyNodes() {
+    public boolean hasReadyNodes(long now) {
         return !ready.isEmpty();
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 77b36df..f83226c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.CommonFields;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -163,8 +165,7 @@ public class NetworkClientTest {
                 request.correlationId(), handler.response.requestHeader().correlationId());
     }
 
-    private void setExpectedApiVersionsResponse() {
-        ApiVersionsResponse response = ApiVersionsResponse.defaultApiVersionsResponse();
+    private void setExpectedApiVersionsResponse(ApiVersionsResponse response) {
         short apiVersionsResponseVersion = response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
         ByteBuffer buffer = response.serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
@@ -172,7 +173,7 @@ public class NetworkClientTest {
 
     private void awaitReady(NetworkClient client, Node node) {
         if (client.discoverBrokerVersions()) {
-            setExpectedApiVersionsResponse();
+            setExpectedApiVersionsResponse(ApiVersionsResponse.defaultApiVersionsResponse());
         }
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
@@ -201,22 +202,109 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testConnectionThrottling() {
+        // Instrument the test to return a response with a 100ms throttle delay.
+        awaitReady(client, node);
+        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
+            Collections.<TopicPartition, MemoryRecords>emptyMap());
+        TestCallbackHandler handler = new TestCallbackHandler();
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, handler);
+        client.send(request, time.milliseconds());
+        client.poll(1, time.milliseconds());
+        ResponseHeader respHeader = new ResponseHeader(request.correlationId());
+        Struct resp = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion()));
+        resp.set("responses", new Object[0]);
+        resp.set(CommonFields.THROTTLE_TIME_MS, 100);
+        Struct responseHeaderStruct = respHeader.toStruct();
+        int size = responseHeaderStruct.sizeOf() + resp.sizeOf();
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        responseHeaderStruct.writeTo(buffer);
+        resp.writeTo(buffer);
+        buffer.flip();
+        selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+        List<ClientResponse> responses = client.poll(1, time.milliseconds());
+
+        // The connection is not ready due to throttling.
+        assertFalse(client.ready(node, time.milliseconds()));
+        assertEquals(100, client.throttleDelayMs(node, time.milliseconds()));
+
+        // After 50ms, the connection is not ready yet.
+        time.sleep(50);
+        assertFalse(client.ready(node, time.milliseconds()));
+        assertEquals(50, client.throttleDelayMs(node, time.milliseconds()));
+
+        // After another 50ms, the throttling is done and the connection becomes ready again.
+        time.sleep(50);
+        assertTrue(client.ready(node, time.milliseconds()));
+        assertEquals(0, client.throttleDelayMs(node, time.milliseconds()));
+    }
+
+    // Creates expected ApiVersionsResponse from the specified node, where the max protocol version for the specified
+    // key is set to the specified version.
+    private ApiVersionsResponse createExpectedApiVersionsResponse(Node node, ApiKeys key,
+        short apiVersionsMaxProtocolVersion) {
+        List<ApiVersionsResponse.ApiVersion> versionList = new ArrayList<>();
+        for (ApiKeys apiKey : ApiKeys.values()) {
+            if (apiKey == key) {
+                versionList.add(new ApiVersionsResponse.ApiVersion(apiKey.id, (short) 0, apiVersionsMaxProtocolVersion));
+            } else {
+                versionList.add(new ApiVersionsResponse.ApiVersion(apiKey));
+            }
+        }
+        return new ApiVersionsResponse(0, Errors.NONE, versionList);
+    }
+
+    @Test
+    public void testThrottlingNotEnabledForConnectionToOlderBroker() {
+        // Instrument the test so that the max protocol version for PRODUCE returned from the node is 5 and thus
+        // client-side throttling is not enabled. Also, return a response with a 100ms throttle delay.
+        setExpectedApiVersionsResponse(createExpectedApiVersionsResponse(node, ApiKeys.PRODUCE, (short) 5));
+        while (!client.ready(node, time.milliseconds()))
+            client.poll(1, time.milliseconds());
+        selector.clear();
+
+        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
+            Collections.<TopicPartition, MemoryRecords>emptyMap());
+        TestCallbackHandler handler = new TestCallbackHandler();
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, handler);
+        client.send(request, time.milliseconds());
+        client.poll(1, time.milliseconds());
+        ResponseHeader respHeader = new ResponseHeader(request.correlationId());
+        Struct resp = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion()));
+        resp.set("responses", new Object[0]);
+        resp.set(CommonFields.THROTTLE_TIME_MS, 100);
+        Struct responseHeaderStruct = respHeader.toStruct();
+        int size = responseHeaderStruct.sizeOf() + resp.sizeOf();
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        responseHeaderStruct.writeTo(buffer);
+        resp.writeTo(buffer);
+        buffer.flip();
+        selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+        List<ClientResponse> responses = client.poll(1, time.milliseconds());
+
+        // Since client-side throttling is disabled, the connection is ready even though the response indicated a
+        // throttle delay.
+        assertTrue(client.ready(node, time.milliseconds()));
+        assertEquals(0, client.throttleDelayMs(node, time.milliseconds()));
+    }
+
+    @Test
     public void testLeastLoadedNode() {
         client.ready(node, time.milliseconds());
         awaitReady(client, node);
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
-        
+
         // leastloadednode should be our single node
         Node leastNode = client.leastLoadedNode(time.milliseconds());
         assertEquals("There should be one leastloadednode", leastNode.id(), node.id());
-        
+
         // sleep for longer than reconnect backoff
         time.sleep(reconnectBackoffMsTest);
-        
-        // CLOSE node 
+
+        // CLOSE node
         selector.serverDisconnect(node.idString());
-        
+
         client.poll(1, time.milliseconds());
         assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
         leastNode = client.leastLoadedNode(time.milliseconds());
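
The two new tests above show that the client honors a returned throttle time only when the broker's advertised max version for the request type indicates KIP-219 support (the second test caps PRODUCE at v5 and expects no client-side throttling). A hedged sketch of that gating decision follows; the concrete cut-off version is an assumption inferred from the test, not an authoritative statement of the NetworkClient implementation.

    // Illustrative sketch of the version-gated throttling decision exercised above.
    final class ClientThrottleGate {
        // Assumption for illustration: brokers that understand KIP-219 advertise PRODUCE >= 6.
        static boolean shouldClientThrottle(short produceVersionUsed) {
            return produceVersionUsed >= 6;
        }

        // Returns the time until which the client should block sends to this broker.
        static long applyThrottle(short produceVersionUsed, int throttleTimeMs, long nowMs) {
            if (shouldClientThrottle(produceVersionUsed) && throttleTimeMs > 0)
                return nowMs + throttleTimeMs;   // client-side throttle: mute outgoing traffic until the deadline
            // Older brokers delay the response themselves; the client does nothing extra.
            return nowMs;
        }

        public static void main(String[] args) {
            System.out.println(applyThrottle((short) 6, 100, 0)); // 100: client-side throttle applied
            System.out.println(applyThrottle((short) 5, 100, 0)); // 0: legacy behaviour, no client-side throttle
        }
    }
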
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7157470..76cde8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1404,8 +1404,11 @@ public class FetcherTest {
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
-        while (!client.ready(node, time.milliseconds()))
+        while (!client.ready(node, time.milliseconds())) {
             client.poll(1, time.milliseconds());
+            // If a throttled response is received, advance the time to ensure progress.
+            time.sleep(client.throttleDelayMs(node, time.milliseconds()));
+        }
         selector.clear();
 
         for (int i = 1; i <= 3; i++) {
@@ -1418,6 +1421,8 @@ public class FetcherTest {
             buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()));
             selector.completeReceive(new NetworkReceive(node.idString(), buffer));
             client.poll(1, time.milliseconds());
+            // If a throttled response is received, advance the time to ensure progress.
+            time.sleep(client.throttleDelayMs(node, time.milliseconds()));
             selector.clear();
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
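
The FetcherTest change above advances the mock clock by the client's remaining throttle delay so the ready-loop can make progress. A hypothetical helper consolidating that pattern is sketched below; it only assumes the NetworkClient, Node and MockTime APIs already used in the tests above.

    import org.apache.kafka.clients.NetworkClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.utils.MockTime;

    // Hypothetical test helper mirroring the loop above; not part of the patch itself.
    final class ThrottleAwareReadiness {
        static void awaitReadySkippingThrottle(NetworkClient client, Node node, MockTime time) {
            while (!client.ready(node, time.milliseconds())) {
                client.poll(1, time.milliseconds());
                // If the last response carried a throttle time, jump past it so the loop terminates.
                long throttleMs = client.throttleDelayMs(node, time.milliseconds());
                if (throttleMs > 0)
                    time.sleep(throttleMs);
            }
        }
    }
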
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index d486c10..5f48410 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -537,7 +537,7 @@ public class RecordAccumulatorTest {
         List<ProducerBatch> expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
-        accum.unmutePartition(tp1);
+        accum.unmutePartition(tp1, 0L);
         expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should be expired", 1, expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
@@ -551,7 +551,7 @@ public class RecordAccumulatorTest {
         expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());
 
-        accum.unmutePartition(tp1);
+        accum.unmutePartition(tp1, 0L);
         expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
@@ -577,9 +577,32 @@ public class RecordAccumulatorTest {
         expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
-        accum.unmutePartition(tp1);
+        accum.unmutePartition(tp1, 0L);
         expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
+
+        // Test that batches of a throttled partition are not expired until the throttle time is over.
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
+        time.sleep(lingerMs);
+        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+        // Advance the clock to expire the batch.
+        time.sleep(requestTimeout + 1);
+        accum.mutePartition(tp1);
+        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+
+        long throttleTimeMs = 100L;
+        accum.unmutePartition(tp1, time.milliseconds() + throttleTimeMs);
+        // The batch shouldn't be expired yet.
+        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+
+        // Once the throttle time is over, the batch can be expired.
+        time.sleep(throttleTimeMs);
+        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        assertEquals("The batch should be expired", 1, expiredBatches.size());
+        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
     }
 
     @Test
@@ -603,7 +626,7 @@ public class RecordAccumulatorTest {
         assertEquals("No node should be ready", 0, result.readyNodes.size());
 
         // Test ready without muted partition
-        accum.unmutePartition(tp1);
+        accum.unmutePartition(tp1, 0L);
         result = accum.ready(cluster, time.milliseconds());
         assertTrue("The batch should be ready", result.readyNodes.size() > 0);
 
@@ -613,7 +636,7 @@ public class RecordAccumulatorTest {
         assertEquals("No batch should have been drained", 0, drained.get(node1.id()).size());
 
         // Test drain without muted partition.
-        accum.unmutePartition(tp1);
+        accum.unmutePartition(tp1, 0L);
         drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
         assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
     }
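
The accumulator tests now call unmutePartition(tp, throttleUntilTimeMs): a partition can be unmuted for sending while its batches stay protected from expiry until a throttle deadline passes. A minimal sketch of that bookkeeping follows; the field and method names are hypothetical, not the real RecordAccumulator internals.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of the mute/throttle bookkeeping exercised by the tests above.
    final class MuteBookkeepingSketch {
        private final Map<String, Boolean> muted = new HashMap<>();
        private final Map<String, Long> expiryProtectedUntil = new HashMap<>();  // partition -> throttle deadline

        void mutePartition(String tp) {
            muted.put(tp, true);
        }

        // Unmute for sending, but keep batches protected from expiry until throttleUntilMs.
        void unmutePartition(String tp, long throttleUntilMs) {
            muted.put(tp, false);
            expiryProtectedUntil.put(tp, throttleUntilMs);
        }

        boolean canExpireBatches(String tp, long nowMs) {
            if (Boolean.TRUE.equals(muted.get(tp)))
                return false;                                   // never expire while a request is in flight
            Long deadline = expiryProtectedUntil.get(tp);
            return deadline == null || nowMs >= deadline;       // and not before the throttle deadline
        }

        public static void main(String[] args) {
            MuteBookkeepingSketch s = new MuteBookkeepingSketch();
            s.mutePartition("tp1");
            System.out.println(s.canExpireBatches("tp1", 0));    // false: muted
            s.unmutePartition("tp1", 100);
            System.out.println(s.canExpireBatches("tp1", 50));   // false: still within the throttle window
            System.out.println(s.canExpireBatches("tp1", 100));  // true: throttle over
        }
    }
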
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 1ce8e5a..77005b7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -253,8 +253,11 @@ public class SenderTest {
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
-        while (!client.ready(node, time.milliseconds()))
+        while (!client.ready(node, time.milliseconds())) {
             client.poll(1, time.milliseconds());
+            // If a throttled response is received, advance the time to ensure progress.
+            time.sleep(client.throttleDelayMs(node, time.milliseconds()));
+        }
         selector.clear();
 
         for (int i = 1; i <= 3; i++) {
@@ -268,6 +271,8 @@ public class SenderTest {
             buffer = response.serialize(ApiKeys.PRODUCE.latestVersion(), new ResponseHeader(request.correlationId()));
             selector.completeReceive(new NetworkReceive(node.idString(), buffer));
             client.poll(1, time.milliseconds());
+            // If a throttled response is received, advance the time to ensure progress.
+            time.sleep(client.throttleDelayMs(node, time.milliseconds()));
             selector.clear();
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@@ -1656,7 +1661,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
-        
+
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
@@ -1740,7 +1745,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
-        
+
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index a1dd775..fc88e9e 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -47,7 +47,7 @@ public class ApiKeysTest {
      * All valid client responses which may be throttled should have a field named
      * 'throttle_time_ms' to return the throttle time to the client. Exclusions are
      * <ul>
-     *   <li>Cluster actions used only for inter-broker are throttled only if unauthorized
+     *   <li> Cluster actions used only for inter-broker are throttled only if unauthorized
      *   <li> SASL_HANDSHAKE and SASL_AUTHENTICATE are not throttled when used for authentication
      *        when a connection is established. At any other time, this request returns an error
      *        response that may be throttled.
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2422650..ec6c5d5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -433,7 +433,7 @@ public class RequestResponseTest {
         ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
         assertEquals(100, partitionResponse.logStartOffset);
         assertEquals(10000, partitionResponse.baseOffset);
-        assertEquals(10, v5FromBytes.getThrottleTime());
+        assertEquals(10, v5FromBytes.throttleTimeMs());
         assertEquals(responseData, v5Response.responses());
     }
 
@@ -445,9 +445,9 @@ public class RequestResponseTest {
         ProduceResponse v0Response = new ProduceResponse(responseData);
         ProduceResponse v1Response = new ProduceResponse(responseData, 10);
         ProduceResponse v2Response = new ProduceResponse(responseData, 10);
-        assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
-        assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
-        assertEquals("Throttle time must be 10", 10, v2Response.getThrottleTime());
+        assertEquals("Throttle time must be zero", 0, v0Response.throttleTimeMs());
+        assertEquals("Throttle time must be 10", 10, v1Response.throttleTimeMs());
+        assertEquals("Throttle time must be 10", 10, v2Response.throttleTimeMs());
         assertEquals("Should use schema version 0", ApiKeys.PRODUCE.responseSchema((short) 0),
                 v0Response.toStruct((short) 0).schema());
         assertEquals("Should use schema version 1", ApiKeys.PRODUCE.responseSchema((short) 1),
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index b13d237..9ed6432 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -71,7 +71,9 @@ object ApiVersion {
     // and KafkaStorageException for fetch requests.
     KAFKA_1_1_IV0,
     // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
-    KAFKA_2_0_IV0
+    KAFKA_2_0_IV0,
+    // Introduced ApiVersionsRequest V2 via KIP-219
+    KAFKA_2_0_IV1
   )
 
   // Map keys are the union of the short and full versions
@@ -240,3 +242,10 @@ case object KAFKA_2_0_IV0 extends DefaultApiVersion {
   val recordVersion = RecordVersion.V2
   val id: Int = 15
 }
+
+case object KAFKA_2_0_IV1 extends DefaultApiVersion {
+  val shortVersion: String = "2.0"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 16
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index f03bdeb..f45e0ce 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -234,13 +234,15 @@ object RequestChannel extends Logging {
     def processor: Int = request.processor
 
     override def toString =
-      s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction), responseAsString=$responseAsString"
+      s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction, responseAsString=$responseAsString)"
   }
 
   sealed trait ResponseAction
   case object SendAction extends ResponseAction
   case object NoOpAction extends ResponseAction
   case object CloseConnectionAction extends ResponseAction
+  case object StartThrottlingAction extends ResponseAction
+  case object EndThrottlingAction extends ResponseAction
 }
 
 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
@@ -292,6 +294,10 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
           s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
         case CloseConnectionAction =>
           s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
+        case StartThrottlingAction =>
+          s"Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
+        case EndThrottlingAction =>
+          s"Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
       }
       trace(message)
     }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 639f799..759396d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
+import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -424,7 +425,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
    */
   private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
     val socketAddress =
-      if(host == null || host.trim.isEmpty)
+      if (host == null || host.trim.isEmpty)
         new InetSocketAddress(port)
       else
         new InetSocketAddress(host, port)
@@ -626,7 +627,11 @@ private[kafka] class Processor(val id: Int,
             // that are sitting in the server's socket buffer
             updateRequestMetrics(curr)
             trace("Socket server received empty response to send, registering for read: " + curr)
-            openOrClosingChannel(channelId).foreach(c => selector.unmute(c.id))
+            // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
+            // it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the
+            // throttling delay has already passed by now.
+            handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
+            tryUnmuteChannel(channelId)
           case RequestChannel.SendAction =>
             val responseSend = curr.responseSend.getOrElse(
               throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
@@ -635,6 +640,13 @@ private[kafka] class Processor(val id: Int,
             updateRequestMetrics(curr)
             trace("Closing socket connection actively according to the response code.")
             close(channelId)
+          case RequestChannel.StartThrottlingAction =>
+            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
+          case RequestChannel.EndThrottlingAction =>
+            // Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
+            // the client.
+            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
+            tryUnmuteChannel(channelId)
         }
       } catch {
         case e: Throwable =>
@@ -677,12 +689,14 @@ private[kafka] class Processor(val id: Int,
         openOrClosingChannel(receive.source) match {
           case Some(channel) =>
             val header = RequestHeader.parse(receive.payload)
-            val context = new RequestContext(header, receive.source, channel.socketAddress,
+            val connectionId = receive.source
+            val context = new RequestContext(header, connectionId, channel.socketAddress,
               channel.principal, listenerName, securityProtocol)
             val req = new RequestChannel.Request(processor = id, context = context,
               startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
             requestChannel.sendRequest(req)
-            selector.mute(receive.source)
+            selector.mute(connectionId)
+            handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
           case None =>
             // This should never happen since completed receives are processed immediately after `poll()`
             throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
@@ -703,10 +717,15 @@ private[kafka] class Processor(val id: Int,
           throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
         }
         updateRequestMetrics(resp)
-        selector.unmute(send.destination)
+
+        // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
+        // it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the throttling
+        // delay has already passed by now.
+        handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT)
+        tryUnmuteChannel(send.destination)
       } catch {
         case e: Throwable => processChannelException(send.destination,
-            s"Exception while processing completed send to ${send.destination}", e)
+          s"Exception while processing completed send to ${send.destination}", e)
       }
     }
   }
@@ -821,7 +840,17 @@ private[kafka] class Processor(val id: Int,
   // Visible for testing
   // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
   private[network] def openOrClosingChannel(connectionId: String): Option[KafkaChannel] =
-     Option(selector.channel(connectionId)).orElse(Option(selector.closingChannel(connectionId)))
+    Option(selector.channel(connectionId)).orElse(Option(selector.closingChannel(connectionId)))
+
+  // Notify the specified channel that the given mute-related event has happened so that it can change its
+  // mute state.
+  private def handleChannelMuteEvent(connectionId: String, event: ChannelMuteEvent): Unit = {
+    openOrClosingChannel(connectionId).foreach(c => c.handleChannelMuteEvent(event))
+  }
+
+  private def tryUnmuteChannel(connectionId: String) = {
+    openOrClosingChannel(connectionId).foreach(c => selector.unmute(c.id))
+  }
 
   /* For test usage */
   private[network] def channel(connectionId: String): Option[KafkaChannel] =
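
The Processor now drives KafkaChannel's mute state via handleChannelMuteEvent with REQUEST_RECEIVED, RESPONSE_SENT, THROTTLE_STARTED and THROTTLE_ENDED events, and only unmutes once the response has gone out and any throttling has ended. Below is a deliberately simplified sketch of that idea; the real state machine lives in KafkaChannel and tracks more states than shown here.

    // Condensed illustration only; not the actual KafkaChannel mute-state implementation.
    final class MuteStateSketch {
        enum Event { REQUEST_RECEIVED, RESPONSE_SENT, THROTTLE_STARTED, THROTTLE_ENDED }

        private boolean requestOutstanding = false;  // a request is being processed (channel muted)
        private boolean throttled = false;           // quota throttling is in progress

        void handle(Event event) {
            switch (event) {
                case REQUEST_RECEIVED:  requestOutstanding = true;  break;
                case RESPONSE_SENT:     requestOutstanding = false; break;
                case THROTTLE_STARTED:  throttled = true;           break;
                case THROTTLE_ENDED:    throttled = false;          break;
            }
        }

        // The processor only unmutes the channel once the response is out AND throttling is over.
        boolean canUnmute() {
            return !requestOutstanding && !throttled;
        }

        public static void main(String[] args) {
            MuteStateSketch s = new MuteStateSketch();
            s.handle(Event.REQUEST_RECEIVED);
            s.handle(Event.THROTTLE_STARTED);
            s.handle(Event.RESPONSE_SENT);
            System.out.println(s.canUnmute()); // false: still throttled
            s.handle(Event.THROTTLE_ENDED);
            System.out.println(s.canUnmute()); // true
        }
    }
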
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 0f8690f..73b40d1 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -20,11 +20,13 @@ import java.{lang, util}
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.network.RequestChannel.Session
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel._
 import kafka.server.ClientQuotaManager._
 import kafka.utils.{Logging, ShutdownableThread}
 import org.apache.kafka.common.{Cluster, MetricName}
 import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
@@ -169,9 +171,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       else QuotaTypes.ClientIdQuotaEnabled
   }
   private val lock = new ReentrantReadWriteLock()
-  private val delayQueue = new DelayQueue[ThrottledResponse]()
+  private val delayQueue = new DelayQueue[ThrottledChannel]()
   private val sensorAccessor = new SensorAccess(lock, metrics)
-  private[server] val throttledRequestReaper = new ThrottledRequestReaper(delayQueue, threadNamePrefix)
+  private[server] val throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix)
   private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback)
 
   private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
@@ -180,23 +182,23 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     "Tracks the size of the delay queue"), new Total())
   start() // Use start method to keep findbugs happy
   private def start() {
-    throttledRequestReaper.start()
+    throttledChannelReaper.start()
   }
 
   /**
-   * Reaper thread that triggers callbacks on all throttled requests
+   * Reaper thread that triggers channel unmute callbacks on all throttled channels
    * @param delayQueue DelayQueue to dequeue from
    */
-  class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse], prefix: String) extends ShutdownableThread(
-    s"${prefix}ThrottledRequestReaper-$quotaType", false) {
+  class ThrottledChannelReaper(delayQueue: DelayQueue[ThrottledChannel], prefix: String) extends ShutdownableThread(
+    s"${prefix}ThrottledChannelReaper-$quotaType", false) {
 
     override def doWork(): Unit = {
-      val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
-      if (response != null) {
+      val throttledChannel: ThrottledChannel = delayQueue.poll(1, TimeUnit.SECONDS)
+      if (throttledChannel != null) {
         // Decrement the size of the delay queue
         delayQueueSensor.record(-1)
-        trace("Response throttled for: " + response.throttleTimeMs + " ms")
-        response.execute()
+        // Notify the socket server that throttling is done for this channel, so that it can try to unmute the channel.
+        throttledChannel.notifyThrottlingDone()
       }
     }
   }
@@ -211,50 +213,80 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   def quotasEnabled: Boolean = quotaTypesEnabled != QuotaTypes.NoQuotas
 
   /**
-   * Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
-   * If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
-   * Throttle time calculation may be overridden by sub-classes.
-   *
-   * @param session  the session associated with this request
-   * @param clientId clientId that produced/fetched the data
-   * @param value    amount of data in bytes or request processing time as a percentage
-   * @param callback Callback function. This will be triggered immediately if quota is not violated.
-   *                 If there is a quota violation, this callback will be triggered after a delay
-   * @return Number of milliseconds to delay the response in case of Quota violation.
-   *         Zero otherwise
-   */
-  def maybeRecordAndThrottle(session: Session, clientId: String, value: Double, callback: Int => Unit): Int = {
+    * Records that a user/clientId changed some metric being throttled (produced/consumed bytes) at the specified time.
+    * If the quota has been violated, returns the throttle time in milliseconds; zero otherwise. Throttle time
+    * calculation may be overridden by sub-classes.
+    * @param request client request
+    * @param value amount of data in bytes or request processing time as a percentage
+    * @param timeMs time to record the value at
+    * @return throttle time in milliseconds
+    */
+  def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request, value: Double, timeMs: Long): Int = {
+    maybeRecordAndGetThrottleTimeMs(request.session, request.header.clientId, value, timeMs)
+  }
+
+  def maybeRecordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = {
+    // Record metrics only if quotas are enabled.
     if (quotasEnabled) {
-      val clientSensors = getOrCreateQuotaSensors(session, clientId)
-      recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
+      recordAndGetThrottleTimeMs(session, clientId, value, timeMs)
     } else {
-      // Don't record any metrics if quotas are not enabled at any level
-      val throttleTimeMs = 0
-      callback(throttleTimeMs)
-      throttleTimeMs
+      0
     }
   }
 
-  def recordAndThrottleOnQuotaViolation(clientSensors: ClientSensors, value: Double, callback: Int => Unit): Int = {
+  def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = {
     var throttleTimeMs = 0
+    val clientSensors = getOrCreateQuotaSensors(session, clientId)
     try {
-      clientSensors.quotaSensor.record(value)
-      // trigger the callback immediately if quota is not violated
-      callback(0)
+      clientSensors.quotaSensor.record(value, timeMs)
     } catch {
       case _: QuotaViolationException =>
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
         throttleTimeMs = throttleTime(clientMetric).toInt
-        clientSensors.throttleTimeSensor.record(throttleTimeMs)
-        // If delayed, add the element to the delayQueue
-        delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
-        delayQueueSensor.record()
         debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
     }
     throttleTimeMs
   }
 
+  /** "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value
+    * of the same quantity.
+    *
+    * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally,
+    * we would like to compute the throttle time before actually recording the value, but the current Sensor code
+    * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch
+    * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the
+    * overall sum back to the previous value.
+    */
+  def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, timeMs: Long): Unit = {
+    val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
+    clientSensors.quotaSensor.record(value * (-1), timeMs, false)
+  }
+
+  /**
+    * Throttle a client by muting the associated channel for the given throttle time.
+    * @param request client request
+    * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted.
+    * @param channelThrottlingCallback Callback for channel throttling
+    * @return ThrottledChannel object
+    */
+  def throttle(request: RequestChannel.Request, throttleTimeMs: Int,
+               channelThrottlingCallback: (ResponseAction) => Unit) {
+    throttle(request.session, request.header.clientId, throttleTimeMs, channelThrottlingCallback)
+  }
+
+  def throttle(session: Session, clientId: String, throttleTimeMs: Int,
+               channelThrottlingCallback: (ResponseAction) => Unit) {
+    if (throttleTimeMs > 0) {
+      val clientSensors = getOrCreateQuotaSensors(session, clientId)
+
+      clientSensors.throttleTimeSensor.record(throttleTimeMs)
+      val throttledChannel = new ThrottledChannel(time, throttleTimeMs, channelThrottlingCallback)
+      delayQueue.add(throttledChannel)
+      delayQueueSensor.record()
+      debug("Channel throttled for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
+    }
+  }
+
   /**
    * Records that a user/clientId changed some metric being throttled without checking for
    * quota violation. The aggregate value will subsequently be used for throttling when the
@@ -502,7 +534,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   def shutdown(): Unit = {
-    throttledRequestReaper.shutdown()
+    throttledChannelReaper.shutdown()
   }
 
   class DefaultQuotaCallback extends ClientQuotaCallback {
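
The quota manager now enqueues ThrottledChannel objects on a DelayQueue, and the ThrottledChannelReaper unmutes channels once their delay expires. A minimal sketch of that Delayed/DelayQueue pattern follows; ThrottledChannel itself is a Scala class, so the Java stand-in below is purely illustrative.

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    // Sketch of the DelayQueue pattern the reaper relies on; names are illustrative.
    final class ThrottledChannelSketch implements Delayed {
        private final long endTimeMs;
        private final Runnable onThrottlingDone;

        ThrottledChannelSketch(long nowMs, long throttleTimeMs, Runnable onThrottlingDone) {
            this.endTimeMs = nowMs + throttleTimeMs;
            this.onThrottlingDone = onThrottlingDone;
        }

        void notifyThrottlingDone() {
            onThrottlingDone.run();   // e.g. ask the socket server to try unmuting the channel
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }

        public static void main(String[] args) throws InterruptedException {
            DelayQueue<ThrottledChannelSketch> queue = new DelayQueue<>();
            queue.add(new ThrottledChannelSketch(System.currentTimeMillis(), 100,
                () -> System.out.println("throttling done, unmute channel")));
            // The reaper loop: block until a channel's throttle time has elapsed, then notify.
            ThrottledChannelSketch done = queue.poll(1, TimeUnit.SECONDS);
            if (done != null)
                done.notifyThrottlingDone();
        }
    }
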
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 3078a62..7bab616 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -40,22 +40,25 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
     exemptSensor.record(value)
   }
 
-  def maybeRecordAndThrottle(request: RequestChannel.Request, sendResponseCallback: Int => Unit): Unit = {
+  /**
+    * Records the request processing time used by a user/clientId, a metric that is subject to throttling. If the quota
+    * has been violated, returns the throttle time in milliseconds. Throttle time calculation may be overridden by
+    * sub-classes.
+    * @param request client request
+    * @return Number of milliseconds to throttle in case of quota violation. Zero otherwise
+    */
+  def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request): Int = {
     if (request.apiRemoteCompleteTimeNanos == -1) {
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
     }
 
     if (quotasEnabled) {
-      val quotaSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
-      request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos)))
-
-      recordAndThrottleOnQuotaViolation(
-          quotaSensors,
-          nanosToPercentage(request.requestThreadTimeNanos),
-          sendResponseCallback)
+      request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(
+        getOrCreateQuotaSensors(request.session, request.header.clientId), nanosToPercentage(timeNanos)))
+      recordAndGetThrottleTimeMs(request.session, request.header.clientId,
+        nanosToPercentage(request.requestThreadTimeNanos), time.milliseconds())
     } else {
-      sendResponseCallback(0)
+      0
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 0a825f1..3810d90 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -38,6 +38,7 @@ object FetchSession {
   type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
   type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
   type CACHE_MAP = ImplicitLinkedHashSet[CachedPartition]
+  type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData]]
 
   val NUM_INCREMENTAL_FETCH_SESSISONS = "NumIncrementalFetchSessions"
   val NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED = "NumIncrementalFetchPartitionsCached"
@@ -116,15 +117,16 @@ class CachedPartition(val topic: String,
   }
 
   /**
-    * Update this CachedPartition with new request and response data.
+    * Determine whether or not the specified cached partition should be included in the FetchResponse we send back to
+    * the fetcher and update it if requested.
     *
-    * This function should be called while holding the appropriate session
-    * lock.
+    * This function should be called while holding the appropriate session lock.
     *
-    * @return True if this partition should be included in the FetchResponse
-    *         we send back to the fetcher; false if it can be omitted.
+    * @param respData partition data
+    * @param updateResponseData if set to true, update this CachedPartition with new request and response data.
+    * @return True if this partition should be included in the response; false if it can be omitted.
     */
-  def updateResponseData(respData: FetchResponse.PartitionData): Boolean = {
+  def maybeUpdateResponseData(respData: FetchResponse.PartitionData, updateResponseData: Boolean): Boolean = {
     // Check the response data.
     var mustRespond = false
     if ((respData.records != null) && (respData.records.sizeInBytes() > 0)) {
@@ -133,17 +135,20 @@ class CachedPartition(val topic: String,
     }
     if (highWatermark != respData.highWatermark) {
       mustRespond = true
-      highWatermark = respData.highWatermark
+      if (updateResponseData)
+        highWatermark = respData.highWatermark
     }
     if (localLogStartOffset != respData.logStartOffset) {
       mustRespond = true
-      localLogStartOffset = respData.logStartOffset
+      if (updateResponseData)
+        localLogStartOffset = respData.logStartOffset
     }
     if (respData.error.code() != 0) {
       // Partitions with errors are always included in the response.
       // We also set the cached highWatermark to an invalid offset, -1.
       // This ensures that when the error goes away, we re-send the partition.
-      highWatermark = -1
+      if (updateResponseData)
+        highWatermark = -1
       mustRespond = true
     }
     mustRespond
@@ -272,6 +277,12 @@ trait FetchContext extends Logging {
   def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit
 
   /**
+    * Get the response size to be used for quota computation. Since an empty response is returned in case of
+    * throttling, the context must not be updated until we know that the response will not be throttled.
+    */
+  def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int
+
+  /**
     * Updates the fetch context with new partition information.  Generates response data.
     * The response data may require subsequent down-conversion.
     */
@@ -290,6 +301,10 @@ class SessionErrorContext(val error: Errors,
 
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {}
 
+  override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
+    FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet().iterator())
+  }
+
   // Because of the fetch session error, we don't know what partitions were supposed to be in this request.
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
     debug(s"Session error fetch context returning $error")
@@ -310,6 +325,10 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
     fetchData.entrySet().asScala.foreach(entry => fun(entry.getKey, entry.getValue))
   }
 
+  override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
+    FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
+  }
+
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
     debug(s"Sessionless fetch context returning ${partitionsToLogString(updates.keySet())}")
     new FetchResponse(Errors.NONE, updates, 0, INVALID_SESSION_ID)
@@ -337,6 +356,10 @@ class FullFetchContext(private val time: Time,
     fetchData.entrySet().asScala.foreach(entry => fun(entry.getKey, entry.getValue))
   }
 
+  override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
+    FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
+  }
+
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
     def createNewSession(): FetchSession.CACHE_MAP = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size())
@@ -378,6 +401,58 @@ class IncrementalFetchContext(private val time: Time,
     }
   }
 
+  // Iterator that goes over the given partition map and selects partitions that need to be included in the response.
+  // If updateFetchContextAndRemoveUnselected is set to true, the fetch context will be updated for the selected
+  // partitions, and unselected partitions will be removed from the update map as they are encountered.
+  private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
+                                  val updateFetchContextAndRemoveUnselected: Boolean)
+    extends FetchSession.RESP_MAP_ITER {
+    var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = null
+
+    override def hasNext: Boolean = {
+      while ((nextElement == null) && iter.hasNext()) {
+        val element = iter.next()
+        val topicPart = element.getKey
+        val respData = element.getValue
+        val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
+        val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
+        if (mustRespond) {
+          nextElement = element
+          if (updateFetchContextAndRemoveUnselected) {
+            session.partitionMap.remove(cachedPart)
+            session.partitionMap.mustAdd(cachedPart)
+          }
+        } else {
+          if (updateFetchContextAndRemoveUnselected) {
+            iter.remove()
+          }
+        }
+      }
+      nextElement != null
+    }
+
+    override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = {
+      if (!hasNext()) throw new NoSuchElementException()
+      val element = nextElement
+      nextElement = null
+      element
+    }
+
+    override def remove() = throw new UnsupportedOperationException()
+  }
+
+  override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
+    session.synchronized {
+      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+      if (session.epoch != expectedEpoch) {
+        FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet().iterator())
+      } else {
+        // Pass the partition iterator which updates neither the fetch context nor the partition map.
+        FetchResponse.sizeOf(versionId, new PartitionIterator(updates.entrySet().iterator(), false))
+      }
+    }
+  }
+
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
     session.synchronized {
       // Check to make sure that the session epoch didn't change in between
@@ -388,24 +463,10 @@ class IncrementalFetchContext(private val time: Time,
           s"got ${session.epoch}.  Possible duplicate request.")
         new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, 0, session.id)
       } else {
-        // Iterate over the update list.  Prune updates which don't need to be sent.
-        val iter = updates.entrySet().iterator()
-        while (iter.hasNext()) {
-          val entry = iter.next()
-          val topicPart = entry.getKey
-          val respData = entry.getValue
-          val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
-          val mustRespond = cachedPart.updateResponseData(respData)
-          if (mustRespond) {
-            // Move this to the end of the cached partition map.
-            // This is important for ensuring fairness when lots of partitions
-            // have data to return.
-            session.partitionMap.remove(cachedPart)
-            session.partitionMap.mustAdd(cachedPart)
-          } else {
-            // Do not include this partition in the FetchResponse.
-            iter.remove()
-          }
+        // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent
+        val partitionIter = new PartitionIterator(updates.entrySet().iterator(), true)
+        while (partitionIter.hasNext()) {
+          partitionIter.next()
         }
         debug(s"Incremental fetch context with session id ${session.id} returning " +
           s"${partitionsToLogString(updates.keySet())}")
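
The new PartitionIterator lets the same selection logic run twice: once read-only to size the would-be response for the quota check, and once mutating to actually update the session and prune unselected partitions. A small generic sketch of that "select, optionally update/remove" iterator pattern follows, detached from the Kafka types and using hypothetical names.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.function.Predicate;

    // Generic illustration of the pattern used by IncrementalFetchContext.PartitionIterator above.
    final class SelectingIterator<T> implements Iterator<T> {
        private final Iterator<T> inner;
        private final Predicate<T> mustInclude;   // stands in for maybeUpdateResponseData(...)
        private final boolean removeUnselected;   // true only on the final, mutating pass
        private T next;

        SelectingIterator(Iterator<T> inner, Predicate<T> mustInclude, boolean removeUnselected) {
            this.inner = inner;
            this.mustInclude = mustInclude;
            this.removeUnselected = removeUnselected;
        }

        @Override
        public boolean hasNext() {
            while (next == null && inner.hasNext()) {
                T candidate = inner.next();
                if (mustInclude.test(candidate))
                    next = candidate;
                else if (removeUnselected)
                    inner.remove();               // prune entries that will not be sent
            }
            return next != null;
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            T result = next;
            next = null;
            return result;
        }

        public static void main(String[] args) {
            List<Integer> updates = new ArrayList<>(List.of(1, 2, 3, 4));
            // First pass: size the response without touching the list (quota check).
            Iterator<Integer> sizing = new SelectingIterator<>(updates.iterator(), v -> v % 2 == 0, false);
            int count = 0;
            while (sizing.hasNext()) { sizing.next(); count++; }
            System.out.println(count + " of " + updates.size());   // 2 of 4, list untouched
            // Second pass: actually prune what will not be sent.
            Iterator<Integer> pruning = new SelectingIterator<>(updates.iterator(), v -> v % 2 == 0, true);
            while (pruning.hasNext()) pruning.next();
            System.out.println(updates);                           // [2, 4]
        }
    }
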
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f86026d..abca2f0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,7 +34,7 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.RequestChannel
-import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
+import kafka.network.RequestChannel._
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
 import kafka.utils.{CoreUtils, Logging}
@@ -45,7 +45,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, RecordsProcessingStats}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
@@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshakeResponse}
+import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import DescribeLogDirsResponse.LogDirInfo
@@ -424,38 +425,46 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
-        if (produceRequest.acks == 0) {
-          // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
-          // the request, since no response is expected by the producer, the server will close socket server so that
-          // the producer client will know that some error has happened and will refresh its metadata
-          if (errorInResponse) {
-            val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
-              topicPartition -> status.error.exceptionName
-            }.mkString(", ")
-            info(
-              s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
-                s"from client id ${request.header.clientId} with ack=0\n" +
-                s"Topic and partition to exceptions: $exceptionsSummary"
-            )
-            closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
-          } else {
-            sendNoOpResponseExemptThrottle(request)
-          }
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
+
+      // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas
+      // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note
+      // that the request quota is not enforced if acks == 0.
+      val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds())
+      val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request)
+      val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
+      if (maxThrottleTimeMs > 0) {
+        if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
+          quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendActionOnlyResponse(request))
         } else {
-          sendResponseMaybeThrottle(request, requestThrottleMs =>
-            new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))
+          quotas.request.throttle(request, requestThrottleTimeMs, sendActionOnlyResponse(request))
         }
       }
 
-      // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeNanos = time.nanoseconds
-
-      quotas.produce.maybeRecordAndThrottle(
-        request.session,
-        request.header.clientId,
-        numBytesAppended,
-        produceResponseCallback)
+      // Send the response immediately. In case of throttling, the channel has already been muted.
+      if (produceRequest.acks == 0) {
+        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
+        // the request, since no response is expected by the producer, the server will close socket server so that
+        // the producer client will know that some error has happened and will refresh its metadata
+        if (errorInResponse) {
+          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
+            topicPartition -> status.error.exceptionName
+          }.mkString(", ")
+          info(
+            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
+              s"from client id ${request.header.clientId} with ack=0\n" +
+              s"Topic and partition to exceptions: $exceptionsSummary"
+          )
+          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
+        } else {
+          // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to
+          // bandwidth quota violation.
+          sendNoOpResponseExemptThrottle(request)
+        }
+      } else {
+        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)))
+      }
     }
 
     def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
@@ -573,52 +582,73 @@ class KafkaApis(val requestChannel: RequestChannel,
           data.logStartOffset, abortedTransactions, data.records))
       }
       erroneous.foreach{case (tp, data) => partitions.put(tp, data)}
-      val unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
-
-      // fetch response callback invoked after any throttling
-      def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
-        def createResponse(requestThrottleTimeMs: Int): FetchResponse = {
-          val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
-          unconvertedFetchResponse.responseData().asScala.foreach { case (tp, partitionData) =>
-            if (partitionData.error != Errors.NONE)
-              debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-                s"on partition $tp failed due to ${partitionData.error.exceptionName}")
-            convertedData.put(tp, convertedPartitionData(tp, partitionData))
-          }
-          val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData,
-            bandwidthThrottleTimeMs + requestThrottleTimeMs, unconvertedFetchResponse.sessionId())
-          response.responseData.asScala.foreach { case (topicPartition, data) =>
-            // record the bytes out metrics only when the response is being sent
-            brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
-          }
-          response
-        }
-
-        trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
-          s"metadata=${unconvertedFetchResponse.sessionId()}")
-
-        if (fetchRequest.isFromFollower)
-          sendResponseExemptThrottle(request, createResponse(0))
-        else
-          sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs))
-      }
 
       // When this callback is triggered, the remote API call has completed.
       // Record time before any byte-rate throttling.
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
+      var unconvertedFetchResponse: FetchResponse = null
+
+      def createResponse(throttleTimeMs: Int): FetchResponse = {
+        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+        unconvertedFetchResponse.responseData().asScala.foreach { case (tp, partitionData) =>
+          if (partitionData.error != Errors.NONE)
+            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
+              s"on partition $tp failed due to ${partitionData.error.exceptionName}")
+          convertedData.put(tp, convertedPartitionData(tp, partitionData))
+        }
+        val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs,
+          unconvertedFetchResponse.sessionId())
+        response.responseData.asScala.foreach { case (topicPartition, data) =>
+          // record the bytes out metrics only when the response is being sent
+          brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
+        }
+        response
+      }
+
       if (fetchRequest.isFromFollower) {
         // We've already evaluated against the quota and are good to go. Just need to record it now.
+        unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
         val responseSize = sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
         quotas.leader.record(responseSize)
-        fetchResponseCallback(bandwidthThrottleTimeMs = 0)
+        trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
+          s"metadata=${unconvertedFetchResponse.sessionId()}")
+        sendResponseExemptThrottle(request, createResponse(0))
       } else {
         // Fetch size used to determine throttle time is calculated before any down conversions.
         // This may be slightly different from the actual response size. But since down conversions
-        // result in data being loaded into memory, it is better to do this after throttling to avoid OOM.
-        val responseStruct = unconvertedFetchResponse.toStruct(versionId)
-        quotas.fetch.maybeRecordAndThrottle(request.session, clientId, responseStruct.sizeOf,
-          fetchResponseCallback)
+        // result in data being loaded into memory, we should do this only when we are not going to throttle.
+        //
+        // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the
+        // quotas have been violated. If both quotas have been violated, use the max throttle time between the two
+        // quotas. When throttling, we also "unrecord" the bandwidth quota value that was just recorded.
+        val responseSize = fetchContext.getResponseSize(partitions, versionId)
+        val timeMs = time.milliseconds()
+        val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
+        val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)
+
+        val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
+        if (maxThrottleTimeMs > 0) {
+          // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
+          // from the fetch quota because we are going to return an empty response.
+          quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
+          if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
+            quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendActionOnlyResponse(request))
+          } else {
+            quotas.request.throttle(request, requestThrottleTimeMs, sendActionOnlyResponse(request))
+          }
+          // If throttling is required, return an empty response.
+          unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData](),
+            maxThrottleTimeMs, INVALID_SESSION_ID)
+        } else {
+          // Get the actual response. This will update the fetch context.
+          unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
+          trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
+            s"metadata=${unconvertedFetchResponse.sessionId()}")
+        }
+
+        // Send the response immediately.
+        sendResponse(request, Some(createResponse(maxThrottleTimeMs)))
       }
     }
 
@@ -1341,7 +1371,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
     sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(Errors.ILLEGAL_SASL_STATE,
-        "SaslAuthenticate request received after successful authentication"))
+      "SaslAuthenticate request received after successful authentication"))
   }
 
   def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -2225,13 +2255,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendErrorResponseExemptThrottle(request, e)
   }
 
+  // Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
+  // response immediately.
   private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse): Unit = {
-    quotas.request.maybeRecordAndThrottle(request,
-      throttleTimeMs => sendResponse(request, Some(createResponse(throttleTimeMs))))
+    val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
+    quotas.request.throttle(request, throttleTimeMs, sendActionOnlyResponse(request))
+    sendResponse(request, Some(createResponse(throttleTimeMs)))
   }
 
   private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable) {
-    quotas.request.maybeRecordAndThrottle(request, sendErrorOrCloseConnection(request, error))
+    val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
+    quotas.request.throttle(request, throttleTimeMs, sendActionOnlyResponse(request))
+    sendErrorOrCloseConnection(request, error, throttleTimeMs)
   }
 
   private def sendResponseExemptThrottle(request: RequestChannel.Request, response: AbstractResponse): Unit = {
@@ -2241,10 +2276,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def sendErrorResponseExemptThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
     quotas.request.maybeRecordExempt(request)
-    sendErrorOrCloseConnection(request, error)(throttleMs = 0)
+    sendErrorOrCloseConnection(request, error, 0)
   }
 
-  private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable)(throttleMs: Int): Unit = {
+  private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable, throttleMs: Int): Unit = {
     val requestBody = request.body[AbstractRequest]
     val response = requestBody.getErrorResponse(throttleMs, error)
     if (response == null)
@@ -2262,7 +2297,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // This case is used when the request handler has encountered an error, but the client
     // does not expect a response (e.g. when produce request has acks set to 0)
     requestChannel.updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
-    requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None))
+    sendActionOnlyResponse(request)(CloseConnectionAction)
   }
 
   private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
@@ -2277,8 +2312,11 @@ class KafkaApis(val requestChannel: RequestChannel,
           else None
         requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
       case None =>
-        requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
+        sendActionOnlyResponse(request)(NoOpAction)
     }
   }
 
+  private def sendActionOnlyResponse(request: RequestChannel.Request)(responseAction: ResponseAction): Unit = {
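+    // Sends a response with no body; used for NoOpAction, CloseConnectionAction and the start/end-throttling
+    // actions passed to the quota managers as the channel-throttling callback.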
+    requestChannel.sendResponse(new RequestChannel.Response(request, None, responseAction, None))
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ThrottledChannel.scala b/core/src/main/scala/kafka/server/ThrottledChannel.scala
new file mode 100644
index 0000000..74357d5
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ThrottledChannel.scala
@@ -0,0 +1,56 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.{Delayed, TimeUnit}
+
+import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, StartThrottlingAction}
+import kafka.utils.Logging
+import org.apache.kafka.common.utils.Time
+
+
+/**
+  * Represents a client channel that has been throttled.
+  * @param time Time instance to use
+  * @param throttleTimeMs delay associated with this request
+  * @param channelThrottlingCallback Callback for channel throttling
+  */
+class ThrottledChannel(val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: (ResponseAction) => Unit)
+  extends Delayed with Logging {
+  val endTime = time.milliseconds + throttleTimeMs
+
+  // Notify the socket server that throttling has started for this channel.
+  channelThrottlingCallback(StartThrottlingAction)
+
+  // Notify the socket server that throttling has ended for this channel.
+  def notifyThrottlingDone(): Unit = {
+    trace(s"Channel throttled for: $throttleTimeMs ms")
+    channelThrottlingCallback(EndThrottlingAction)
+  }
+
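+  // Remaining delay for this channel, as required by the Delayed interface.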
+  override def getDelay(unit: TimeUnit): Long = {
+    unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
+  }
+
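+  // Order channels by end time so that a delay queue releases the earliest-expiring channel first.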
+  override def compareTo(d: Delayed): Int = {
+    val other = d.asInstanceOf[ThrottledChannel]
+    if (this.endTime < other.endTime) -1
+    else if (this.endTime > other.endTime) 1
+    else 0
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala
deleted file mode 100644
index 214fa1f..0000000
--- a/core/src/main/scala/kafka/server/ThrottledResponse.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent.{TimeUnit, Delayed}
-
-import org.apache.kafka.common.utils.Time
-
-
-/**
- * Represents a request whose response has been delayed.
- * @param time @Time instance to use
- * @param throttleTimeMs delay associated with this request
- * @param callback Callback to trigger after delayTimeMs milliseconds
- */
-private[server] class ThrottledResponse(val time: Time, val throttleTimeMs: Int, callback: Int => Unit) extends Delayed {
-  val endTime = time.milliseconds + throttleTimeMs
-
-  def execute() = callback(throttleTimeMs)
-
-  override def getDelay(unit: TimeUnit): Long = {
-    unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
-  }
-
-  override def compareTo(d: Delayed): Int = {
-    val other = d.asInstanceOf[ThrottledResponse]
-    if (this.endTime < other.endTime) -1
-    else if (this.endTime > other.endTime) 1
-    else 0
-  }
-}
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 17e2bb9..2f6fa01 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -216,7 +216,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
       val hostStr = s"${node.host}:${node.port}"
       assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
       val brokerVersionInfo = tryBrokerVersionInfo.get
-      assertEquals(1, brokerVersionInfo.latestUsableVersion(ApiKeys.API_VERSIONS))
+      assertEquals(2, brokerVersionInfo.latestUsableVersion(ApiKeys.API_VERSIONS))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 9aaadf1..32e4085 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -73,6 +73,12 @@ class ApiVersionTest {
     assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.0"))
     assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.0-IV0"))
     assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.1"))
+
+    assertEquals(KAFKA_1_1_IV0, ApiVersion("1.1-IV0"))
+
+    assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0"))
+    assertEquals(KAFKA_2_0_IV0, ApiVersion("2.0-IV0"))
+    assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0-IV1"))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9f1d0fe..dfa388b 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -26,13 +26,14 @@ import javax.net.ssl._
 
 import com.yammer.metrics.core.{Gauge, Meter}
 import com.yammer.metrics.{Metrics => YammerMetrics}
-import kafka.network.RequestChannel.SendAction
+import kafka.network.RequestChannel.{NoOpAction, ResponseAction, SendAction}
 import kafka.security.CredentialProvider
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, ThrottledChannel}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.KafkaChannel.ChannelMuteState
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelState, KafkaChannel, ListenerName, NetworkReceive, NetworkSend, Selector, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
@@ -41,7 +42,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
 import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
 import org.apache.log4j.Level
-import org.junit.Assert._
+import org.junit.Assert.{assertEquals, _}
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 
@@ -394,6 +395,88 @@ class SocketServerTest extends JUnitSuite {
     request
   }
 
+  // Prepares the test setup for throttled channel tests. throttlingInProgress controls whether throttling is still
+  // in progress in the quota manager when the response is handled.
+  def throttledChannelTestSetUp(socket: Socket, serializedBytes: Array[Byte], action: RequestChannel.ResponseAction,
+                                throttlingInProgress: Boolean): RequestChannel.Request = {
+    sendRequest(socket, serializedBytes)
+
+    // Mimic a primitive request handler that fetches the request from the RequestChannel and places a response with a
+    // throttled channel.
+    val request = receiveRequest(server.requestChannel)
+    val byteBuffer = request.body[AbstractRequest].serialize(request.header)
+    val send = new NetworkSend(request.context.connectionId, byteBuffer)
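+    // This callback forwards throttling actions (StartThrottlingAction/EndThrottlingAction) to the request channel,
+    // standing in for the quota manager's channel-throttling callback.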
+    def channelThrottlingCallback(responseAction: ResponseAction): Unit = {
+      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, responseAction, None))
+    }
+    val throttledChannel = new ThrottledChannel(new MockTime(), 100, channelThrottlingCallback)
+    server.requestChannel.sendResponse(new RequestChannel.Response(request, Some(send), action,
+      Some(request.header.toString)))
+
+    // The quota manager calls notifyThrottlingDone() on throttling completion. Simulate it here if throttlingInProgress
+    // is false.
+    if (!throttlingInProgress) throttledChannel.notifyThrottlingDone()
+
+    request
+  }
+
+  def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
+    server.processor(0).openOrClosingChannel(request.context.connectionId)
+
+  @Test
+  def testSendActionResponseWithThrottledChannelWhereThrottlingInProgress() {
+    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val serializedBytes = producerRequestBytes()
+    // SendAction with throttling in progress
+    val request = throttledChannelTestSetUp(socket, serializedBytes, SendAction, true)
+
+    // receive response
+    assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
+    TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.MUTED_AND_THROTTLED), "Channel should be muted and throttled")
+    // Channel should still be muted.
+    assertTrue(openOrClosingChannel(request).exists(c => c.isMute()))
+  }
+
+  @Test
+  def testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
+    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val serializedBytes = producerRequestBytes()
+    // SendAction with throttling already done
+    val request = throttledChannelTestSetUp(socket, serializedBytes, SendAction, false)
+
+    // receive response
+    assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
+    // Since throttling is already done, the channel can be unmuted after sending out the response.
+    TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.NOT_MUTED), "Channel should be unmuted")
+    // Channel is now unmuted.
+    assertFalse(openOrClosingChannel(request).exists(c => c.isMute()))
+  }
+
+  @Test
+  def testNoOpActionResponseWithThrottledChannelWhereThrottlingInProgress() {
+    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val serializedBytes = producerRequestBytes()
+    // NoOpAction with throttling in progress
+    val request = throttledChannelTestSetUp(socket, serializedBytes, NoOpAction, true)
+
+    TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.MUTED_AND_THROTTLED), "Channel should be muted and throttled")
+    // Channel should still be muted.
+    assertTrue(openOrClosingChannel(request).exists(c => c.isMute()))
+  }
+
+  @Test
+  def testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
+    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val serializedBytes = producerRequestBytes()
+    // NoOpAction with throttling already done
+    val request = throttledChannelTestSetUp(socket, serializedBytes, NoOpAction, false)
+
+    // Since throttling is already done, the channel can be unmuted.
+    TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.NOT_MUTED), "Channel should be unmuted")
+    // Channel is now unmuted.
+    assertFalse(openOrClosingChannel(request).exists(c => c.isMute()))
+  }
+
   @Test
   def testSocketsCloseOnShutdown() {
     // open a connection
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index c0bad91..b99bac8 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import java.util.Collections
 
-import kafka.network.RequestChannel.Session
+import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, Session, StartThrottlingAction}
 import kafka.server.QuotaType._
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -32,8 +32,12 @@ class ClientQuotaManagerTest {
   private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500)
 
   var numCallbacks: Int = 0
-  def callback(delayTimeMs: Int) {
-    numCallbacks += 1
+  def callback(responseAction: ResponseAction) {
+    // Count how many times this callback is called for notifyThrottlingDone().
+    responseAction match {
+      case StartThrottlingAction =>
+      case EndThrottlingAction => numCallbacks += 1
+    }
   }
 
   @Before
@@ -41,9 +45,15 @@ class ClientQuotaManagerTest {
     numCallbacks = 0
   }
 
-  private def maybeRecordAndThrottle(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
+  private def maybeRecord(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
     val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
-    quotaManager.maybeRecordAndThrottle(Session(principal, null),clientId, value, this.callback)
+    quotaManager.maybeRecordAndGetThrottleTimeMs(Session(principal, null), clientId, value, time.milliseconds())
+  }
+
+  private def throttle(quotaManager: ClientQuotaManager, user: String, clientId: String, throttleTimeMs: Int,
+                       channelThrottlingCallback: (ResponseAction) => Unit) {
+    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+    quotaManager.throttle(Session(principal, null), clientId, throttleTimeMs, channelThrottlingCallback)
   }
 
   private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
@@ -60,7 +70,7 @@ class ClientQuotaManagerTest {
       assertEquals("Should return the overridden value (4000)", 4000, clientMetrics.quota(client2.user, client2.clientId).bound, 0.0)
 
       // p1 should be throttled using the overridden quota
-      var throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 2500 * config.numQuotaSamples)
+      var throttleTimeMs = maybeRecord(clientMetrics, client1.user, client1.clientId, 2500 * config.numQuotaSamples)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
 
       // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created.
@@ -68,14 +78,14 @@ class ClientQuotaManagerTest {
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(3000, true)))
       assertEquals("Should return the newly overridden value (3000)", 3000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 0)
+      throttleTimeMs = maybeRecord(clientMetrics, client1.user, client1.clientId, 0)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
       // Case 3: Change quota back to default. Should be throttled again
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(500, true)))
       assertEquals("Should return the default value (500)", 500, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 0)
+      throttleTimeMs = maybeRecord(clientMetrics, client1.user, client1.clientId, 0)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
 
       // Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled
@@ -83,7 +93,7 @@ class ClientQuotaManagerTest {
       clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, defaultConfigClient.sanitizedConfigClientId, Some(new Quota(4000, true)))
       assertEquals("Should return the newly overridden value (4000)", 4000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 1000 * config.numQuotaSamples)
+      throttleTimeMs = maybeRecord(clientMetrics, client1.user, client1.clientId, 1000 * config.numQuotaSamples)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
     } finally {
@@ -163,7 +173,7 @@ class ClientQuotaManagerTest {
 
     def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
       assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
-      val throttleTimeMs = maybeRecordAndThrottle(quotaManager, user, clientId, value * config.numQuotaSamples)
+      val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * config.numQuotaSamples)
       if (expectThrottle)
         assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
       else
@@ -239,10 +249,9 @@ class ClientQuotaManagerTest {
        * if we produce under the quota
        */
       for (_ <- 0 until 10) {
-        maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 400)
+        assertEquals(0, maybeRecord(clientMetrics, "ANONYMOUS", "unknown", 400))
         time.sleep(1000)
       }
-      assertEquals(10, numCallbacks)
       assertEquals(0, queueSizeMetric.value().toInt)
 
       // Create a spike.
@@ -250,28 +259,29 @@ class ClientQuotaManagerTest {
       // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
       // 10.5 seconds because the last window is half complete
       time.sleep(500)
-      val sleepTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 2300)
+      val sleepTime = maybeRecord(clientMetrics, "ANONYMOUS", "unknown", 2300)
 
       assertEquals("Should be throttled", 2100, sleepTime)
+      throttle(clientMetrics, "ANONYMOUS", "unknown", sleepTime, callback)
       assertEquals(1, queueSizeMetric.value().toInt)
       // After a request is delayed, the callback cannot be triggered immediately
-      clientMetrics.throttledRequestReaper.doWork()
-      assertEquals(10, numCallbacks)
+      clientMetrics.throttledChannelReaper.doWork()
+      assertEquals(0, numCallbacks)
       time.sleep(sleepTime)
 
       // Callback can only be triggered after the delay time passes
-      clientMetrics.throttledRequestReaper.doWork()
+      clientMetrics.throttledChannelReaper.doWork()
       assertEquals(0, queueSizeMetric.value().toInt)
-      assertEquals(11, numCallbacks)
+      assertEquals(1, numCallbacks)
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 10) {
-        maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 400)
+        maybeRecord(clientMetrics, "ANONYMOUS", "unknown", 400)
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 0))
+                   0, maybeRecord(clientMetrics, "ANONYMOUS", "unknown", 0))
     } finally {
       clientMetrics.shutdown()
     }
@@ -289,10 +299,9 @@ class ClientQuotaManagerTest {
        * if we are under the quota
        */
       for (_ <- 0 until 10) {
-        maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4))
+        assertEquals(0, maybeRecord(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4)))
         time.sleep(1000)
       }
-      assertEquals(10, numCallbacks)
       assertEquals(0, queueSizeMetric.value().toInt)
 
       // Create a spike.
@@ -301,38 +310,40 @@ class ClientQuotaManagerTest {
       // (10.2 - quota)/quota*window-size = (10.2-10)/10*10.5 seconds = 210ms
       // 10.5 seconds interval because the last window is half complete
       time.sleep(500)
-      val throttleTime = maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(67.1))
+      val throttleTime = maybeRecord(quotaManager, "ANONYMOUS", "test-client", millisToPercent(67.1))
 
       assertEquals("Should be throttled", 210, throttleTime)
+
+      throttle(quotaManager, "ANONYMOUS", "test-client", throttleTime, callback)
       assertEquals(1, queueSizeMetric.value().toInt)
       // After a request is delayed, the callback cannot be triggered immediately
-      quotaManager.throttledRequestReaper.doWork()
-      assertEquals(10, numCallbacks)
+      quotaManager.throttledChannelReaper.doWork()
+      assertEquals(0, numCallbacks)
       time.sleep(throttleTime)
 
       // Callback can only be triggered after the delay time passes
-      quotaManager.throttledRequestReaper.doWork()
+      quotaManager.throttledChannelReaper.doWork()
       assertEquals(0, queueSizeMetric.value().toInt)
-      assertEquals(11, numCallbacks)
+      assertEquals(1, numCallbacks)
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 11) {
-        maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4))
+        maybeRecord(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4))
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
+                   0, maybeRecord(quotaManager, "ANONYMOUS", "test-client", 0))
 
       // Create a very large spike which requires > one quota window to bring within quota
-      assertEquals(1000, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(500)))
+      assertEquals(1000, maybeRecord(quotaManager, "ANONYMOUS", "test-client", millisToPercent(500)))
       for (_ <- 0 until 10) {
         time.sleep(1000)
-        assertEquals(1000, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
+        assertEquals(1000, maybeRecord(quotaManager, "ANONYMOUS", "test-client", 0))
       }
       time.sleep(1000)
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
+                   0, maybeRecord(quotaManager, "ANONYMOUS", "test-client", 0))
 
     } finally {
       quotaManager.shutdown()
@@ -344,11 +355,11 @@ class ClientQuotaManagerTest {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
-      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 100)
+      maybeRecord(clientMetrics, "ANONYMOUS", "client1", 100)
       // remove the throttle time sensor
       metrics.removeSensor("ProduceThrottleTime-:client1")
       // should not throw an exception even if the throttle time sensor does not exist.
-      val throttleTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 10000)
+      val throttleTime = maybeRecord(clientMetrics, "ANONYMOUS", "client1", 10000)
       assertTrue("Should be throttled", throttleTime > 0)
       // the sensor should get recreated
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1")
@@ -363,12 +374,12 @@ class ClientQuotaManagerTest {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
-      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 100)
+      maybeRecord(clientMetrics, "ANONYMOUS", "client1", 100)
       // remove all the sensors
       metrics.removeSensor("ProduceThrottleTime-:client1")
       metrics.removeSensor("Produce-ANONYMOUS:client1")
       // should not throw an exception
-      val throttleTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 10000)
+      val throttleTime = maybeRecord(clientMetrics, "ANONYMOUS", "client1", 10000)
       assertTrue("Should be throttled", throttleTime > 0)
 
       // all the sensors should get recreated
@@ -388,7 +399,7 @@ class ClientQuotaManagerTest {
     val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     val clientId = "client@#$%"
     try {
-      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", clientId, 100)
+      maybeRecord(clientMetrics, "ANONYMOUS", clientId, 100)
 
       // The metrics should use the raw client ID, even if the reporters internally sanitize them
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + clientId)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 8212100..1d6092a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -121,7 +121,7 @@ class KafkaApisTest {
       val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder("groupId",
         Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
 
-      val capturedResponse = expectThrottleCallbackAndInvoke()
+      val capturedResponse = expectNoThrottling()
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
       createKafkaApis().handleOffsetCommitRequest(request)
 
@@ -147,7 +147,7 @@ class KafkaApisTest {
       val (offsetCommitRequest, request) = buildRequest(new TxnOffsetCommitRequest.Builder("txnlId", "groupId",
         15L, 0.toShort, Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
 
-      val capturedResponse = expectThrottleCallbackAndInvoke()
+      val capturedResponse = expectNoThrottling()
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
       createKafkaApis().handleTxnOffsetCommitRequest(request)
 
@@ -173,7 +173,7 @@ class KafkaApisTest {
       val (addPartitionsToTxnRequest, request) = buildRequest(new AddPartitionsToTxnRequest.Builder(
         "txnlId", 15L, 0.toShort, List(invalidTopicPartition).asJava))
 
-      val capturedResponse = expectThrottleCallbackAndInvoke()
+      val capturedResponse = expectNoThrottling()
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
       createKafkaApis().handleAddPartitionToTxnRequest(request)
 
@@ -373,7 +373,7 @@ class KafkaApisTest {
       EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
     EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
     EasyMock.expect(log.fetchOffsetsByTimestamp(timestamp)).andReturn(Some(TimestampOffset(timestamp = timestamp, offset = limitOffset)))
-    val capturedResponse = expectThrottleCallbackAndInvoke()
+    val capturedResponse = expectNoThrottling()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
 
     val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@@ -414,7 +414,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
     EasyMock.expect(log.fetchOffsetsByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP))
       .andReturn(Some(TimestampOffset(timestamp = ListOffsetResponse.UNKNOWN_TIMESTAMP, offset = limitOffset)))
-    val capturedResponse = expectThrottleCallbackAndInvoke()
+    val capturedResponse = expectNoThrottling()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
 
     val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@@ -482,7 +482,7 @@ class KafkaApisTest {
   }
 
   private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = {
-    val capturedResponse = expectThrottleCallbackAndInvoke()
+    val capturedResponse = expectNoThrottling()
     EasyMock.replay(clientRequestQuotaManager, requestChannel)
 
     val (metadataRequest, requestChannelRequest) = buildRequest(MetadataRequest.Builder.allTopics, requestListener)
@@ -503,7 +503,8 @@ class KafkaApisTest {
     else
       EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = latestOffset))
 
-    val capturedResponse = expectThrottleCallbackAndInvoke()
+    val capturedResponse = expectNoThrottling()
+
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
 
     val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@@ -536,8 +537,8 @@ class KafkaApisTest {
     val header = RequestHeader.parse(buffer)
     val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
       listenerName, SecurityProtocol.PLAINTEXT)
-    (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0,
-      MemoryPool.NONE, buffer, requestChannelMetrics))
+    (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0, MemoryPool.NONE, buffer,
+      requestChannelMetrics))
   }
 
   private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {
@@ -551,17 +552,11 @@ class KafkaApisTest {
     AbstractResponse.parseResponse(api, struct)
   }
 
-  private def expectThrottleCallbackAndInvoke(): Capture[RequestChannel.Response] = {
-    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
-    EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle(
-      EasyMock.anyObject[RequestChannel.Request](),
-      EasyMock.capture(capturedThrottleCallback)))
-      .andAnswer(new IAnswer[Unit] {
-        override def answer(): Unit = {
-          val callback = capturedThrottleCallback.getValue
-          callback(0)
-        }
-      })
+  private def expectNoThrottling(): Capture[RequestChannel.Response] = {
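+    // The request quota is not violated: maybeRecordAndGetThrottleTimeMs returns 0 and throttle() is invoked with 0 ms.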
+    EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request]()))
+      .andReturn(0)
+    EasyMock.expect(clientRequestQuotaManager.throttle(EasyMock.anyObject[RequestChannel.Request](), EasyMock.eq(0),
+      EasyMock.anyObject[RequestChannel.ResponseAction => Unit]()))
 
     val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
     EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
@@ -578,5 +573,4 @@ class KafkaApisTest {
       0, partitions.asJava, Set(broker).asJava).build()
     metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 877b5c3..e62c9fd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -52,6 +52,8 @@ class RequestQuotaTest extends BaseRequestTest {
   private val tp = new TopicPartition(topic, 0)
   private val logDir = "logDir"
   private val unthrottledClientId = "unthrottled-client"
+  private val smallQuotaProducerClientId = "small-quota-producer-client"
+  private val smallQuotaConsumerClientId = "small-quota-consumer-client"
   private val brokerId: Integer = 0
   private var leaderNode: KafkaServer = null
 
@@ -81,10 +83,22 @@ class RequestQuotaTest extends BaseRequestTest {
     // Change default client-id request quota to a small value and a single unthrottledClient with a large quota
     val quotaProps = new Properties()
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
+    quotaProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "2000")
+    quotaProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
     adminZkClient.changeClientIdConfig("<default>", quotaProps)
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
     adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
+    // Client ids with small producer and consumer (fetch) quotas. Quota values were picked so that both
+    // producer/consumer and request quotas are violated on the first produce/consume operation, and the delay due to
+    // producer/consumer quota violation will be longer than the delay due to request quota violation.
+    quotaProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1")
+    quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
+    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps)
+    quotaProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "1")
+    quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
+    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
+
     TestUtils.retry(10000) {
       val quotaManager = servers.head.apis.quotas.request
       assertEquals(s"Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"))
@@ -107,6 +121,20 @@ class RequestQuotaTest extends BaseRequestTest {
   }
 
   @Test
+  def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated() {
+    val apiKey = ApiKeys.PRODUCE
+    submitTest(apiKey, () => checkSmallQuotaProducerRequestThrottleTime(apiKey))
+    waitAndCheckResults()
+  }
+
+  @Test
+  def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated() {
+    val apiKey = ApiKeys.FETCH
+    submitTest(apiKey, () => checkSmallQuotaConsumerRequestThrottleTime(apiKey))
+    waitAndCheckResults()
+  }
+
+  @Test
   def testUnthrottledClient() {
     for (apiKey <- RequestQuotaTest.ClientActions)
       submitTest(apiKey, () => checkUnthrottledClient(apiKey))
@@ -135,8 +163,12 @@ class RequestQuotaTest extends BaseRequestTest {
   def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
 
   private def throttleTimeMetricValue(clientId: String): Double = {
+    throttleTimeMetricValueForQuotaType(clientId, QuotaType.Request)
+  }
+
+  private def throttleTimeMetricValueForQuotaType(clientId: String, quotaType: QuotaType): Double = {
     val metricName = leaderNode.metrics.metricName("throttle-time",
-                                  QuotaType.Request.toString,
+                                  quotaType.toString,
                                   "",
                                   "user", "",
                                   "client-id", clientId)
@@ -380,7 +412,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
   private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int = {
     apiKey match {
-      case ApiKeys.PRODUCE => new ProduceResponse(response).getThrottleTime
+      case ApiKeys.PRODUCE => new ProduceResponse(response).throttleTimeMs
       case ApiKeys.FETCH => new FetchResponse(response).throttleTimeMs
       case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
       case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs
@@ -430,6 +462,32 @@ class RequestQuotaTest extends BaseRequestTest {
     assertTrue(s"Throttle time metrics not updated: $client" , throttleTimeMetricValue(clientId) > 0)
   }
 
+  private def checkSmallQuotaProducerRequestThrottleTime(apiKey: ApiKeys) {
+
+    // Request until throttled using client-id with default small producer quota
+    val smallQuotaProducerClient = Client(smallQuotaProducerClientId, apiKey)
+    val throttled = smallQuotaProducerClient.runUntil(response => responseThrottleTime(apiKey, response) > 0)
+
+    assertTrue(s"Response not throttled: $smallQuotaProducerClient", throttled)
+    assertTrue(s"Throttle time metrics for produce quota not updated: $smallQuotaProducerClient",
+      throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, QuotaType.Produce) > 0)
+    assertTrue(s"Throttle time metrics for request quota updated: $smallQuotaProducerClient",
+      throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, QuotaType.Request) == 0)
+  }
+
+  private def checkSmallQuotaConsumerRequestThrottleTime(apiKey: ApiKeys) {
+
+    // Request until throttled using client-id with default small consumer quota
+    val smallQuotaConsumerClient = Client(smallQuotaConsumerClientId, apiKey)
+    val throttled = smallQuotaConsumerClient.runUntil(response => responseThrottleTime(apiKey, response) > 0)
+
+    assertTrue(s"Response not throttled: $smallQuotaConsumerClient", throttled)
+    assertTrue(s"Throttle time metrics for consumer quota not updated: $smallQuotaConsumerClient",
+      throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.Fetch) > 0)
+    assertTrue(s"Throttle time metrics for request quota updated: $smallQuotaConsumerClient",
+      throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.Request) == 0)
+  }
+
   private def checkUnthrottledClient(apiKey: ApiKeys) {
 
     // Test that request from client with large quota is not throttled
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
similarity index 58%
rename from core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
rename to core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index d7634c0..8ba584c 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -21,60 +21,70 @@ package kafka.server
 import java.util.Collections
 import java.util.concurrent.{DelayQueue, TimeUnit}
 
+import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, StartThrottlingAction}
 import org.apache.kafka.common.metrics.MetricConfig
 import org.apache.kafka.common.utils.MockTime
 import org.junit.{Assert, Before, Test}
 
-class ThrottledResponseExpirationTest {
+class ThrottledChannelExpirationTest {
   private val time = new MockTime
-  private var numCallbacks: Int = 0
+  private var numCallbacksForStartThrottling: Int = 0
+  private var numCallbacksForEndThrottling: Int = 0
   private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
                                                                     Collections.emptyList(),
                                                                     time)
 
-  def callback(delayTimeMs: Int) {
-    numCallbacks += 1
+  def callback(responseAction: ResponseAction): Unit = {
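+    // Track start and end throttling notifications separately so the tests can assert on each phase.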
+    responseAction match {
+      case StartThrottlingAction => numCallbacksForStartThrottling += 1
+      case EndThrottlingAction => numCallbacksForEndThrottling += 1
+    }
   }
 
   @Before
   def beforeMethod() {
-    numCallbacks = 0
+    numCallbacksForStartThrottling = 0
+    numCallbacksForEndThrottling = 0
   }
 
   @Test
-  def testExpire() {
+  def testCallbackInvocationAfterExpiration() {
     val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, QuotaType.Produce, time, "")
 
-    val delayQueue = new DelayQueue[ThrottledResponse]()
-    val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue, "")
+    val delayQueue = new DelayQueue[ThrottledChannel]()
+    val reaper = new clientMetrics.ThrottledChannelReaper(delayQueue, "")
     try {
-      // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp
-      delayQueue.add(new ThrottledResponse(time, 10, callback))
-      delayQueue.add(new ThrottledResponse(time, 30, callback))
-      delayQueue.add(new ThrottledResponse(time, 30, callback))
-      delayQueue.add(new ThrottledResponse(time, 20, callback))
+      // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp.
+      val channel1 = new ThrottledChannel(time, 10, callback)
+      val channel2 = new ThrottledChannel(time, 30, callback)
+      val channel3 = new ThrottledChannel(time, 30, callback)
+      val channel4 = new ThrottledChannel(time, 20, callback)
+      delayQueue.add(channel1)
+      delayQueue.add(channel2)
+      delayQueue.add(channel3)
+      delayQueue.add(channel4)
+      Assert.assertEquals(4, numCallbacksForStartThrottling)
 
       for(itr <- 1 to 3) {
         time.sleep(10)
         reaper.doWork()
-        Assert.assertEquals(itr, numCallbacks)
-
+        Assert.assertEquals(itr, numCallbacksForEndThrottling)
       }
       reaper.doWork()
-      Assert.assertEquals(4, numCallbacks)
+      Assert.assertEquals(4, numCallbacksForEndThrottling)
       Assert.assertEquals(0, delayQueue.size())
       reaper.doWork()
-      Assert.assertEquals(4, numCallbacks)
+      Assert.assertEquals(4, numCallbacksForEndThrottling)
     } finally {
       clientMetrics.shutdown()
     }
   }
 
   @Test
-  def testThrottledRequest() {
-    val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
-    val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
-    val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
+  def testThrottledChannelDelay() {
+    val t1: ThrottledChannel = new ThrottledChannel(time, 10, callback)
+    val t2: ThrottledChannel = new ThrottledChannel(time, 20, callback)
+    val t3: ThrottledChannel = new ThrottledChannel(time, 20, callback)
     Assert.assertEquals(10, t1.throttleTimeMs)
     Assert.assertEquals(20, t2.throttleTimeMs)
     Assert.assertEquals(20, t3.throttleTimeMs)
@@ -86,4 +96,4 @@ class ThrottledResponseExpirationTest {
       time.sleep(10)
     }
   }
-}
+}
\ No newline at end of file
diff --git a/docs/upgrade.html b/docs/upgrade.html
index da28d3b..0954a25 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -96,6 +96,7 @@
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
 <ul>
     <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over">KIP-279</a>: OffsetsForLeaderEpochResponse v1 introduces a partition-level <code>leader_epoch</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication">KIP-219</a>: Bump up the protocol versions of non-cluster action requests and responses that are throttled on quota violation.</li>
 </ul>
 
 

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.