You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/13 04:02:00 UTC

[kafka] branch 3.3 updated: KAFKA-14215; Ensure forwarded requests are applied to broker request quota (#12624)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 85fc2671001 KAFKA-14215; Ensure forwarded requests are applied to broker request quota (#12624)
85fc2671001 is described below

commit 85fc267100161441342eeaa0a8fcb8625b300148
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Sep 12 20:50:33 2022 -0700

    KAFKA-14215; Ensure forwarded requests are applied to broker request quota (#12624)
    
    Currently forwarded requests are not applied to any quotas on either the controller or the broker. The controller-side throttling requires the controller to apply the quota changes from the log to the quota managers, which will be done separately. In this patch, we change the response logic on the broker side to also apply the broker's request quota. The enforced throttle time is the maximum of the throttle returned from the controller (which is 0 until we fix the aforementioned issue) and the broker's request throttle time.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../kafka/common/requests/AbstractResponse.java    | 12 +++++
 .../common/requests/AddOffsetsToTxnResponse.java   |  5 ++
 .../requests/AddPartitionsToTxnResponse.java       |  5 ++
 .../requests/AllocateProducerIdsResponse.java      |  5 ++
 .../common/requests/AlterClientQuotasResponse.java |  5 ++
 .../common/requests/AlterConfigsResponse.java      |  5 ++
 .../AlterPartitionReassignmentsResponse.java       |  5 ++
 .../common/requests/AlterPartitionResponse.java    |  5 ++
 .../requests/AlterReplicaLogDirsResponse.java      |  5 ++
 .../AlterUserScramCredentialsResponse.java         |  5 ++
 .../kafka/common/requests/ApiVersionsResponse.java |  5 ++
 .../common/requests/BeginQuorumEpochResponse.java  |  5 ++
 .../common/requests/BrokerHeartbeatResponse.java   |  5 ++
 .../requests/BrokerRegistrationResponse.java       |  5 ++
 .../requests/ControlledShutdownResponse.java       |  5 ++
 .../kafka/common/requests/CreateAclsResponse.java  |  5 ++
 .../requests/CreateDelegationTokenResponse.java    |  5 ++
 .../common/requests/CreatePartitionsResponse.java  |  5 ++
 .../common/requests/CreateTopicsResponse.java      |  5 ++
 .../kafka/common/requests/DeleteAclsResponse.java  |  5 ++
 .../common/requests/DeleteGroupsResponse.java      |  5 ++
 .../common/requests/DeleteRecordsResponse.java     |  5 ++
 .../common/requests/DeleteTopicsResponse.java      |  5 ++
 .../common/requests/DescribeAclsResponse.java      |  5 ++
 .../requests/DescribeClientQuotasResponse.java     |  5 ++
 .../common/requests/DescribeClusterResponse.java   |  5 ++
 .../common/requests/DescribeConfigsResponse.java   |  5 ++
 .../requests/DescribeDelegationTokenResponse.java  |  5 ++
 .../common/requests/DescribeGroupsResponse.java    |  5 ++
 .../common/requests/DescribeLogDirsResponse.java   |  5 ++
 .../common/requests/DescribeProducersResponse.java |  5 ++
 .../common/requests/DescribeQuorumResponse.java    |  5 ++
 .../requests/DescribeTransactionsResponse.java     |  6 +++
 .../DescribeUserScramCredentialsResponse.java      |  5 ++
 .../common/requests/ElectLeadersResponse.java      |  5 ++
 .../common/requests/EndQuorumEpochResponse.java    |  5 ++
 .../kafka/common/requests/EndTxnResponse.java      |  4 ++
 .../kafka/common/requests/EnvelopeResponse.java    |  5 ++
 .../requests/ExpireDelegationTokenResponse.java    |  5 ++
 .../kafka/common/requests/FetchResponse.java       |  5 ++
 .../common/requests/FetchSnapshotResponse.java     |  5 ++
 .../common/requests/FindCoordinatorResponse.java   |  5 ++
 .../kafka/common/requests/HeartbeatResponse.java   |  5 ++
 .../requests/IncrementalAlterConfigsResponse.java  |  5 ++
 .../common/requests/InitProducerIdResponse.java    |  5 ++
 .../kafka/common/requests/JoinGroupResponse.java   |  5 ++
 .../common/requests/LeaderAndIsrResponse.java      |  5 ++
 .../kafka/common/requests/LeaveGroupResponse.java  |  5 ++
 .../kafka/common/requests/ListGroupsResponse.java  |  5 ++
 .../kafka/common/requests/ListOffsetsResponse.java |  5 ++
 .../ListPartitionReassignmentsResponse.java        |  5 ++
 .../common/requests/ListTransactionsResponse.java  |  5 ++
 .../kafka/common/requests/MetadataResponse.java    |  5 ++
 .../common/requests/OffsetCommitResponse.java      |  5 ++
 .../common/requests/OffsetDeleteResponse.java      |  5 ++
 .../kafka/common/requests/OffsetFetchResponse.java |  5 ++
 .../requests/OffsetsForLeaderEpochResponse.java    |  5 ++
 .../kafka/common/requests/ProduceResponse.java     |  5 ++
 .../requests/RenewDelegationTokenResponse.java     |  5 ++
 .../common/requests/SaslAuthenticateResponse.java  |  5 ++
 .../common/requests/SaslHandshakeResponse.java     |  5 ++
 .../kafka/common/requests/StopReplicaResponse.java |  5 ++
 .../kafka/common/requests/SyncGroupResponse.java   |  5 ++
 .../common/requests/TxnOffsetCommitResponse.java   |  5 ++
 .../common/requests/UnregisterBrokerResponse.java  |  5 ++
 .../common/requests/UpdateFeaturesResponse.java    |  5 ++
 .../common/requests/UpdateMetadataResponse.java    |  5 ++
 .../apache/kafka/common/requests/VoteResponse.java |  5 ++
 .../common/requests/WriteTxnMarkersResponse.java   |  5 ++
 .../scala/kafka/server/RequestHandlerHelper.scala  | 11 +++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 55 +++++++++++++++++++++-
 71 files changed, 412 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index cd99f472ebb..7e4425d3e79 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -266,8 +266,20 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
         return apiKey;
     }
 
+    /**
+     * Get the throttle time in milliseconds. If the response schema does not
+     * support this field, then 0 will be returned.
+     */
     public abstract int throttleTimeMs();
 
+    /**
+     * Set the throttle time in the response if the schema supports it. Otherwise,
+     * this is a no-op.
+     *
+     * @param throttleTimeMs The throttle time in milliseconds
+     */
+    public abstract void maybeSetThrottleTimeMs(int throttleTimeMs);
+
     public String toString() {
         return data().toString();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index ce9a6cf7d60..d90afd04ddc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -56,6 +56,11 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public AddOffsetsToTxnResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 57b2a5a5d7c..8038f4b8fc6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -94,6 +94,11 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Map<TopicPartition, Errors> errors() {
         if (cachedErrorsMap != null) {
             return cachedErrorsMap;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
index 41db29158e5..2511e2b2db3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
@@ -56,6 +56,11 @@ public class AllocateProducerIdsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
index fcacc5d95ef..fc56db7e736 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
@@ -67,6 +67,11 @@ public class AlterClientQuotasResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index 1115f06ee80..1668c2446bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -55,6 +55,11 @@ public class AlterConfigsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public AlterConfigsResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
index ab166b81271..7d6c340fd14 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
@@ -55,6 +55,11 @@ public class AlterPartitionReassignmentsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java
index d2ace4112f4..9ee92f7b809 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java
@@ -55,6 +55,11 @@ public class AlterPartitionResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public static AlterPartitionResponse parse(ByteBuffer buffer, short version) {
         return new AlterPartitionResponse(new AlterPartitionResponseData(new ByteBufferAccessor(buffer), version));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
index afa658d1e15..0c38a83ee3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
@@ -53,6 +53,11 @@ public class AlterReplicaLogDirsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsResponse.java
index 97c0b7d17b2..86c9b006a2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsResponse.java
@@ -48,6 +48,11 @@ public class AlterUserScramCredentialsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         return errorCounts(data.results().stream().map(r -> Errors.forCode(r.errorCode())));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 28c9d613bbb..a903e50b15d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -74,6 +74,11 @@ public class ApiVersionsResponse extends AbstractResponse {
         return this.data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 2;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochResponse.java
index c8c0328c93a..5ae975acd8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochResponse.java
@@ -95,6 +95,11 @@ public class BeginQuorumEpochResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static BeginQuorumEpochResponse parse(ByteBuffer buffer, short version) {
         return new BeginQuorumEpochResponse(new BeginQuorumEpochResponseData(new ByteBufferAccessor(buffer), version));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BrokerHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/BrokerHeartbeatResponse.java
index e7d01e53c67..4c8b3aafc4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/BrokerHeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/BrokerHeartbeatResponse.java
@@ -44,6 +44,11 @@ public class BrokerHeartbeatResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationResponse.java
index 8296d7a4c35..8b6121c3763 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationResponse.java
@@ -44,6 +44,11 @@ public class BrokerRegistrationResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 73b6a502683..bc5aa0ba35a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -58,6 +58,11 @@ public class ControlledShutdownResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static ControlledShutdownResponse parse(ByteBuffer buffer, short version) {
         return new ControlledShutdownResponse(new ControlledShutdownResponseData(new ByteBufferAccessor(buffer), version));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 8bc6643f9de..cef7b73ac27 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -43,6 +43,11 @@ public class CreateAclsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public List<CreateAclsResponseData.AclCreationResult> results() {
         return data.results();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
index 22c2e125901..0a9f9a8991b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -86,6 +86,11 @@ public class CreateDelegationTokenResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
index e59ac981f11..2dcd2b200ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
@@ -62,4 +62,9 @@ public class CreatePartitionsResponse extends AbstractResponse {
     public int throttleTimeMs() {
         return data.throttleTimeMs();
     }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index dd062774258..da011e224ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -60,6 +60,11 @@ public class CreateTopicsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         HashMap<Errors, Integer> counts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 7482953a00d..d0b596ed91a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -60,6 +60,11 @@ public class DeleteAclsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults() {
         return data.filterResults();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
index 4cbffda4221..3bbb08d59fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
@@ -85,6 +85,11 @@ public class DeleteGroupsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 1;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index b090543fadd..5084681f537 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -56,6 +56,11 @@ public class DeleteRecordsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 2090c4fd2e2..65a54481ba0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -50,6 +50,11 @@ public class DeleteTopicsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public DeleteTopicsResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index c4190e65640..c602dab9951 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -71,6 +71,11 @@ public class DescribeAclsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public ApiError error() {
         return new ApiError(Errors.forCode(data.errorCode()), data.errorMessage());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
index 147414333ea..3a052c9fe8e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
@@ -70,6 +70,11 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public DescribeClientQuotasResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
index 60d931196a6..fb48476e246 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
@@ -52,6 +52,11 @@ public class DescribeClusterResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public DescribeClusterResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index aa7a713e8ab..6fd5320d882 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -255,6 +255,11 @@ public class DescribeConfigsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index 4fd1d996526..a922f056a89 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -96,6 +96,11 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 360caf01e46..119bedfdb19 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -115,6 +115,11 @@ public class DescribeGroupsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public static final String UNKNOWN_STATE = "";
     public static final String UNKNOWN_PROTOCOL_TYPE = "";
     public static final String UNKNOWN_PROTOCOL = "";
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index fe8aebbc4f6..cbf30542173 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -50,6 +50,11 @@ public class DescribeLogDirsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersResponse.java
index 74e9437472b..065a101bed6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersResponse.java
@@ -66,4 +66,9 @@ public class DescribeProducersResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index 9f58e52970c..39e050c9405 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -70,6 +70,11 @@ public class DescribeQuorumResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static DescribeQuorumResponseData singletonErrorResponse(
         TopicPartition topicPartition,
         Errors error
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsResponse.java
index cf151b35bba..5eef63b0ce4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsResponse.java
@@ -64,4 +64,10 @@ public class DescribeTransactionsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
 }
+
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeUserScramCredentialsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeUserScramCredentialsResponse.java
index 001cefae41a..58ba4212949 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeUserScramCredentialsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeUserScramCredentialsResponse.java
@@ -48,6 +48,11 @@ public class DescribeUserScramCredentialsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         return errorCounts(data.results().stream().map(r -> Errors.forCode(r.errorCode())));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java
index 88d4d19fc02..2e82cd4c9a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java
@@ -62,6 +62,11 @@ public class ElectLeadersResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         HashMap<Errors, Integer> counts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochResponse.java
index ac2c0c5c9d5..b3a236adc69 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochResponse.java
@@ -73,6 +73,11 @@ public class EndQuorumEpochResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static EndQuorumEpochResponseData singletonResponse(
         Errors topLevelError,
         TopicPartition topicPartition,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 029e7d0ce59..0ab01bb1a3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -50,6 +50,10 @@ public class EndTxnResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
 
     public Errors error() {
         return Errors.forCode(data.errorCode());
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java
index 529f616bb26..4f534b6721f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java
@@ -67,6 +67,11 @@ public class EnvelopeResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static EnvelopeResponse parse(ByteBuffer buffer, short version) {
         return new EnvelopeResponse(new EnvelopeResponseData(new ByteBufferAccessor(buffer), version));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index 163ee78d0ad..ec43f3371b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -56,6 +56,11 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
         return data;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public int throttleTimeMs() {
         return data.throttleTimeMs();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index a4af4ca2a23..cd177945830 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -128,6 +128,11 @@ public class FetchResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public int sessionId() {
         return data.sessionId();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
index 7c1ce27f3da..d9abff66fe9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
@@ -61,6 +61,11 @@ final public class FetchSnapshotResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public FetchSnapshotResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 080ba24c3bd..e96e8a0c0db 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -64,6 +64,11 @@ public class FindCoordinatorResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public boolean hasError() {
         return error() != Errors.NONE;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index eb402fcbab9..aebb903e967 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -48,6 +48,11 @@ public class HeartbeatResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
index b5887de9b4b..826be30a8d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
@@ -90,6 +90,11 @@ public class IncrementalAlterConfigsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public static IncrementalAlterConfigsResponse parse(ByteBuffer buffer, short version) {
         return new IncrementalAlterConfigsResponse(new IncrementalAlterConfigsResponseData(
             new ByteBufferAccessor(buffer), version));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index f8451d7863b..96c7a4d400c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -48,6 +48,11 @@ public class InitProducerIdResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         return errorCounts(Errors.forCode(data.errorCode()));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 336c82462a2..5a4332efde9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -47,6 +47,11 @@ public class JoinGroupResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index c7c04e2d99b..0d40581d68e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -99,6 +99,11 @@ public class LeaderAndIsrResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
         return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version), version);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 9a59139f4e7..d39766be68f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -82,6 +82,11 @@ public class LeaveGroupResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public List<MemberResponse> memberResponses() {
         return data.members();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 270c43c0568..a12f85341d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -43,6 +43,11 @@ public class ListGroupsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         return errorCounts(Errors.forCode(data.errorCode()));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
index 8c4a51b542b..53356dd93be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
@@ -64,6 +64,11 @@ public class ListOffsetsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public ListOffsetsResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
index 4a890e8b50c..cbf06d4c466 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
@@ -53,6 +53,11 @@ public class ListPartitionReassignmentsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         return errorCounts(Errors.forCode(data.errorCode()));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java
index 13ed184fc34..f509543025b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java
@@ -59,4 +59,9 @@ public class ListTransactionsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 3696b047aba..47cdd3f0d7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -84,6 +84,11 @@ public class MetadataResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     /**
      * Get a map of the topics which had metadata errors
      * @return the map
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 2ed0e312983..713b68974a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -107,6 +107,11 @@ public class OffsetCommitResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
index 79f6f4e6d34..993a589af69 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
@@ -77,6 +77,11 @@ public class OffsetDeleteResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 4e25984668d..2d585a582ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -245,6 +245,11 @@ public class OffsetFetchResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public boolean hasError() {
         return error != Errors.NONE;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 893d5a2af20..10c257c0a37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -68,6 +68,11 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short version) {
         return new OffsetsForLeaderEpochResponse(new OffsetForLeaderEpochResponseData(new ByteBufferAccessor(buffer), version));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 7c9d70b8d67..a00fdecc586 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -116,6 +116,11 @@ public class ProduceResponse extends AbstractResponse {
         return this.data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
index 30708ff038c..8ea85d74db6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -53,6 +53,11 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index bd12d3d4ae7..d6ca8c170dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -67,6 +67,11 @@ public class SaslAuthenticateResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     @Override
     public SaslAuthenticateResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index 63c047a0619..5097711e737 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -57,6 +57,11 @@ public class SaslHandshakeResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     @Override
     public SaslHandshakeResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 10ab153f440..cb66f4915d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -70,6 +70,11 @@ public class StopReplicaResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     @Override
     public StopReplicaResponseData data() {
         return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 822a3e78b99..59611024290 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -38,6 +38,11 @@ public class SyncGroupResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index b4de54741e6..18244fcb17c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -87,6 +87,11 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         return errorCounts(data.topics().stream().flatMap(topic ->
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
index b508ac3ef9e..623e6f28076 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
@@ -44,6 +44,11 @@ public class UnregisterBrokerResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
index 26825a0c247..567464d85db 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
@@ -63,6 +63,11 @@ public class UpdateFeaturesResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
     @Override
     public String toString() {
         return data.toString();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index cc7749a4724..d5960d7cbb9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -47,6 +47,11 @@ public class UpdateMetadataResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
         return new UpdateMetadataResponse(new UpdateMetadataResponseData(new ByteBufferAccessor(buffer), version));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/VoteResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/VoteResponse.java
index 51991adcf0c..f79c6eeb0de 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/VoteResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/VoteResponse.java
@@ -92,6 +92,11 @@ public class VoteResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     public static VoteResponse parse(ByteBuffer buffer, short version) {
         return new VoteResponse(new VoteResponseData(new ByteBufferAccessor(buffer), version));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index fd2a834d245..a7d22e4493e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -108,6 +108,11 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        // Not supported by the response schema
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
index a1aab617b3a..5db595986ef 100644
--- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
+++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
@@ -95,10 +95,13 @@ class RequestHandlerHelper(
 
   def sendForwardedResponse(request: RequestChannel.Request,
                             response: AbstractResponse): Unit = {
-    // For forwarded requests, we take the throttle time from the broker that
-    // the request was forwarded to
-    val throttleTimeMs = response.throttleTimeMs()
-    throttle(quotas.request, request, throttleTimeMs)
+    // For requests forwarded to the controller, we take the maximum of the local
+    // request throttle and the throttle sent by the controller in the response.
+    val controllerThrottleTimeMs = response.throttleTimeMs()
+    val requestThrottleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    val appliedThrottleTimeMs = math.max(controllerThrottleTimeMs, requestThrottleTimeMs)
+    throttle(quotas.request, request, appliedThrottleTimeMs)
+    response.maybeSetThrottleTimeMs(appliedThrottleTimeMs)
     requestChannel.sendResponse(request, response, None)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index aefb1a5348d..d0e5687f67a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,7 +23,6 @@ import java.util
 import java.util.Arrays.asList
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Optional, Properties, Random}
-
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
 import kafka.controller.{ControllerContext, KafkaController}
@@ -84,7 +83,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, anyLong, anyShort, anyString, argThat, isNotNull}
 import org.mockito.Mockito.{mock, reset, times, verify, when}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
@@ -92,6 +91,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
 
@@ -772,6 +772,57 @@ class KafkaApisTest {
     testForwardableApi(ApiKeys.CREATE_TOPICS, requestBuilder)
   }
 
+  /**
+   * Verifies that a forwarded request is throttled for the larger of the throttle time
+   * returned by the controller and the throttle time computed by the broker's request quota.
+   */
+  @ParameterizedTest
+  @CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
+  def testKRaftControllerThrottleTimeEnforced(
+    controllerThrottleTimeMs: Int,
+    requestThrottleTimeMs: Int
+  ): Unit = {
+    val expectedThrottleTimeMs = controllerThrottleTimeMs.max(requestThrottleTimeMs)
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+
+    // Build a minimal CreateTopics request; the test expects it to be forwarded.
+    val topic = new CreatableTopic()
+      .setName("topic").setNumPartitions(1).setReplicationFactor(1.toShort)
+    val createTopicsData = new CreateTopicsRequestData()
+    createTopicsData.topics().add(topic)
+    val request = buildRequest(new CreateTopicsRequest.Builder(createTopicsData).build())
+
+    val kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
+    val callbackCaptor: ArgumentCaptor[Option[AbstractResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
+
+    // Stub the request quota manager to report the parameterized broker-side throttle time.
+    val nowMs = time.milliseconds()
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(request, nowMs))
+      .thenReturn(requestThrottleTimeMs)
+
+    kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
+    verify(forwardingManager).forwardRequest(ArgumentMatchers.eq(request), callbackCaptor.capture())
+
+    // Complete the forwarded request with a response carrying the controller's throttle time.
+    val topicResult = new CreatableTopicResult().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code)
+    val controllerResponseData = new CreateTopicsResponseData()
+      .setThrottleTimeMs(controllerThrottleTimeMs)
+    controllerResponseData.topics().add(topicResult)
+    val onControllerResponse = callbackCaptor.getValue
+    onControllerResponse(Some(new CreateTopicsResponse(controllerResponseData)))
+
+    // The larger throttle time must be passed to the request quota manager ...
+    verify(clientRequestQuotaManager).throttle(
+      ArgumentMatchers.eq(request),
+      any[ThrottleCallback](),
+      ArgumentMatchers.eq(expectedThrottleTimeMs)
+    )
+
+    // ... and must also be the throttle time left on the response data.
+    assertEquals(expectedThrottleTimeMs, controllerResponseData.throttleTimeMs)
+  }
+
   @Test
   def testCreatePartitionsAuthorization(): Unit = {
     val authorizer: Authorizer = mock(classOf[Authorizer])