You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/19 00:52:35 UTC

[kafka] branch 2.8 updated (2e16a76 -> e957886)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 2e16a76  TRIVIAL: fix JavaDocs formatting (#10134)
     new 4af420a  Fixed README and added clearer error message. (#10133)
     new e957886  KAFKA-12278; Ensure exposed api versions are consistent within listener (#10666)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 checkstyle/import-control.xml                      |   2 +
 .../org/apache/kafka/clients/NodeApiVersions.java  |  10 +-
 .../kafka/common/network/ChannelBuilders.java      |  16 +-
 .../kafka/common/network/SaslChannelBuilder.java   |  12 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  71 ++++++---
 .../org/apache/kafka/common/protocol/Protocol.java |   2 +-
 .../kafka/common/requests/ApiVersionsResponse.java |  89 ++++++++---
 .../authenticator/SaslServerAuthenticator.java     |  13 +-
 .../common/message/AddOffsetsToTxnRequest.json     |   1 +
 .../common/message/AddPartitionsToTxnRequest.json  |   1 +
 .../common/message/AlterClientQuotasRequest.json   |   1 +
 .../common/message/AlterConfigsRequest.json        |   1 +
 .../resources/common/message/AlterIsrRequest.json  |   1 +
 .../AlterPartitionReassignmentsRequest.json        |   1 +
 .../common/message/AlterReplicaLogDirsRequest.json |   1 +
 .../message/AlterUserScramCredentialsRequest.json  |   1 +
 .../common/message/ApiVersionsRequest.json         |   1 +
 .../common/message/BeginQuorumEpochRequest.json    |   1 +
 .../common/message/BrokerHeartbeatRequest.json     |   1 +
 .../common/message/BrokerRegistrationRequest.json  |   1 +
 .../common/message/ControlledShutdownRequest.json  |   1 +
 .../common/message/CreateAclsRequest.json          |   1 +
 .../message/CreateDelegationTokenRequest.json      |   1 +
 .../common/message/CreatePartitionsRequest.json    |   1 +
 .../common/message/CreateTopicsRequest.json        |   1 +
 .../common/message/DeleteAclsRequest.json          |   1 +
 .../common/message/DeleteGroupsRequest.json        |   1 +
 .../common/message/DeleteRecordsRequest.json       |   1 +
 .../common/message/DeleteTopicsRequest.json        |   1 +
 .../common/message/DescribeAclsRequest.json        |   1 +
 .../message/DescribeClientQuotasRequest.json       |   1 +
 .../common/message/DescribeClusterRequest.json     |   1 +
 .../common/message/DescribeConfigsRequest.json     |   1 +
 .../message/DescribeDelegationTokenRequest.json    |   1 +
 .../common/message/DescribeGroupsRequest.json      |   1 +
 .../common/message/DescribeLogDirsRequest.json     |   1 +
 .../common/message/DescribeProducersRequest.json   |   1 +
 .../common/message/DescribeQuorumRequest.json      |   1 +
 .../DescribeUserScramCredentialsRequest.json       |   1 +
 .../common/message/ElectLeadersRequest.json        |   1 +
 .../common/message/EndQuorumEpochRequest.json      |   1 +
 .../resources/common/message/EndTxnRequest.json    |   1 +
 .../resources/common/message/EnvelopeRequest.json  |   1 +
 .../message/ExpireDelegationTokenRequest.json      |   1 +
 .../resources/common/message/FetchRequest.json     |   1 +
 .../common/message/FetchSnapshotRequest.json       |   1 +
 .../common/message/FindCoordinatorRequest.json     |   1 +
 .../resources/common/message/HeartbeatRequest.json |   1 +
 .../message/IncrementalAlterConfigsRequest.json    |   1 +
 .../common/message/InitProducerIdRequest.json      |   1 +
 .../resources/common/message/JoinGroupRequest.json |   1 +
 .../common/message/LeaderAndIsrRequest.json        |   1 +
 .../common/message/LeaveGroupRequest.json          |   1 +
 .../common/message/ListGroupsRequest.json          |   1 +
 .../common/message/ListOffsetsRequest.json         |   1 +
 .../message/ListPartitionReassignmentsRequest.json |   1 +
 .../resources/common/message/MetadataRequest.json  |   1 +
 .../common/message/OffsetCommitRequest.json        |   1 +
 .../common/message/OffsetDeleteRequest.json        |   1 +
 .../common/message/OffsetFetchRequest.json         |   1 +
 .../message/OffsetForLeaderEpochRequest.json       |   1 +
 .../resources/common/message/ProduceRequest.json   |   1 +
 .../message/RenewDelegationTokenRequest.json       |   1 +
 .../common/message/SaslAuthenticateRequest.json    |   1 +
 .../common/message/SaslHandshakeRequest.json       |   1 +
 .../common/message/StopReplicaRequest.json         |   1 +
 .../resources/common/message/SyncGroupRequest.json |   1 +
 .../common/message/TxnOffsetCommitRequest.json     |   1 +
 .../common/message/UnregisterBrokerRequest.json    |   1 +
 .../common/message/UpdateFeaturesRequest.json      |   1 +
 .../common/message/UpdateMetadataRequest.json      |   1 +
 .../main/resources/common/message/VoteRequest.json |   1 +
 .../common/message/WriteTxnMarkersRequest.json     |   1 +
 .../apache/kafka/clients/NetworkClientTest.java    |  17 +-
 .../apache/kafka/clients/NodeApiVersionsTest.java  |  36 +++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  14 +-
 .../clients/consumer/internals/FetcherTest.java    |   6 +-
 .../clients/producer/internals/SenderTest.java     |  27 ++--
 .../apache/kafka/common/network/NioEchoServer.java |   5 +-
 .../common/network/SaslChannelBuilderTest.java     |  14 +-
 .../common/network/SslTransportLayerTest.java      |  24 ++-
 .../apache/kafka/common/protocol/ApiKeysTest.java  |  22 ++-
 .../common/requests/ApiVersionsResponseTest.java   |  51 +++---
 .../kafka/common/requests/RequestResponseTest.java | 100 ++++++------
 .../authenticator/SaslAuthenticatorTest.java       | 177 ++++++++++-----------
 .../authenticator/SaslServerAuthenticatorTest.java |   8 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |  71 ++++-----
 .../main/scala/kafka/network/RequestChannel.scala  |  17 +-
 .../main/scala/kafka/network/SocketServer.scala    |  36 +++--
 .../scala/kafka/server/ApiVersionManager.scala     | 126 +++++++++++++++
 .../src/main/scala/kafka/server/BrokerServer.scala |  51 ++++--
 .../main/scala/kafka/server/ControllerServer.scala |   8 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  58 ++-----
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  15 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  40 +++--
 .../scala/kafka/tools/TestRaftRequestHandler.scala |   8 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |  13 +-
 .../kafka/admin/BrokerApiVersionsCommandTest.scala |   2 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |   5 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |  23 +--
 .../unit/kafka/network/SocketServerTest.scala      |  61 ++++---
 .../server/AbstractApiVersionsRequestTest.scala    |  15 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  | 115 +++++++++++++
 .../unit/kafka/server/ApiVersionsRequestTest.scala |   8 +-
 .../unit/kafka/server/ForwardingManagerTest.scala  |   3 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  85 ++--------
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   6 +-
 .../kafka/server/SaslApiVersionsRequestTest.scala  |   4 +-
 .../kafka/message/ApiMessageTypeGenerator.java     |  69 +++++++-
 .../org/apache/kafka/message/MessageGenerator.java |   6 +-
 .../java/org/apache/kafka/message/MessageSpec.java |  16 +-
 ...auseGenerator.java => RequestListenerType.java} |  17 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   8 +-
 raft/README.md                                     |   2 +-
 114 files changed, 1077 insertions(+), 592 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/ApiVersionManager.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
 copy generator/src/main/java/org/apache/kafka/message/{ClauseGenerator.java => RequestListenerType.java} (78%)


[kafka] 02/02: KAFKA-12278; Ensure exposed api versions are consistent within listener (#10666)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e957886f2e18ef8c26813018c14b5ea44467965f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 4 10:04:17 2021 -0800

    KAFKA-12278; Ensure exposed api versions are consistent within listener (#10666)
    
    Previously all APIs were accessible on every listener exposed by the broker, but
    with KIP-500, that is no longer true.  We now have more complex requirements for
    API accessibility.
    
    For example, the KIP-500 controller exposes some APIs which are not exposed by
    brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
    such as JoinGroupRequest, etc.  Similarly, the KIP-500 broker does not implement
    some APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
    UpdateFeaturesRequest.
    
    All of this means that we need more sophistication in how we expose APIs and
    keep them consistent with the ApiVersions API. Up until now, we have been
    working around this using the controllerOnly flag inside ApiKeys, but this is
    not rich enough to support all of the cases listed above.  This PR introduces a
    new "listeners" field to the request schema definitions.  This field is an array
    of strings which indicate the listener types in which the API should be exposed.
    We currently support "zkBroker", "broker", and "controller".  ("broker"
    indicates the KIP-500 broker, whereas zkBroker indicates the old broker).
    
    This PR also creates ApiVersionManager to encapsulate the creation of the
    ApiVersionsResponse based on the listener type.  Additionally, it modifies
    SocketServer to check the listener type of received requests before forwarding
    them to the request handler.
    
    Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
    prior to authentication. Previously a static response was sent, which means that
    changes to features would not get reflected. This also meant that the logic to
    ensure that only the intersection of version ranges supported by the controller
    would get exposed did not work. I think this is important because some clients
    rely on the initial pre-authenticated ApiVersions response rather than doing a
    second round after authentication as the Java client does.
    
    One final cleanup note: I have removed the expectation that envelope requests
    are only allowed on "privileged" listeners.  This made sense initially because
    we expected to use forwarding before the KIP-500 controller was available. That
    is not the case anymore and we expect the Envelope API to only be exposed on the
    controller listener. I have nevertheless preserved the existing workarounds to
    allow verification of the forwarding behavior in integration testing.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
 checkstyle/import-control.xml                      |   2 +
 .../org/apache/kafka/clients/NodeApiVersions.java  |  10 +-
 .../kafka/common/network/ChannelBuilders.java      |  16 +-
 .../kafka/common/network/SaslChannelBuilder.java   |  12 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  71 ++++++---
 .../org/apache/kafka/common/protocol/Protocol.java |   2 +-
 .../kafka/common/requests/ApiVersionsResponse.java |  89 ++++++++---
 .../authenticator/SaslServerAuthenticator.java     |  13 +-
 .../common/message/AddOffsetsToTxnRequest.json     |   1 +
 .../common/message/AddPartitionsToTxnRequest.json  |   1 +
 .../common/message/AlterClientQuotasRequest.json   |   1 +
 .../common/message/AlterConfigsRequest.json        |   1 +
 .../resources/common/message/AlterIsrRequest.json  |   1 +
 .../AlterPartitionReassignmentsRequest.json        |   1 +
 .../common/message/AlterReplicaLogDirsRequest.json |   1 +
 .../message/AlterUserScramCredentialsRequest.json  |   1 +
 .../common/message/ApiVersionsRequest.json         |   1 +
 .../common/message/BeginQuorumEpochRequest.json    |   1 +
 .../common/message/BrokerHeartbeatRequest.json     |   1 +
 .../common/message/BrokerRegistrationRequest.json  |   1 +
 .../common/message/ControlledShutdownRequest.json  |   1 +
 .../common/message/CreateAclsRequest.json          |   1 +
 .../message/CreateDelegationTokenRequest.json      |   1 +
 .../common/message/CreatePartitionsRequest.json    |   1 +
 .../common/message/CreateTopicsRequest.json        |   1 +
 .../common/message/DeleteAclsRequest.json          |   1 +
 .../common/message/DeleteGroupsRequest.json        |   1 +
 .../common/message/DeleteRecordsRequest.json       |   1 +
 .../common/message/DeleteTopicsRequest.json        |   1 +
 .../common/message/DescribeAclsRequest.json        |   1 +
 .../message/DescribeClientQuotasRequest.json       |   1 +
 .../common/message/DescribeClusterRequest.json     |   1 +
 .../common/message/DescribeConfigsRequest.json     |   1 +
 .../message/DescribeDelegationTokenRequest.json    |   1 +
 .../common/message/DescribeGroupsRequest.json      |   1 +
 .../common/message/DescribeLogDirsRequest.json     |   1 +
 .../common/message/DescribeProducersRequest.json   |   1 +
 .../common/message/DescribeQuorumRequest.json      |   1 +
 .../DescribeUserScramCredentialsRequest.json       |   1 +
 .../common/message/ElectLeadersRequest.json        |   1 +
 .../common/message/EndQuorumEpochRequest.json      |   1 +
 .../resources/common/message/EndTxnRequest.json    |   1 +
 .../resources/common/message/EnvelopeRequest.json  |   1 +
 .../message/ExpireDelegationTokenRequest.json      |   1 +
 .../resources/common/message/FetchRequest.json     |   1 +
 .../common/message/FetchSnapshotRequest.json       |   1 +
 .../common/message/FindCoordinatorRequest.json     |   1 +
 .../resources/common/message/HeartbeatRequest.json |   1 +
 .../message/IncrementalAlterConfigsRequest.json    |   1 +
 .../common/message/InitProducerIdRequest.json      |   1 +
 .../resources/common/message/JoinGroupRequest.json |   1 +
 .../common/message/LeaderAndIsrRequest.json        |   1 +
 .../common/message/LeaveGroupRequest.json          |   1 +
 .../common/message/ListGroupsRequest.json          |   1 +
 .../common/message/ListOffsetsRequest.json         |   1 +
 .../message/ListPartitionReassignmentsRequest.json |   1 +
 .../resources/common/message/MetadataRequest.json  |   1 +
 .../common/message/OffsetCommitRequest.json        |   1 +
 .../common/message/OffsetDeleteRequest.json        |   1 +
 .../common/message/OffsetFetchRequest.json         |   1 +
 .../message/OffsetForLeaderEpochRequest.json       |   1 +
 .../resources/common/message/ProduceRequest.json   |   1 +
 .../message/RenewDelegationTokenRequest.json       |   1 +
 .../common/message/SaslAuthenticateRequest.json    |   1 +
 .../common/message/SaslHandshakeRequest.json       |   1 +
 .../common/message/StopReplicaRequest.json         |   1 +
 .../resources/common/message/SyncGroupRequest.json |   1 +
 .../common/message/TxnOffsetCommitRequest.json     |   1 +
 .../common/message/UnregisterBrokerRequest.json    |   1 +
 .../common/message/UpdateFeaturesRequest.json      |   1 +
 .../common/message/UpdateMetadataRequest.json      |   1 +
 .../main/resources/common/message/VoteRequest.json |   1 +
 .../common/message/WriteTxnMarkersRequest.json     |   1 +
 .../apache/kafka/clients/NetworkClientTest.java    |  17 +-
 .../apache/kafka/clients/NodeApiVersionsTest.java  |  36 +++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  14 +-
 .../clients/consumer/internals/FetcherTest.java    |   6 +-
 .../clients/producer/internals/SenderTest.java     |  27 ++--
 .../apache/kafka/common/network/NioEchoServer.java |   5 +-
 .../common/network/SaslChannelBuilderTest.java     |  14 +-
 .../common/network/SslTransportLayerTest.java      |  24 ++-
 .../apache/kafka/common/protocol/ApiKeysTest.java  |  22 ++-
 .../common/requests/ApiVersionsResponseTest.java   |  51 +++---
 .../kafka/common/requests/RequestResponseTest.java | 100 ++++++------
 .../authenticator/SaslAuthenticatorTest.java       | 177 ++++++++++-----------
 .../authenticator/SaslServerAuthenticatorTest.java |   8 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |  71 ++++-----
 .../main/scala/kafka/network/RequestChannel.scala  |  17 +-
 .../main/scala/kafka/network/SocketServer.scala    |  36 +++--
 .../scala/kafka/server/ApiVersionManager.scala     | 126 +++++++++++++++
 .../src/main/scala/kafka/server/BrokerServer.scala |  51 ++++--
 .../main/scala/kafka/server/ControllerServer.scala |   8 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  58 ++-----
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  15 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  40 +++--
 .../scala/kafka/tools/TestRaftRequestHandler.scala |   8 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |   9 +-
 .../kafka/admin/BrokerApiVersionsCommandTest.scala |   2 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |   5 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |  23 +--
 .../unit/kafka/network/SocketServerTest.scala      |  61 ++++---
 .../server/AbstractApiVersionsRequestTest.scala    |  15 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  | 115 +++++++++++++
 .../unit/kafka/server/ApiVersionsRequestTest.scala |   8 +-
 .../unit/kafka/server/ForwardingManagerTest.scala  |   3 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  85 ++--------
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   6 +-
 .../kafka/server/SaslApiVersionsRequestTest.scala  |   4 +-
 .../kafka/message/ApiMessageTypeGenerator.java     |  69 +++++++-
 .../org/apache/kafka/message/MessageGenerator.java |   6 +-
 .../java/org/apache/kafka/message/MessageSpec.java |  16 +-
 .../apache/kafka/message/RequestListenerType.java  |  30 ++++
 .../jmh/metadata/MetadataRequestBenchmark.java     |   8 +-
 113 files changed, 1091 insertions(+), 585 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9ec16b9..aad58b0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -50,6 +50,7 @@
 
   <subpackage name="common">
     <allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />
+    <allow class="org.apache.kafka.common.message.ApiMessageType" exact-match="true" />
     <disallow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common" exact-match="true" />
     <allow pkg="org.apache.kafka.common.annotation" />
@@ -108,6 +109,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />
     </subpackage>
 
     <subpackage name="resource">
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 658d481..3c09f0e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@@ -27,7 +24,10 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -62,7 +62,7 @@ public class NodeApiVersions {
      */
     public static NodeApiVersions create(Collection<ApiVersion> overrides) {
         List<ApiVersion> apiVersions = new LinkedList<>(overrides);
-        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
+        for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
             boolean exists = false;
             for (ApiVersion apiVersion : apiVersions) {
                 if (apiVersion.apiKey() == apiKey.id) {
@@ -170,7 +170,7 @@ public class NodeApiVersions {
 
         // Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
         // which may happen when the remote is too old.
-        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
+        for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
             if (!apiKeysText.containsKey(apiKey.id)) {
                 StringBuilder bld = new StringBuilder();
                 bld.append(apiKey.name).append("(").
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index ee5ed75..b4a1ce8 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.SslClientAuth;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
@@ -40,6 +41,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public class ChannelBuilders {
     private static final Logger log = LoggerFactory.getLogger(ChannelBuilders.class);
@@ -77,7 +79,7 @@ public class ChannelBuilders {
                 throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
         }
         return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
-                saslHandshakeRequestEnable, null, null, time, logContext);
+                saslHandshakeRequestEnable, null, null, time, logContext, null);
     }
 
     /**
@@ -89,6 +91,7 @@ public class ChannelBuilders {
      * @param tokenCache Delegation token cache
      * @param time the time instance
      * @param logContext the log context instance
+     * @param apiVersionSupplier supplier for ApiVersions responses sent prior to authentication
      *
      * @return the configured `ChannelBuilder`
      */
@@ -99,10 +102,11 @@ public class ChannelBuilders {
                                                       CredentialCache credentialCache,
                                                       DelegationTokenCache tokenCache,
                                                       Time time,
-                                                      LogContext logContext) {
+                                                      LogContext logContext,
+                                                      Supplier<ApiVersionsResponse> apiVersionSupplier) {
         return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName,
                 isInterBrokerListener, null, true, credentialCache,
-                tokenCache, time, logContext);
+                tokenCache, time, logContext, apiVersionSupplier);
     }
 
     private static ChannelBuilder create(SecurityProtocol securityProtocol,
@@ -116,7 +120,8 @@ public class ChannelBuilders {
                                          CredentialCache credentialCache,
                                          DelegationTokenCache tokenCache,
                                          Time time,
-                                         LogContext logContext) {
+                                         LogContext logContext,
+                                         Supplier<ApiVersionsResponse> apiVersionSupplier) {
         Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
 
         ChannelBuilder channelBuilder;
@@ -174,7 +179,8 @@ public class ChannelBuilders {
                         tokenCache,
                         sslClientAuthOverride,
                         time,
-                        logContext);
+                        logContext,
+                        apiVersionSupplier);
                 break;
             case PLAINTEXT:
                 channelBuilder = new PlaintextChannelBuilder(listenerName);
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 9001626..17988db 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.Login;
@@ -85,6 +86,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
     private final DelegationTokenCache tokenCache;
     private final Map<String, LoginManager> loginManagers;
     private final Map<String, Subject> subjects;
+    private final Supplier<ApiVersionsResponse> apiVersionSupplier;
 
     private SslFactory sslFactory;
     private Map<String, ?> configs;
@@ -108,7 +110,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                               DelegationTokenCache tokenCache,
                               String sslClientAuthOverride,
                               Time time,
-                              LogContext logContext) {
+                              LogContext logContext,
+                              Supplier<ApiVersionsResponse> apiVersionSupplier) {
         this.mode = mode;
         this.jaasContexts = jaasContexts;
         this.loginManagers = new HashMap<>(jaasContexts.size());
@@ -126,6 +129,11 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
         this.time = time;
         this.logContext = logContext;
         this.log = logContext.logger(getClass());
+        this.apiVersionSupplier = apiVersionSupplier;
+
+        if (mode == Mode.SERVER && apiVersionSupplier == null) {
+            throw new IllegalArgumentException("Server channel builder must provide an ApiVersionResponse supplier");
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -266,7 +274,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                                                                ChannelMetadataRegistry metadataRegistry) {
         return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects,
                 kerberosShortNamer, listenerName, securityProtocol, transportLayer,
-                connectionsMaxReauthMsByMechanism, metadataRegistry, time);
+                connectionsMaxReauthMsByMechanism, metadataRegistry, time, apiVersionSupplier);
     }
 
     // Visible to override for testing
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 49a6130..475fc84 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -21,7 +21,10 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.RecordBatch;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,19 +93,28 @@ public enum ApiKeys {
     ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
     DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
     ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
-    VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+    VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false),
+    BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
+    END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
+    DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false),
     ALTER_ISR(ApiMessageType.ALTER_ISR, true),
     UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
-    ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
+    ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false),
+    FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false),
     DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
     DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
-    BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, false);
+    BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false),
+    BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
+    UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true);
+
+    private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
+        new EnumMap<>(ApiMessageType.ListenerType.class);
+
+    static {
+        for (ApiMessageType.ListenerType listenerType : ApiMessageType.ListenerType.values()) {
+            APIS_BY_LISTENER.put(listenerType, filterApisForListener(listenerType));
+        }
+    }
 
     // The generator ensures every `ApiMessageType` has a unique id
     private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
@@ -120,9 +132,6 @@ public enum ApiKeys {
     /** indicates the minimum required inter broker magic required to support the API */
     public final byte minRequiredInterBrokerMagic;
 
-    /** indicates whether this is an API which is only exposed by the KIP-500 controller **/
-    public final boolean isControllerOnlyApi;
-
     /** indicates whether the API is enabled for forwarding **/
     public final boolean forwardable;
 
@@ -142,24 +151,17 @@ public enum ApiKeys {
         this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, forwardable);
     }
 
-    ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
-        this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, false);
-    }
-
     ApiKeys(
         ApiMessageType messageType,
         boolean clusterAction,
         byte minRequiredInterBrokerMagic,
-        boolean forwardable,
-        boolean isControllerOnlyApi
+        boolean forwardable
     ) {
         this.messageType = messageType;
         this.id = messageType.apiKey();
         this.name = messageType.name;
         this.clusterAction = clusterAction;
         this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
-        this.isControllerOnlyApi = isControllerOnlyApi;
-
         this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
         this.forwardable = forwardable;
     }
@@ -195,6 +197,14 @@ public enum ApiKeys {
         return messageType.lowestSupportedVersion();
     }
 
+    public List<Short> allVersions() {
+        List<Short> versions = new ArrayList<>(latestVersion() - oldestVersion() + 1);
+        for (short version = oldestVersion(); version < latestVersion(); version++) {
+            versions.add(version);
+        }
+        return versions;
+    }
+
     public boolean isVersionSupported(short apiVersion) {
         return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
     }
@@ -207,6 +217,10 @@ public enum ApiKeys {
         return messageType.responseHeaderVersion(apiVersion);
     }
 
+    public boolean inScope(ApiMessageType.ListenerType listener) {
+        return messageType.listeners().contains(listener);
+    }
+
     private static String toHtml() {
         final StringBuilder b = new StringBuilder();
         b.append("<table class=\"data-table\"><tbody>\n");
@@ -214,7 +228,7 @@ public enum ApiKeys {
         b.append("<th>Name</th>\n");
         b.append("<th>Key</th>\n");
         b.append("</tr>");
-        for (ApiKeys key : ApiKeys.brokerApis()) {
+        for (ApiKeys key : zkBrokerApis()) {
             b.append("<tr>\n");
             b.append("<td>");
             b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name + "</a>");
@@ -246,10 +260,19 @@ public enum ApiKeys {
         return hasBuffer.get();
     }
 
-    public static List<ApiKeys> brokerApis() {
-        return Arrays.stream(values())
-            .filter(api -> !api.isControllerOnlyApi)
+    public static EnumSet<ApiKeys> zkBrokerApis() {
+        return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
+    }
+
+    public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {
+        return APIS_BY_LISTENER.get(listener);
+    }
+
+    private static EnumSet<ApiKeys> filterApisForListener(ApiMessageType.ListenerType listener) {
+        List<ApiKeys> controllerApis = Arrays.stream(ApiKeys.values())
+            .filter(apiKey -> apiKey.messageType.listeners().contains(listener))
             .collect(Collectors.toList());
+        return EnumSet.copyOf(controllerApis);
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index f31c613..d455b26 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -133,7 +133,7 @@ public class Protocol {
             b.append("</pre>\n");
             schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
         }
-        for (ApiKeys key : ApiKeys.brokerApis()) {
+        for (ApiKeys key : ApiKeys.zkBrokerApis()) {
             // Key
             b.append("<h5>");
             b.append("<a name=\"The_Messages_" + key.name + "\">");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 2bf9360..1190989 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.feature.Features;
 import org.apache.kafka.common.feature.FinalizedVersionRange;
 import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@@ -29,11 +30,12 @@ import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureK
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * Possible error codes:
@@ -44,9 +46,6 @@ public class ApiVersionsResponse extends AbstractResponse {
 
     public static final long UNKNOWN_FINALIZED_FEATURES_EPOCH = -1L;
 
-    public static final ApiVersionsResponse DEFAULT_API_VERSIONS_RESPONSE = createApiVersionsResponse(
-            DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
-
     private final ApiVersionsResponseData data;
 
     public ApiVersionsResponse(ApiVersionsResponseData data) {
@@ -96,49 +95,89 @@ public class ApiVersionsResponse extends AbstractResponse {
         }
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(final int throttleTimeMs, final byte minMagic) {
-        return createApiVersionsResponse(throttleTimeMs, minMagic, Features.emptySupportedFeatures(),
-            Features.emptyFinalizedFeatures(), UNKNOWN_FINALIZED_FEATURES_EPOCH);
+    public static ApiVersionsResponse defaultApiVersionsResponse(
+        ApiMessageType.ListenerType listenerType
+    ) {
+        return defaultApiVersionsResponse(0, listenerType);
     }
 
-    private static ApiVersionsResponse createApiVersionsResponse(
-        final int throttleTimeMs,
-        final byte minMagic,
-        final Features<SupportedVersionRange> latestSupportedFeatures,
-        final Features<FinalizedVersionRange> finalizedFeatures,
-        final long finalizedFeaturesEpoch) {
+    public static ApiVersionsResponse defaultApiVersionsResponse(
+        int throttleTimeMs,
+        ApiMessageType.ListenerType listenerType
+    ) {
+        return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType));
+    }
+
+    public static ApiVersionsResponse createApiVersionsResponse(
+        int throttleTimeMs,
+        ApiVersionCollection apiVersions
+    ) {
+        return createApiVersionsResponse(
+            throttleTimeMs,
+            apiVersions,
+            Features.emptySupportedFeatures(),
+            Features.emptyFinalizedFeatures(),
+            UNKNOWN_FINALIZED_FEATURES_EPOCH
+        );
+    }
+
+    public static ApiVersionsResponse createApiVersionsResponse(
+        int throttleTimeMs,
+        ApiVersionCollection apiVersions,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        long finalizedFeaturesEpoch
+    ) {
         return new ApiVersionsResponse(
             createApiVersionsResponseData(
                 throttleTimeMs,
                 Errors.NONE,
-                defaultApiKeys(minMagic),
+                apiVersions,
                 latestSupportedFeatures,
                 finalizedFeatures,
-                finalizedFeaturesEpoch));
+                finalizedFeaturesEpoch
+            )
+        );
     }
 
-    public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
+    public static ApiVersionCollection filterApis(
+        RecordVersion minRecordVersion,
+        ApiMessageType.ListenerType listenerType
+    ) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
-        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
-            if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
+        for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
+            if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
                 apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
             }
         }
         return apiKeys;
     }
 
+    public static ApiVersionCollection collectApis(Set<ApiKeys> apiKeys) {
+        ApiVersionCollection res = new ApiVersionCollection();
+        for (ApiKeys apiKey : apiKeys) {
+            res.add(ApiVersionsResponse.toApiVersion(apiKey));
+        }
+        return res;
+    }
+
     /**
-     * Find the commonly agreed ApiVersions between local software and the controller.
+     * Find the common range of supported API versions between the locally
+     * known range and that of another set.
      *
-     * @param minMagic min inter broker magic
+     * @param listenerType the listener type which constrains the set of exposed APIs
+     * @param minRecordVersion min inter broker magic
      * @param activeControllerApiVersions controller ApiVersions
      * @return commonly agreed ApiVersion collection
      */
-    public static ApiVersionCollection intersectControllerApiVersions(final byte minMagic,
-                                                                      final Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
+    public static ApiVersionCollection intersectForwardableApis(
+        final ApiMessageType.ListenerType listenerType,
+        final RecordVersion minRecordVersion,
+        final Map<ApiKeys, ApiVersion> activeControllerApiVersions
+    ) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
-        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
-            if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
+        for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
+            if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
                 ApiVersion brokerApiVersion = toApiVersion(apiKey);
 
                 final ApiVersion finalApiVersion;
@@ -161,7 +200,7 @@ public class ApiVersionsResponse extends AbstractResponse {
         return apiKeys;
     }
 
-    public static ApiVersionsResponseData createApiVersionsResponseData(
+    private static ApiVersionsResponseData createApiVersionsResponseData(
         final int throttleTimeMs,
         final Errors error,
         final ApiVersionCollection apiKeys,
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 7fb5687..243495d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -85,6 +85,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Supplier;
 
 public class SaslServerAuthenticator implements Authenticator {
     // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
@@ -127,6 +128,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private final Time time;
     private final ReauthInfo reauthInfo;
     private final ChannelMetadataRegistry metadataRegistry;
+    private final Supplier<ApiVersionsResponse> apiVersionSupplier;
 
     // Current SASL state
     private SaslState saslState = SaslState.INITIAL_REQUEST;
@@ -154,7 +156,8 @@ public class SaslServerAuthenticator implements Authenticator {
                                    TransportLayer transportLayer,
                                    Map<String, Long> connectionsMaxReauthMsByMechanism,
                                    ChannelMetadataRegistry metadataRegistry,
-                                   Time time) {
+                                   Time time,
+                                   Supplier<ApiVersionsResponse> apiVersionSupplier) {
         this.callbackHandlers = callbackHandlers;
         this.connectionId = connectionId;
         this.subjects = subjects;
@@ -166,6 +169,7 @@ public class SaslServerAuthenticator implements Authenticator {
         this.time = time;
         this.reauthInfo = new ReauthInfo();
         this.metadataRegistry = metadataRegistry;
+        this.apiVersionSupplier = apiVersionSupplier;
 
         this.configs = configs;
         @SuppressWarnings("unchecked")
@@ -563,11 +567,6 @@ public class SaslServerAuthenticator implements Authenticator {
     }
 
     // Visible to override for testing
-    protected ApiVersionsResponse apiVersionsResponse() {
-        return ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE;
-    }
-
-    // Visible to override for testing
     protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
         this.enableKafkaSaslAuthenticateHeaders = flag;
     }
@@ -583,7 +582,7 @@ public class SaslServerAuthenticator implements Authenticator {
         else {
             metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data().clientSoftwareName(),
                 apiVersionsRequest.data().clientSoftwareVersion()));
-            sendKafkaResponse(context, apiVersionsResponse());
+            sendKafkaResponse(context, apiVersionSupplier.get());
             setSaslState(SaslState.HANDSHAKE_REQUEST);
         }
     }
diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index b2bb9a7..ade3fc7 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 25,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "AddOffsetsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 3e1d720..4920da1 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 24,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "AddPartitionsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
index 715a006..6bfdc92 100644
--- a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
+++ b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 49,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "AlterClientQuotasRequest",
   "validVersions": "0-1",
   // Version 1 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json b/clients/src/main/resources/common/message/AlterConfigsRequest.json
index a1d7d1d..31057e3 100644
--- a/clients/src/main/resources/common/message/AlterConfigsRequest.json
+++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 33,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "AlterConfigsRequest",
   // Version 1 is the same as version 0.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/AlterIsrRequest.json b/clients/src/main/resources/common/message/AlterIsrRequest.json
index 3d5084a..f950cd7 100644
--- a/clients/src/main/resources/common/message/AlterIsrRequest.json
+++ b/clients/src/main/resources/common/message/AlterIsrRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 56,
   "type": "request",
+  "listeners": ["zkBroker", "controller"],
   "name": "AlterIsrRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
index ecf7eca..2e12441 100644
--- a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 45,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "AlterPartitionReassignmentsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
index a674907..2306caa 100644
--- a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 34,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "AlterReplicaLogDirsRequest",
   // Version 1 is the same as version 0.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
index 242bcb9..8937394 100644
--- a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 51,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "AlterUserScramCredentialsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 66e4511..b86edbf 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 18,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "ApiVersionsRequest",
   // Versions 0 through 2 of ApiVersionsRequest are the same.
   //
diff --git a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
index fd98609..9f7969f 100644
--- a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
+++ b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 53,
   "type": "request",
+  "listeners": ["controller"],
   "name": "BeginQuorumEpochRequest",
   "validVersions": "0",
   "fields": [
diff --git a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
index 105a818..ce08d11 100644
--- a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 63,
   "type": "request",
+  "listeners": ["controller"],
   "name": "BrokerHeartbeatRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index b123cc1..3e27cf1 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey":62,
   "type": "request",
+  "listeners": ["controller"],
   "name": "BrokerRegistrationRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/ControlledShutdownRequest.json b/clients/src/main/resources/common/message/ControlledShutdownRequest.json
index 5756d1c..49561f7 100644
--- a/clients/src/main/resources/common/message/ControlledShutdownRequest.json
+++ b/clients/src/main/resources/common/message/ControlledShutdownRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 7,
   "type": "request",
+  "listeners": ["zkBroker", "controller"],
   "name": "ControlledShutdownRequest",
   // Version 0 of ControlledShutdownRequest has a non-standard request header
   // which does not include clientId.  Version 1 and later use the standard
diff --git a/clients/src/main/resources/common/message/CreateAclsRequest.json b/clients/src/main/resources/common/message/CreateAclsRequest.json
index a9bd9c5..5b3bfed 100644
--- a/clients/src/main/resources/common/message/CreateAclsRequest.json
+++ b/clients/src/main/resources/common/message/CreateAclsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 30,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "CreateAclsRequest",
   // Version 1 adds resource pattern type.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
index 8d88135..0c31d32 100644
--- a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 38,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "CreateDelegationTokenRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/CreatePartitionsRequest.json b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
index ba11388..6e24949 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 37,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "CreatePartitionsRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json
index 1a8d57a..0882de9 100644
--- a/clients/src/main/resources/common/message/CreateTopicsRequest.json
+++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 19,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "CreateTopicsRequest",
   // Version 1 adds validateOnly.
   //
diff --git a/clients/src/main/resources/common/message/DeleteAclsRequest.json b/clients/src/main/resources/common/message/DeleteAclsRequest.json
index 664737e..fd7c152 100644
--- a/clients/src/main/resources/common/message/DeleteAclsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteAclsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "DeleteAclsRequest",
   // Version 1 adds the pattern type.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
index 833ed7a..1ac6a05 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 42,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "DeleteGroupsRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/DeleteRecordsRequest.json b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
index 93cbd56..06a12d8 100644
--- a/clients/src/main/resources/common/message/DeleteRecordsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 21,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "DeleteRecordsRequest",
   // Version 1 is the same as version 0.
 
diff --git a/clients/src/main/resources/common/message/DeleteTopicsRequest.json b/clients/src/main/resources/common/message/DeleteTopicsRequest.json
index 7e11554..f757ff7 100644
--- a/clients/src/main/resources/common/message/DeleteTopicsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteTopicsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 20,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "DeleteTopicsRequest",
   // Versions 0, 1, 2, and 3 are the same.
   //
diff --git a/clients/src/main/resources/common/message/DescribeAclsRequest.json b/clients/src/main/resources/common/message/DescribeAclsRequest.json
index a9c3676..58886da 100644
--- a/clients/src/main/resources/common/message/DescribeAclsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeAclsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 29,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "DescribeAclsRequest",
   // Version 1 adds resource pattern type.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
index e35d3b0..d14cfc9 100644
--- a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 48,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "DescribeClientQuotasRequest",
   // Version 1 enables flexible versions.
   "validVersions": "0-1",
diff --git a/clients/src/main/resources/common/message/DescribeClusterRequest.json b/clients/src/main/resources/common/message/DescribeClusterRequest.json
index 31eb573..192e4d8 100644
--- a/clients/src/main/resources/common/message/DescribeClusterRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClusterRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 60,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "DescribeClusterRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
index 01a45eb..23be19c 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 32,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "DescribeConfigsRequest",
   // Version 1 adds IncludeSynonyms.
   // Version 2 is the same as version 1.
diff --git a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
index 32a4f5c..da5bbd0 100644
--- a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 41,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "DescribeDelegationTokenRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/DescribeGroupsRequest.json b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
index 8a5887a..6b10b06 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 15,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "DescribeGroupsRequest",
   // Versions 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index 577f2eb..65658ea 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 35,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "DescribeLogDirsRequest",
   // Version 1 is the same as version 0.
   "validVersions": "0-2",
diff --git a/clients/src/main/resources/common/message/DescribeProducersRequest.json b/clients/src/main/resources/common/message/DescribeProducersRequest.json
index bd35f91..0ffd834 100644
--- a/clients/src/main/resources/common/message/DescribeProducersRequest.json
+++ b/clients/src/main/resources/common/message/DescribeProducersRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 61,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "DescribeProducersRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/DescribeQuorumRequest.json b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
index f91d679..cd4a7f1 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumRequest.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 55,
   "type": "request",
+  "listeners": ["broker", "controller"],
   "name": "DescribeQuorumRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
index f7f8c68..2f7a111 100644
--- a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 50,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "DescribeUserScramCredentialsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index 5b86c96..8a095e1 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 43,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "ElectLeadersRequest",
   // Version 1 implements multiple leader election types, as described by KIP-460.
   //
diff --git a/clients/src/main/resources/common/message/EndQuorumEpochRequest.json b/clients/src/main/resources/common/message/EndQuorumEpochRequest.json
index 45dacde..3ef7f63 100644
--- a/clients/src/main/resources/common/message/EndQuorumEpochRequest.json
+++ b/clients/src/main/resources/common/message/EndQuorumEpochRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 54,
   "type": "request",
+  "listeners": ["controller"],
   "name": "EndQuorumEpochRequest",
   "validVersions": "0",
   "fields": [
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json b/clients/src/main/resources/common/message/EndTxnRequest.json
index de18b43..f16ef76 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 26,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "EndTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/EnvelopeRequest.json b/clients/src/main/resources/common/message/EnvelopeRequest.json
index a1aa760..1f6ff62 100644
--- a/clients/src/main/resources/common/message/EnvelopeRequest.json
+++ b/clients/src/main/resources/common/message/EnvelopeRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 58,
   "type": "request",
+  "listeners": ["controller"],
   "name": "EnvelopeRequest",
   // Request struct for forwarding.
   "validVersions": "0",
diff --git a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
index a990862..c830a93 100644
--- a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 40,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "ExpireDelegationTokenRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 0dcdd7a..ab4c95f 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 1,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "FetchRequest",
   //
   // Version 1 is the same as version 0.
diff --git a/clients/src/main/resources/common/message/FetchSnapshotRequest.json b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
index c3518f4..accc227 100644
--- a/clients/src/main/resources/common/message/FetchSnapshotRequest.json
+++ b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 59,
   "type": "request",
+  "listeners": ["controller"],
   "name": "FetchSnapshotRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/FindCoordinatorRequest.json b/clients/src/main/resources/common/message/FindCoordinatorRequest.json
index 6a90887..cd5b77a 100644
--- a/clients/src/main/resources/common/message/FindCoordinatorRequest.json
+++ b/clients/src/main/resources/common/message/FindCoordinatorRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 10,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "FindCoordinatorRequest",
   // Version 1 adds KeyType.
   //
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json
index 4d799aa..dcf776d 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 12,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "HeartbeatRequest",
   // Version 1 and version 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
index b1fb1e9..d4955c9 100644
--- a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
+++ b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 44,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "IncrementalAlterConfigsRequest",
   // Version 1 is the first flexible version.
   "validVersions": "0-1",
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index dc85063..e8795e6 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 22,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "InitProducerIdRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 2650d89..d9113b7 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 11,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "JoinGroupRequest",
   // Version 1 adds RebalanceTimeoutMs.
   //
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index 129b7f7..57e6f21 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 4,
   "type": "request",
+  "listeners": ["zkBroker"],
   "name": "LeaderAndIsrRequest",
   // Version 1 adds IsNew.
   //
diff --git a/clients/src/main/resources/common/message/LeaveGroupRequest.json b/clients/src/main/resources/common/message/LeaveGroupRequest.json
index acc7938..893c945 100644
--- a/clients/src/main/resources/common/message/LeaveGroupRequest.json
+++ b/clients/src/main/resources/common/message/LeaveGroupRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 13,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "LeaveGroupRequest",
   // Version 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ListGroupsRequest.json b/clients/src/main/resources/common/message/ListGroupsRequest.json
index dbe6d9b..3f62e28 100644
--- a/clients/src/main/resources/common/message/ListGroupsRequest.json
+++ b/clients/src/main/resources/common/message/ListGroupsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 16,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "ListGroupsRequest",
   // Version 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index 9855a4b..a464c93 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 2,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "ListOffsetsRequest",
   // Version 1 removes MaxNumOffsets.  From this version forward, only a single
   // offset can be returned.
diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
index 7322f25..f013e3f 100644
--- a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 46,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "ListPartitionReassignmentsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
index 02af116..6690810 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 3,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
   "validVersions": "0-11",
   "flexibleVersions": "9+",
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 096b619..cf112e1 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 8,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "OffsetCommitRequest",
   // Version 1 adds timestamp and group membership information, as well as the commit timestamp.
   //
diff --git a/clients/src/main/resources/common/message/OffsetDeleteRequest.json b/clients/src/main/resources/common/message/OffsetDeleteRequest.json
index 108ca9f..394d1bb 100644
--- a/clients/src/main/resources/common/message/OffsetDeleteRequest.json
+++ b/clients/src/main/resources/common/message/OffsetDeleteRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 47,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "OffsetDeleteRequest",
   "validVersions": "0",
   "fields": [
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index ddd53fc..d4a4d5f 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 9,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "OffsetFetchRequest",
   // In version 0, the request read offsets from ZK.
   //
diff --git a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
index 75b6c8d..2440bec 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 23,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "OffsetForLeaderEpochRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json
index 73e72c3..121cd42 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 0,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "ProduceRequest",
   // Version 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
index 9fbb0fa..182682e 100644
--- a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 39,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "RenewDelegationTokenRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/SaslAuthenticateRequest.json b/clients/src/main/resources/common/message/SaslAuthenticateRequest.json
index 122cef5..3f5558b 100644
--- a/clients/src/main/resources/common/message/SaslAuthenticateRequest.json
+++ b/clients/src/main/resources/common/message/SaslAuthenticateRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 36,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "SaslAuthenticateRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/SaslHandshakeRequest.json b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
index f384f41..3384db8 100644
--- a/clients/src/main/resources/common/message/SaslHandshakeRequest.json
+++ b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 17,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "SaslHandshakeRequest",
   // Version 1 supports SASL_AUTHENTICATE.
   // NOTE: Version cannot be easily bumped due to incorrect
diff --git a/clients/src/main/resources/common/message/StopReplicaRequest.json b/clients/src/main/resources/common/message/StopReplicaRequest.json
index d43356c..b10154f 100644
--- a/clients/src/main/resources/common/message/StopReplicaRequest.json
+++ b/clients/src/main/resources/common/message/StopReplicaRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 5,
   "type": "request",
+  "listeners": ["zkBroker"],
   "name": "StopReplicaRequest",
   // Version 1 adds the broker epoch and reorganizes the partitions to be stored
   // per topic.
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json
index a0a5991..5525844 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 14,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "SyncGroupRequest",
   // Versions 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index bd91df3..a832ef7 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 28,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "TxnOffsetCommitRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
index 3c43b16..ef72bfe 100644
--- a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 64,
   "type": "request",
+  "listeners": ["broker", "controller"],
   "name": "UnregisterBrokerRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
index ab882df..21e1bf6 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 57,
   "type": "request",
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "UpdateFeaturesRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/UpdateMetadataRequest.json b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
index 99d33f1..5f397a9 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 6,
   "type": "request",
+  "listeners": ["zkBroker"],
   "name": "UpdateMetadataRequest",
   // Version 1 allows specifying multiple endpoints for each broker.
   //
diff --git a/clients/src/main/resources/common/message/VoteRequest.json b/clients/src/main/resources/common/message/VoteRequest.json
index 3926ba7..fcc0017 100644
--- a/clients/src/main/resources/common/message/VoteRequest.json
+++ b/clients/src/main/resources/common/message/VoteRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 52,
   "type": "request",
+  "listeners": ["controller"],
   "name": "VoteRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
index 3fdfa05..9e29fb3 100644
--- a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
+++ b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
@@ -16,6 +16,7 @@
 {
   "apiKey": 27,
   "type": "request",
+  "listeners": ["zkBroker", "broker"],
   "name": "WriteTxnMarkersRequest",
   // Version 1 enables flexible versions.
   "validVersions": "0-1",
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index eb130ff..b13f854 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@@ -247,7 +248,8 @@ public class NetworkClientTest {
 
     private void awaitReady(NetworkClient client, Node node) {
         if (client.discoverBrokerVersions()) {
-            setExpectedApiVersionsResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
+            setExpectedApiVersionsResponse(ApiVersionsResponse.defaultApiVersionsResponse(
+                ApiMessageType.ListenerType.ZK_BROKER));
         }
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
@@ -295,8 +297,7 @@ public class NetworkClientTest {
         assertTrue(client.hasInFlightRequests(node.idString()));
 
         // prepare response
-        delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(),
-            ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
+        delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), defaultApiVersionsResponse());
 
         // handle completed receives
         client.poll(0, time.milliseconds());
@@ -367,8 +368,7 @@ public class NetworkClientTest {
         assertEquals(2, header.apiVersion());
 
         // prepare response
-        delayedApiVersionsResponse(1, (short) 0,
-            ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
+        delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
 
         // handle completed receives
         client.poll(0, time.milliseconds());
@@ -434,8 +434,7 @@ public class NetworkClientTest {
         assertEquals(0, header.apiVersion());
 
         // prepare response
-        delayedApiVersionsResponse(1, (short) 0,
-            ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
+        delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
 
         // handle completed receives
         client.poll(0, time.milliseconds());
@@ -1079,6 +1078,10 @@ public class NetworkClientTest {
         assertFalse(client.isReady(node, time.milliseconds()));
     }
 
+    private ApiVersionsResponse defaultApiVersionsResponse() {
+        return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+    }
+
     private static class TestCallbackHandler implements RequestCompletionHandler {
         public boolean executed = false;
         public ClientResponse response;
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 7c19d9f..b04d83b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -16,18 +16,21 @@
  */
 package org.apache.kafka.clients;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -38,7 +41,7 @@ public class NodeApiVersionsTest {
         NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
         StringBuilder bld = new StringBuilder();
         String prefix = "(";
-        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
+        for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
             bld.append(prefix).append(apiKey.name).
                     append("(").append(apiKey.id).append("): UNSUPPORTED");
             prefix = ", ";
@@ -133,27 +136,26 @@ public class NodeApiVersionsTest {
             () -> apiVersions.latestUsableVersion(ApiKeys.PRODUCE));
     }
 
-    @Test
-    public void testUsableVersionLatestVersions() {
-        List<ApiVersion> versionList = new LinkedList<>(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys());
+    @ParameterizedTest
+    @EnumSource(ApiMessageType.ListenerType.class)
+    public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
+        ApiVersionsResponse defaultResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
+        List<ApiVersion> versionList = new LinkedList<>(defaultResponse.data().apiKeys());
         // Add an API key that we don't know about.
         versionList.add(new ApiVersion()
                 .setApiKey((short) 100)
                 .setMinVersion((short) 0)
                 .setMaxVersion((short) 1));
         NodeApiVersions versions = new NodeApiVersions(versionList);
-        for (ApiKeys apiKey: ApiKeys.values()) {
-            if (apiKey.isControllerOnlyApi) {
-                assertNull(versions.apiVersion(apiKey));
-            } else {
-                assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
-            }
+        for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
+            assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
         }
     }
 
-    @Test
-    public void testConstructionFromApiVersionsResponse() {
-        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE;
+    @ParameterizedTest
+    @EnumSource(ApiMessageType.ListenerType.class)
+    public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
+        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
         NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
 
         for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 97b4b66..ce8f056 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -73,6 +73,7 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
 import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult;
 import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
 import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.CreateAclsResponseData;
@@ -125,6 +126,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+import org.apache.kafka.common.record.RecordVersion;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AlterClientQuotasResponse;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
@@ -526,17 +528,17 @@ public class KafkaAdminClientTest {
 
     private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures(Errors error) {
         if (error == Errors.NONE) {
-            return new ApiVersionsResponse(ApiVersionsResponse.createApiVersionsResponseData(
-                ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs(),
-                error,
-                ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(),
+            return ApiVersionsResponse.createApiVersionsResponse(
+                0,
+                ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
                 convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
                 convertFinalizedFeaturesMap(defaultFeatureMetadata().finalizedFeatures()),
-                defaultFeatureMetadata().finalizedFeaturesEpoch().get()));
+                defaultFeatureMetadata().finalizedFeaturesEpoch().get()
+            );
         }
         return new ApiVersionsResponse(
             new ApiVersionsResponseData()
-                .setThrottleTimeMs(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs())
+                .setThrottleTimeMs(0)
                 .setErrorCode(error.code()));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2b9df62..0a7d5cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
@@ -2074,8 +2075,9 @@ public class FetcherTest {
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, ClientDnsLookup.USE_ALL_DNS_IPS,
                 time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
 
-        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(ApiVersionsResponse.createApiVersionsResponse(
-            400, RecordBatch.CURRENT_MAGIC_VALUE), ApiKeys.API_VERSIONS.latestVersion(), 0);
+        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+            400, ApiMessageType.ListenerType.ZK_BROKER);
+        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
 
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
         while (!client.ready(node, time.milliseconds())) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 439ba3b..e118c11 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -24,13 +24,6 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.common.errors.InvalidRequestException;
-import org.apache.kafka.common.errors.TransactionAbortedException;
-import org.apache.kafka.common.message.ProduceRequestData;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
-import org.apache.kafka.common.requests.MetadataRequest;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -39,15 +32,19 @@ import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TransactionAbortedException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -66,15 +63,19 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
@@ -105,12 +106,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.AdditionalMatchers.geq;
 import static org.mockito.ArgumentMatchers.any;
@@ -287,9 +288,9 @@ public class SenderTest {
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, ClientDnsLookup.USE_ALL_DNS_IPS,
                 time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
-        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(
-            ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE),
-            ApiKeys.API_VERSIONS.latestVersion(), 0);
+        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+            400, ApiMessageType.ListenerType.ZK_BROKER);
+        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
 
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
         while (!client.ready(node, time.milliseconds())) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 15f8079..53b46e7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -18,9 +18,11 @@ package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
@@ -117,7 +119,8 @@ public class NioEchoServer extends Thread {
         LogContext logContext = new LogContext();
         if (channelBuilder == null)
             channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false,
-                    securityProtocol, config, credentialCache, tokenCache, time, logContext);
+                securityProtocol, config, credentialCache, tokenCache, time, logContext,
+                () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
         this.metrics = new Metrics();
         this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time,
                 "MetricGroup", channelBuilder, logContext);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 820500e..1697c62 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -48,6 +50,7 @@ import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -160,9 +163,8 @@ public class SaslChannelBuilderTest {
 
     private SaslChannelBuilder createGssapiChannelBuilder(Map<String, JaasContext> jaasContexts, GSSManager gssManager) {
         SaslChannelBuilder channelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
-                SecurityProtocol.SASL_PLAINTEXT,
-                new ListenerName("GSSAPI"), false, "GSSAPI",
-                true, null, null, null, Time.SYSTEM, new LogContext()) {
+            SecurityProtocol.SASL_PLAINTEXT, new ListenerName("GSSAPI"), false, "GSSAPI",
+            true, null, null, null, Time.SYSTEM, new LogContext(), defaultApiVersionsSupplier()) {
 
             @Override
             protected GSSManager gssManager() {
@@ -174,6 +176,10 @@ public class SaslChannelBuilderTest {
         return channelBuilder;
     }
 
+    private Supplier<ApiVersionsResponse> defaultApiVersionsSupplier() {
+        return () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+    }
+
     private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol, String saslMechanism) {
         Class<?> loginModule = null;
         switch (saslMechanism) {
@@ -198,7 +204,7 @@ public class SaslChannelBuilderTest {
         Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
         return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName(saslMechanism),
                 false, saslMechanism, true, null,
-                null, null, Time.SYSTEM, new LogContext());
+                null, null, Time.SYSTEM, new LogContext(), defaultApiVersionsSupplier());
     }
 
     public static final class TestGssapiLoginModule implements LoginModule {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 2b3bf56..13f7632 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
@@ -58,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 import javax.net.ssl.SSLContext;
@@ -1018,7 +1021,8 @@ public class SslTransportLayerTest {
         TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
         ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
         ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
-                true, securityProtocol, config, null, null, time, new LogContext());
+            true, securityProtocol, config, null, null, time, new LogContext(),
+            defaultApiVersionsSupplier());
         server = new NioEchoServer(listenerName, securityProtocol, config,
                 "localhost", serverChannelBuilder, null, time);
         server.start();
@@ -1040,8 +1044,9 @@ public class SslTransportLayerTest {
         args.sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
         TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
         ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
-        assertThrows(KafkaException.class, () -> ChannelBuilders.serverChannelBuilder(listenerName, true, securityProtocol, config,
-                null, null, time, new LogContext()));
+        assertThrows(KafkaException.class, () -> ChannelBuilders.serverChannelBuilder(
+            listenerName, true, securityProtocol, config,
+            null, null, time, new LogContext(), defaultApiVersionsSupplier()));
     }
 
     /**
@@ -1055,7 +1060,8 @@ public class SslTransportLayerTest {
         TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
         ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
         ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
-                false, securityProtocol, config, null, null, time, new LogContext());
+            false, securityProtocol, config, null, null, time, new LogContext(),
+            defaultApiVersionsSupplier());
         server = new NioEchoServer(listenerName, securityProtocol, config,
                 "localhost", serverChannelBuilder, null, time);
         server.start();
@@ -1111,7 +1117,8 @@ public class SslTransportLayerTest {
         TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
         ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
         ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
-            false, securityProtocol, config, null, null, time, new LogContext());
+            false, securityProtocol, config, null, null, time, new LogContext(),
+            defaultApiVersionsSupplier());
         server = new NioEchoServer(listenerName, securityProtocol, config,
             "localhost", serverChannelBuilder, null, time);
         server.start();
@@ -1176,7 +1183,8 @@ public class SslTransportLayerTest {
         TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
         ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
         ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
-                false, securityProtocol, config, null, null, time, new LogContext());
+            false, securityProtocol, config, null, null, time, new LogContext(),
+            defaultApiVersionsSupplier());
         server = new NioEchoServer(listenerName, securityProtocol, config,
                 "localhost", serverChannelBuilder, null, time);
         server.start();
@@ -1334,6 +1342,10 @@ public class SslTransportLayerTest {
         void run() throws IOException;
     }
 
+    private Supplier<ApiVersionsResponse> defaultApiVersionsSupplier() {
+        return () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+    }
+
     static class TestSslChannelBuilder extends SslChannelBuilder {
 
         private Integer netReadBufSizeOverride;
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index 52121de..17d2e1c 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -20,9 +20,12 @@ import org.apache.kafka.common.protocol.types.BoundField;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Set;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -60,11 +63,7 @@ public class ApiKeysTest {
         Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
         // Newer protocol apis include throttle time ms even for cluster actions
         Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR);
-        for (ApiKeys apiKey: ApiKeys.values()) {
-            // Disable broker-to-controller API throttling test
-            if (apiKey.isControllerOnlyApi) {
-                continue;
-            }
+        for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
             Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
             BoundField throttleTimeField = responseSchema.get("throttle_time_ms");
             if ((apiKey.clusterAction && !clusterActionsWithThrottleTimeMs.contains(apiKey))
@@ -74,4 +73,17 @@ public class ApiKeysTest {
                 assertNotNull(throttleTimeField, "Throttle time field missing: " + apiKey);
         }
     }
+
+    @Test
+    public void testApiScope() {
+        Set<ApiKeys> apisMissingScope = new HashSet<>();
+        for (ApiKeys apiKey : ApiKeys.values()) {
+            if (apiKey.messageType.listeners().isEmpty()) {
+                apisMissingScope.add(apiKey);
+            }
+        }
+        assertEquals(Collections.emptySet(), apisMissingScope,
+            "Found some APIs missing scope definition");
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index 38a5869..2c9b1e8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -17,17 +17,17 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -38,21 +38,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ApiVersionsResponseTest {
 
-    @Test
-    public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
-        assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.brokerApis()));
-        assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
-        assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
-        assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
-    }
-
-    @Test
-    public void shouldHaveCorrectDefaultApiVersionsResponse() {
-        Collection<ApiVersion> apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys();
-        assertEquals(apiVersions.size(), ApiKeys.brokerApis().size(), "API versions for all API keys must be maintained.");
+    @ParameterizedTest
+    @EnumSource(ApiMessageType.ListenerType.class)
+    public void shouldHaveCorrectDefaultApiVersionsResponse(ApiMessageType.ListenerType scope) {
+        ApiVersionsResponse defaultResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
+        assertEquals(ApiKeys.apisForListener(scope).size(), defaultResponse.data().apiKeys().size(),
+            "API versions for all API keys must be maintained.");
 
-        for (ApiKeys key : ApiKeys.brokerApis()) {
-            ApiVersion version = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id);
+        for (ApiKeys key : ApiKeys.apisForListener(scope)) {
+            ApiVersion version = defaultResponse.apiVersion(key.id);
             assertNotNull(version, "Could not find ApiVersion for API " + key.name);
             assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name);
             assertEquals(version.maxVersion(), key.latestVersion(), "Incorrect max version for Api " + key.name);
@@ -74,9 +68,9 @@ public class ApiVersionsResponseTest {
             }
         }
 
-        assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
-        assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
-        assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
+        assertTrue(defaultResponse.data().supportedFeatures().isEmpty());
+        assertTrue(defaultResponse.data().finalizedFeatures().isEmpty());
+        assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, defaultResponse.data().finalizedFeaturesEpoch());
     }
 
     @Test
@@ -96,9 +90,11 @@ public class ApiVersionsResponseTest {
                 .setMaxVersion(maxVersion))
         );
 
-        ApiVersionCollection commonResponse = ApiVersionsResponse.intersectControllerApiVersions(
-            RecordBatch.CURRENT_MAGIC_VALUE,
-            activeControllerApiVersions);
+        ApiVersionCollection commonResponse = ApiVersionsResponse.intersectForwardableApis(
+            ApiMessageType.ListenerType.ZK_BROKER,
+            RecordVersion.current(),
+            activeControllerApiVersions
+        );
 
         verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse);
 
@@ -149,11 +145,4 @@ public class ApiVersionsResponseTest {
         assertEquals(expectedVersionsForForwardableAPI, commonResponse.find(forwardableAPIKey));
     }
 
-    private Set<ApiKeys> apiKeysInResponse(final ApiVersionsResponse apiVersions) {
-        final Set<ApiKeys> apiKeys = new HashSet<>();
-        for (final ApiVersion version : apiVersions.data().apiKeys()) {
-            apiKeys.add(ApiKeys.forId(version.apiKey()));
-        }
-        return apiKeys;
-    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f87d9f9..4c70771 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
 import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic;
 import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection;
 import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
@@ -145,11 +146,11 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
-import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
 import org.apache.kafka.common.message.ProduceRequestData;
 import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
@@ -346,17 +347,6 @@ public class RequestResponseTest {
         checkErrorResponse(createSaslAuthenticateRequest(), unknownServerException, true);
         checkResponse(createSaslAuthenticateResponse(), 0, true);
         checkResponse(createSaslAuthenticateResponse(), 1, true);
-        checkRequest(createApiVersionRequest(), true);
-        checkErrorResponse(createApiVersionRequest(), unknownServerException, true);
-        checkErrorResponse(createApiVersionRequest(), new UnsupportedVersionException("Not Supported"), true);
-        checkResponse(createApiVersionResponse(), 0, true);
-        checkResponse(createApiVersionResponse(), 1, true);
-        checkResponse(createApiVersionResponse(), 2, true);
-        checkResponse(createApiVersionResponse(), 3, true);
-        checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 0, true);
-        checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 1, true);
-        checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 2, true);
-        checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 3, true);
 
         for (int v = ApiKeys.CREATE_TOPICS.oldestVersion(); v <= ApiKeys.CREATE_TOPICS.latestVersion(); v++) {
             checkRequest(createCreateTopicRequest(v), true);
@@ -522,8 +512,19 @@ public class RequestResponseTest {
     }
 
     @Test
+    public void testApiVersionsSerialization() {
+        for (short v : ApiKeys.API_VERSIONS.allVersions()) {
+            checkRequest(createApiVersionRequest(v), true);
+            checkErrorResponse(createApiVersionRequest(v), unknownServerException, true);
+            checkErrorResponse(createApiVersionRequest(v), new UnsupportedVersionException("Not Supported"), true);
+            checkResponse(createApiVersionResponse(), v, true);
+            checkResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), v, true);
+        }
+    }
+
+    @Test
     public void testBrokerHeartbeatSerialization() {
-        for (short v = ApiKeys.BROKER_HEARTBEAT.oldestVersion(); v <= ApiKeys.BROKER_HEARTBEAT.latestVersion(); v++) {
+        for (short v : ApiKeys.BROKER_HEARTBEAT.allVersions()) {
             checkRequest(createBrokerHeartbeatRequest(v), true);
             checkErrorResponse(createBrokerHeartbeatRequest(v), unknownServerException, true);
             checkResponse(createBrokerHeartbeatResponse(), v, true);
@@ -532,7 +533,7 @@ public class RequestResponseTest {
 
     @Test
     public void testBrokerRegistrationSerialization() {
-        for (short v = ApiKeys.BROKER_REGISTRATION.oldestVersion(); v <= ApiKeys.BROKER_REGISTRATION.latestVersion(); v++) {
+        for (short v : ApiKeys.BROKER_REGISTRATION.allVersions()) {
             checkRequest(createBrokerRegistrationRequest(v), true);
             checkErrorResponse(createBrokerRegistrationRequest(v), unknownServerException, true);
             checkResponse(createBrokerRegistrationResponse(), 0, true);
@@ -540,8 +541,8 @@ public class RequestResponseTest {
     }
 
     @Test
-    public void testDescribeProducersSerialization() throws Exception {
-        for (short v = ApiKeys.DESCRIBE_PRODUCERS.oldestVersion(); v <= ApiKeys.DESCRIBE_PRODUCERS.latestVersion(); v++) {
+    public void testDescribeProducersSerialization() {
+        for (short v : ApiKeys.DESCRIBE_PRODUCERS.allVersions()) {
             checkRequest(createDescribeProducersRequest(v), true);
             checkErrorResponse(createDescribeProducersRequest(v), unknownServerException, true);
             checkResponse(createDescribeProducersResponse(), v, true);
@@ -549,8 +550,8 @@ public class RequestResponseTest {
     }
 
     @Test
-    public void testDescribeClusterSerialization() throws Exception {
-        for (short v = ApiKeys.DESCRIBE_CLUSTER.oldestVersion(); v <= ApiKeys.DESCRIBE_CLUSTER.latestVersion(); v++) {
+    public void testDescribeClusterSerialization() {
+        for (short v : ApiKeys.DESCRIBE_CLUSTER.allVersions()) {
             checkRequest(createDescribeClusterRequest(v), true);
             checkErrorResponse(createDescribeClusterRequest(v), unknownServerException, true);
             checkResponse(createDescribeClusterResponse(), v, true);
@@ -559,7 +560,7 @@ public class RequestResponseTest {
 
     @Test
     public void testUnregisterBrokerSerialization() {
-        for (short v = ApiKeys.UNREGISTER_BROKER.oldestVersion(); v <= ApiKeys.UNREGISTER_BROKER.latestVersion(); v++) {
+        for (short v : ApiKeys.UNREGISTER_BROKER.allVersions()) {
             checkRequest(createUnregisterBrokerRequest(v), true);
             checkErrorResponse(createUnregisterBrokerRequest(v), unknownServerException, true);
             checkResponse(createUnregisterBrokerResponse(), v, true);
@@ -1013,47 +1014,56 @@ public class RequestResponseTest {
 
     @Test
     public void testApiVersionResponseWithUnsupportedError() {
-        ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
-        ApiVersionsResponse response = request.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception());
-
-        assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data().errorCode());
-
-        ApiVersion apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
-        assertNotNull(apiVersion);
-        assertEquals(ApiKeys.API_VERSIONS.id, apiVersion.apiKey());
-        assertEquals(ApiKeys.API_VERSIONS.oldestVersion(), apiVersion.minVersion());
-        assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion());
+        for (short version : ApiKeys.API_VERSIONS.allVersions()) {
+            ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version);
+            ApiVersionsResponse response = request.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception());
+            assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data().errorCode());
+
+            ApiVersion apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
+            assertNotNull(apiVersion);
+            assertEquals(ApiKeys.API_VERSIONS.id, apiVersion.apiKey());
+            assertEquals(ApiKeys.API_VERSIONS.oldestVersion(), apiVersion.minVersion());
+            assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion());
+        }
     }
 
     @Test
     public void testApiVersionResponseWithNotUnsupportedError() {
-        ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
-        ApiVersionsResponse response = request.getErrorResponse(0, Errors.INVALID_REQUEST.exception());
+        for (short version : ApiKeys.API_VERSIONS.allVersions()) {
+            ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version);
+            ApiVersionsResponse response = request.getErrorResponse(0, Errors.INVALID_REQUEST.exception());
+            assertEquals(response.data().errorCode(), Errors.INVALID_REQUEST.code());
+            assertTrue(response.data().apiKeys().isEmpty());
+        }
+    }
 
-        assertEquals(response.data().errorCode(), Errors.INVALID_REQUEST.code());
-        assertTrue(response.data().apiKeys().isEmpty());
+    private ApiVersionsResponse defaultApiVersionsResponse() {
+        return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
     }
 
     @Test
     public void testApiVersionResponseParsingFallback() {
-        ByteBuffer buffer = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.serialize((short) 0);
-        ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, ApiKeys.API_VERSIONS.latestVersion());
-
-        assertEquals(Errors.NONE.code(), response.data().errorCode());
+        for (short version : ApiKeys.API_VERSIONS.allVersions()) {
+            ByteBuffer buffer = defaultApiVersionsResponse().serialize((short) 0);
+            ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, version);
+            assertEquals(Errors.NONE.code(), response.data().errorCode());
+        }
     }
 
     @Test
     public void testApiVersionResponseParsingFallbackException() {
-        short version = 0;
-        assertThrows(BufferUnderflowException.class, () -> ApiVersionsResponse.parse(ByteBuffer.allocate(0), version));
+        for (final short version : ApiKeys.API_VERSIONS.allVersions()) {
+            assertThrows(BufferUnderflowException.class, () -> ApiVersionsResponse.parse(ByteBuffer.allocate(0), version));
+        }
     }
 
     @Test
     public void testApiVersionResponseParsing() {
-        ByteBuffer buffer = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.serialize(ApiKeys.API_VERSIONS.latestVersion());
-        ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, ApiKeys.API_VERSIONS.latestVersion());
-
-        assertEquals(Errors.NONE.code(), response.data().errorCode());
+        for (short version : ApiKeys.API_VERSIONS.allVersions()) {
+            ByteBuffer buffer = defaultApiVersionsResponse().serialize(version);
+            ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, version);
+            assertEquals(Errors.NONE.code(), response.data().errorCode());
+        }
     }
 
     @Test
@@ -1773,8 +1783,8 @@ public class RequestResponseTest {
         return new SaslAuthenticateResponse(data);
     }
 
-    private ApiVersionsRequest createApiVersionRequest() {
-        return new ApiVersionsRequest.Builder().build();
+    private ApiVersionsRequest createApiVersionRequest(short version) {
+        return new ApiVersionsRequest.Builder().build(version);
     }
 
     private ApiVersionsResponse createApiVersionResponse() {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 2d85f55..713f817 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -16,41 +16,6 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.charset.StandardCharsets;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Base64.Encoder;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.Configuration;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
@@ -60,13 +25,14 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SslAuthenticationException;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
-import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
 import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -87,26 +53,28 @@ import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.requests.ListOffsetsResponse;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.security.auth.AuthenticationContext;
-import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
-import org.apache.kafka.common.security.auth.Login;
-import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.requests.SaslAuthenticateRequest;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.AuthenticationContext;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.Login;
+import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.authenticator.TestDigestLoginModule.DigestServerCallbackHandler;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
@@ -115,20 +83,17 @@ import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBea
 import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws;
 import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
 import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.internals.ScramFormatter;
-import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SecurityUtils;
-import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
-import org.apache.kafka.common.security.authenticator.TestDigestLoginModule.DigestServerCallbackHandler;
-import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
-
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
@@ -137,6 +102,41 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.opentest4j.AssertionFailedError;
 
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.charset.StandardCharsets;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1903,31 +1903,18 @@ public class SaslAuthenticatorTest {
         boolean isScram = ScramMechanism.isScram(saslMechanism);
         if (isScram)
             ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
+
+        Supplier<ApiVersionsResponse> apiVersionSupplier = () -> {
+            ApiVersionCollection versionCollection = new ApiVersionCollection(2);
+            versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
+            versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
+            return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(versionCollection));
+        };
+
         SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
                 securityProtocol, listenerName, false, saslMechanism, true,
-                credentialCache, null, null, time, new LogContext()) {
-
-            @Override
-            protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs,
-                                                                       Map<String, AuthenticateCallbackHandler> callbackHandlers,
-                                                                       String id,
-                                                                       TransportLayer transportLayer,
-                                                                       Map<String, Subject> subjects,
-                                                                       Map<String, Long> connectionsMaxReauthMsByMechanism,
-                                                                       ChannelMetadataRegistry metadataRegistry) {
-                return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName,
-                        securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time) {
+                credentialCache, null, null, time, new LogContext(), apiVersionSupplier);
 
-                    @Override
-                    protected ApiVersionsResponse apiVersionsResponse() {
-                        ApiVersionCollection versionCollection = new ApiVersionCollection(2);
-                        versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
-                        versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
-                        return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(versionCollection));
-                    }
-                };
-            }
-        };
         serverChannelBuilder.configure(saslServerConfigs);
         server = new NioEchoServer(listenerName, securityProtocol, new TestSecurityConfig(saslServerConfigs),
                 "localhost", serverChannelBuilder, credentialCache, time);
@@ -1945,10 +1932,29 @@ public class SaslAuthenticatorTest {
         boolean isScram = ScramMechanism.isScram(saslMechanism);
         if (isScram)
             ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
+
+        Supplier<ApiVersionsResponse> apiVersionSupplier = () -> {
+            ApiVersionsResponse defaultApiVersionResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+                ApiMessageType.ListenerType.ZK_BROKER);
+            ApiVersionCollection apiVersions = new ApiVersionCollection();
+            for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) {
+                if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) {
+                    // ApiVersion can NOT be reused in second ApiVersionCollection
+                    // due to the internal pointers it contains.
+                    apiVersions.add(apiVersion.duplicate());
+                }
+
+            }
+            ApiVersionsResponseData data = new ApiVersionsResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setThrottleTimeMs(0)
+                .setApiKeys(apiVersions);
+            return new ApiVersionsResponse(data);
+        };
+
         SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
                 securityProtocol, listenerName, false, saslMechanism, true,
-                credentialCache, null, null, time, new LogContext()) {
-
+                credentialCache, null, null, time, new LogContext(), apiVersionSupplier) {
             @Override
             protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs,
                                                                        Map<String, AuthenticateCallbackHandler> callbackHandlers,
@@ -1958,27 +1964,7 @@ public class SaslAuthenticatorTest {
                                                                        Map<String, Long> connectionsMaxReauthMsByMechanism,
                                                                        ChannelMetadataRegistry metadataRegistry) {
                 return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName,
-                    securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time) {
-
-                    @Override
-                    protected ApiVersionsResponse apiVersionsResponse() {
-                        ApiVersionsResponse defaultApiVersionResponse = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE;
-                        ApiVersionCollection apiVersions = new ApiVersionCollection();
-                        for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) {
-                            if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) {
-                                // ApiVersion can NOT be reused in second ApiVersionCollection
-                                // due to the internal pointers it contains.
-                                apiVersions.add(apiVersion.duplicate());
-                            }
-
-                        }
-                        ApiVersionsResponseData data = new ApiVersionsResponseData()
-                            .setErrorCode(Errors.NONE.code())
-                            .setThrottleTimeMs(0)
-                            .setApiKeys(apiVersions);
-                        return new ApiVersionsResponse(data);
-                    }
-
+                    securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time, apiVersionSupplier) {
                     @Override
                     protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
                         // Don't enable Kafka SASL_AUTHENTICATE headers
@@ -2003,7 +1989,7 @@ public class SaslAuthenticatorTest {
 
         SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts,
                 securityProtocol, listenerName, false, saslMechanism, true,
-                null, null, null, time, new LogContext()) {
+                null, null, null, time, new LogContext(), null) {
 
             @Override
             protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs,
@@ -2545,7 +2531,8 @@ public class SaslAuthenticatorTest {
                 String clientSaslMechanism, boolean handshakeRequestEnable, CredentialCache credentialCache,
                 DelegationTokenCache tokenCache, Time time) {
             super(mode, jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism,
-                    handshakeRequestEnable, credentialCache, tokenCache, null, time, new LogContext());
+                handshakeRequestEnable, credentialCache, tokenCache, null, time, new LogContext(),
+                () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index e5c46c9..af0fedd 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.authenticator;
 import java.net.InetAddress;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.network.ChannelMetadataRegistry;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
@@ -27,6 +28,7 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -152,15 +154,17 @@ public class SaslServerAuthenticatorTest {
     }
 
     private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer,
-                                                       String mechanism, ChannelMetadataRegistry metadataRegistry) throws IOException {
+                                                       String mechanism, ChannelMetadataRegistry metadataRegistry) {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
         Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
         Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
                 mechanism, new SaslServerCallbackHandler());
+        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+            ApiMessageType.ListenerType.ZK_BROKER);
         return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null,
                 new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, Collections.emptyMap(),
-                metadataRegistry, Time.SYSTEM);
+                metadataRegistry, Time.SYSTEM, () -> apiVersionsResponse);
     }
 
 }
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index e89f9fb..8793737 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -21,10 +21,9 @@ import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.config.ConfigDef.Validator
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{RecordBatch, RecordVersion}
-import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse}
-import org.apache.kafka.common.requests.ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.record.RecordVersion
+import org.apache.kafka.common.requests.ApiVersionsResponse
 
 /**
  * This class contains the different Kafka versions.
@@ -147,52 +146,46 @@ object ApiVersion {
     }
   }
 
-  def apiVersionsResponse(throttleTimeMs: Int,
-                          maxMagic: Byte,
-                          latestSupportedFeatures: Features[SupportedVersionRange],
-                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
+  def apiVersionsResponse(
+    throttleTimeMs: Int,
+    minRecordVersion: RecordVersion,
+    latestSupportedFeatures: Features[SupportedVersionRange],
+    controllerApiVersions: Option[NodeApiVersions],
+    listenerType: ListenerType
+  ): ApiVersionsResponse = {
     apiVersionsResponse(
       throttleTimeMs,
-      maxMagic,
+      minRecordVersion,
       latestSupportedFeatures,
       Features.emptyFinalizedFeatures,
       ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
-      controllerApiVersions
+      controllerApiVersions,
+      listenerType
     )
   }
 
-  def apiVersionsResponse(throttleTimeMs: Int,
-                          maxMagic: Byte,
-                          latestSupportedFeatures: Features[SupportedVersionRange],
-                          finalizedFeatures: Features[FinalizedVersionRange],
-                          finalizedFeaturesEpoch: Long,
-                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
+  def apiVersionsResponse(
+    throttleTimeMs: Int,
+    minRecordVersion: RecordVersion,
+    latestSupportedFeatures: Features[SupportedVersionRange],
+    finalizedFeatures: Features[FinalizedVersionRange],
+    finalizedFeaturesEpoch: Long,
+    controllerApiVersions: Option[NodeApiVersions],
+    listenerType: ListenerType
+  ): ApiVersionsResponse = {
     val apiKeys = controllerApiVersions match {
-      case None => ApiVersionsResponse.defaultApiKeys(maxMagic)
-      case Some(controllerApiVersion) => ApiVersionsResponse.intersectControllerApiVersions(
-        maxMagic, controllerApiVersion.allSupportedApiVersions())
+      case None => ApiVersionsResponse.filterApis(minRecordVersion, listenerType)
+      case Some(controllerApiVersion) => ApiVersionsResponse.intersectForwardableApis(
+        listenerType, minRecordVersion, controllerApiVersion.allSupportedApiVersions())
     }
 
-    if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE &&
-      throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME) {
-      new ApiVersionsResponse(
-        ApiVersionsResponse.createApiVersionsResponseData(
-          DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs,
-          Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode),
-          apiKeys,
-          latestSupportedFeatures,
-          finalizedFeatures,
-          finalizedFeaturesEpoch))
-    } else {
-      new ApiVersionsResponse(
-        ApiVersionsResponse.createApiVersionsResponseData(
-          throttleTimeMs,
-          Errors.NONE,
-          apiKeys,
-          latestSupportedFeatures,
-          finalizedFeatures,
-          finalizedFeaturesEpoch))
-    }
+    ApiVersionsResponse.createApiVersionsResponse(
+      throttleTimeMs,
+      apiKeys,
+      latestSupportedFeatures,
+      finalizedFeatures,
+      finalizedFeaturesEpoch
+    )
   }
 }
 
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7d31125..48f723f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -30,6 +30,7 @@ import kafka.utils.{Logging, NotNothing, Pool}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
 import org.apache.kafka.common.network.Send
@@ -59,16 +60,19 @@ object RequestChannel extends Logging {
     val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
   }
 
-  class Metrics(allowControllerOnlyApis: Boolean = false) {
+  class Metrics(enabledApis: Iterable[ApiKeys]) {
+    def this(scope: ListenerType) = {
+      this(ApiKeys.apisForListener(scope).asScala)
+    }
 
     private val metricsMap = mutable.Map[String, RequestMetrics]()
 
-    (ApiKeys.values.toSeq.filter(!_.isControllerOnlyApi || allowControllerOnlyApis).map(_.name) ++
-        Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
+    (enabledApis.map(_.name) ++
+      Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
       metricsMap.put(name, new RequestMetrics(name))
     }
 
-    def apply(metricName: String) = metricsMap(metricName)
+    def apply(metricName: String): RequestMetrics = metricsMap(metricName)
 
     def close(): Unit = {
        metricsMap.values.foreach(_.removeMetrics())
@@ -296,8 +300,6 @@ object RequestChannel extends Logging {
     def responseLog: Option[JsonNode] = None
 
     def onComplete: Option[Send => Unit] = None
-
-    override def toString: String
   }
 
   /** responseLogValue should only be defined if request logging is enabled */
@@ -337,9 +339,8 @@ object RequestChannel extends Logging {
 class RequestChannel(val queueSize: Int,
                      val metricNamePrefix: String,
                      time: Time,
-                     allowControllerOnlyApis: Boolean = false) extends KafkaMetricsGroup {
+                     val metrics: RequestChannel.Metrics) extends KafkaMetricsGroup {
   import RequestChannel._
-  val metrics = new RequestChannel.Metrics(allowControllerOnlyApis)
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
   val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 72c5141..24df39f 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,7 +33,7 @@ import kafka.network.Processor._
 import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
 import kafka.network.SocketServer._
 import kafka.security.CredentialProvider
-import kafka.server.{BrokerReconfigurable, KafkaConfig}
+import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
 import kafka.utils.Implicits._
 import kafka.utils._
 import org.apache.kafka.common.config.ConfigException
@@ -78,15 +78,14 @@ class SocketServer(val config: KafkaConfig,
                    val metrics: Metrics,
                    val time: Time,
                    val credentialProvider: CredentialProvider,
-                   allowControllerOnlyApis: Boolean = false,
-                   controllerSocketServer: Boolean = false)
+                   val apiVersionManager: ApiVersionManager)
   extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
   private val nodeId = config.brokerId
 
-  private val logContext = new LogContext(s"[SocketServer ${if (controllerSocketServer) "controller" else "broker"}Id=${nodeId}] ")
+  private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")
 
   this.logIdent = logContext.logPrefix
 
@@ -98,12 +97,12 @@ class SocketServer(val config: KafkaConfig,
   // data-plane
   private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowControllerOnlyApis)
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time, allowControllerOnlyApis))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
 
   private var nextProcessorId = 0
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@@ -438,8 +437,9 @@ class SocketServer(val config: KafkaConfig,
       credentialProvider,
       memoryPool,
       logContext,
-      isPrivilegedListener = isPrivilegedListener,
-      allowControllerOnlyApis = allowControllerOnlyApis
+      Processor.ConnectionQueueSize,
+      isPrivilegedListener,
+      apiVersionManager
     )
   }
 
@@ -772,7 +772,6 @@ private[kafka] object Processor {
   val IdlePercentMetricName = "IdlePercent"
   val NetworkProcessorMetricTag = "networkProcessor"
   val ListenerMetricTag = "listener"
-
   val ConnectionQueueSize = 20
 }
 
@@ -800,9 +799,9 @@ private[kafka] class Processor(val id: Int,
                                credentialProvider: CredentialProvider,
                                memoryPool: MemoryPool,
                                logContext: LogContext,
-                               connectionQueueSize: Int = ConnectionQueueSize,
-                               isPrivilegedListener: Boolean = false,
-                               allowControllerOnlyApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               connectionQueueSize: Int,
+                               isPrivilegedListener: Boolean,
+                               apiVersionManager: ApiVersionManager) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -842,14 +841,19 @@ private[kafka] class Processor(val id: Int,
   metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
 
   private val selector = createSelector(
-    ChannelBuilders.serverChannelBuilder(listenerName,
+    ChannelBuilders.serverChannelBuilder(
+      listenerName,
       listenerName == config.interBrokerListenerName,
       securityProtocol,
       config,
       credentialProvider.credentialCache,
       credentialProvider.tokenCache,
       time,
-      logContext))
+      logContext,
+      () => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
+    )
+  )
+
   // Visible to override for testing
   protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
     channelBuilder match {
@@ -993,10 +997,10 @@ private[kafka] class Processor(val id: Int,
 
   protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
     val header = RequestHeader.parse(buffer)
-    if (!header.apiKey.isControllerOnlyApi || allowControllerOnlyApis) {
+    if (apiVersionManager.isApiEnabled(header.apiKey)) {
       header
     } else {
-      throw new InvalidRequestException("Received request for KIP-500 controller-only api key " + header.apiKey)
+      throw new InvalidRequestException(s"Received request api key ${header.apiKey} which is not enabled")
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
new file mode 100644
index 0000000..ebf3e74
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.network
+import kafka.network.RequestChannel
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
+import scala.jdk.CollectionConverters._
+
+trait ApiVersionManager {
+  def listenerType: ListenerType
+  def enabledApis: collection.Set[ApiKeys]
+  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+}
+
+object ApiVersionManager {
+  def apply(
+    listenerType: ListenerType,
+    config: KafkaConfig,
+    forwardingManager: Option[ForwardingManager],
+    features: BrokerFeatures,
+    featureCache: FinalizedFeatureCache
+  ): ApiVersionManager = {
+    new DefaultApiVersionManager(
+      listenerType,
+      config.interBrokerProtocolVersion,
+      forwardingManager,
+      features,
+      featureCache
+    )
+  }
+}
+
+class SimpleApiVersionManager(
+  val listenerType: ListenerType,
+  val enabledApis: collection.Set[ApiKeys]
+) extends ApiVersionManager {
+
+  def this(listenerType: ListenerType) = {
+    this(listenerType, ApiKeys.apisForListener(listenerType).asScala)
+  }
+
+  private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
+
+  override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
+    ApiVersionsResponse.createApiVersionsResponse(0, apiVersions)
+  }
+}
+
+class DefaultApiVersionManager(
+  val listenerType: ListenerType,
+  interBrokerProtocolVersion: ApiVersion,
+  forwardingManager: Option[ForwardingManager],
+  features: BrokerFeatures,
+  featureCache: FinalizedFeatureCache
+) extends ApiVersionManager {
+
+  override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
+    val supportedFeatures = features.supportedFeatures
+    val finalizedFeaturesOpt = featureCache.get
+    val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
+
+    val response = finalizedFeaturesOpt match {
+      case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
+        throttleTimeMs,
+        interBrokerProtocolVersion.recordVersion,
+        supportedFeatures,
+        finalizedFeatures.features,
+        finalizedFeatures.epoch,
+        controllerApiVersions,
+        listenerType)
+      case None => ApiVersion.apiVersionsResponse(
+        throttleTimeMs,
+        interBrokerProtocolVersion.recordVersion,
+        supportedFeatures,
+        controllerApiVersions,
+        listenerType)
+    }
+
+    // This is a temporary workaround in order to allow testing of forwarding
+    // in integration tests. We can remove this after the KIP-500 controller
+    // is available for integration testing.
+    if (forwardingManager.isDefined) {
+      response.data.apiKeys.add(
+        new ApiVersionsResponseData.ApiVersion()
+          .setApiKey(ApiKeys.ENVELOPE.id)
+          .setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
+          .setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
+      )
+    }
+
+    response
+  }
+
+  override def enabledApis: collection.Set[ApiKeys] = {
+    forwardingManager match {
+      case Some(_) => ApiKeys.apisForListener(listenerType).asScala ++ Set(ApiKeys.ENVELOPE)
+      case None => ApiKeys.apisForListener(listenerType).asScala
+    }
+  }
+
+  override def isApiEnabled(apiKey: ApiKeys): Boolean = {
+    apiKey.inScope(listenerType) || (apiKey == ApiKeys.ENVELOPE && forwardingManager.isDefined)
+  }
+}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 57ceb46..19d65ab 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -33,6 +33,7 @@ import kafka.server.KafkaBroker.metricsPrefix
 import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
 import kafka.utils.{CoreUtils, KafkaScheduler}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -167,7 +168,7 @@ class BrokerServer(
       // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
       // until we catch up on the metadata log and have up-to-date topic and broker configs.
       logManager = LogManager(config, initialOfflineDirs, configRepository, kafkaScheduler, time,
-        brokerTopicStats, logDirFailureChannel, true)
+        brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
 
       metadataCache = MetadataCache.raftMetadataCache(config.nodeId)
       // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
@@ -175,17 +176,44 @@ class BrokerServer(
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
       credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+      val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
+      val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
+
+      val forwardingChannelManager = BrokerToControllerChannelManager(
+        controllerNodeProvider,
+        time,
+        metrics,
+        config,
+        channelName = "forwarding",
+        threadNamePrefix,
+        retryTimeoutMs = 60000
+      )
+      forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
+      forwardingManager.start()
+
+      val apiVersionManager = ApiVersionManager(
+        ListenerType.BROKER,
+        config,
+        Some(forwardingManager),
+        brokerFeatures,
+        featureCache
+      )
+
       // Create and start the socket server acceptor threads so that the bound port is known.
       // Delay starting processors until the end of the initialization sequence to ensure
       // that credentials have been loaded before processing authentications.
-      socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = false)
+      socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
       socketServer.startup(startProcessingRequests = false)
 
-      val controllerNodes =
-        RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
-      val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
-      val alterIsrChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
-        time, metrics, config, "alterisr", threadNamePrefix, 60000)
+      val alterIsrChannelManager = BrokerToControllerChannelManager(
+        controllerNodeProvider,
+        time,
+        metrics,
+        config,
+        channelName = "alterisr",
+        threadNamePrefix,
+        retryTimeoutMs = Long.MaxValue
+      )
       alterIsrManager = new DefaultAlterIsrManager(
         controllerChannelManager = alterIsrChannelManager,
         scheduler = kafkaScheduler,
@@ -200,11 +228,6 @@ class BrokerServer(
         brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
         configRepository, threadNamePrefix)
 
-      val forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
-        time, metrics, config, "forwarding", threadNamePrefix, 60000)
-      forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
-      forwardingManager.start()
-
       /* start token manager */
       if (config.tokenAuthEnabled) {
         throw new UnsupportedOperationException("Delegation tokens are not supported")
@@ -306,7 +329,7 @@ class BrokerServer(
       dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
         replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
         config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-        fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+        fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
 
       dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
         config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
@@ -315,7 +338,7 @@ class BrokerServer(
         controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport,
           replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
           config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-          fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+          fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
 
         controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
           1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index efcebb4..625ce52 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -28,6 +28,7 @@ import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@@ -124,15 +125,16 @@ class ControllerServer(
           }.toMap
       }
 
+      val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)
+
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
       credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
       socketServer = new SocketServer(config,
         metrics,
         time,
         credentialProvider,
-        allowControllerOnlyApis = true,
-        controllerSocketServer = true)
-      socketServer.startup(false, None, config.controllerListeners)
+        apiVersionManager)
+      socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
       socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
         config.controllerListeners.head.listenerName))
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0545c74..5dafdd6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -61,7 +61,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseD [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseData, DescribeConfigsRespo [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -110,8 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val clusterId: String,
                 time: Time,
                 val tokenManager: DelegationTokenManager,
-                val brokerFeatures: BrokerFeatures,
-                val finalizedFeatureCache: FinalizedFeatureCache) extends ApiRequestHandler with Logging {
+                val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
 
   metadataSupport.ensureConsistentWith(config)
 
@@ -158,6 +157,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
         s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
 
+      if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
+        // The socket server will reject APIs which are not exposed in this scope and close the connection
+        // before handing them to the request handler, so this path should not be exercised in practice
+        throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
+      }
+
       request.header.apiKey match {
         case ApiKeys.PRODUCE => handleProduceRequest(request)
         case ApiKeys.FETCH => handleFetchRequest(request)
@@ -217,17 +222,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
         case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
-
-        // Handle requests that should have been sent to the KIP-500 controller.
-        // Until we are ready to integrate the Raft layer, these APIs are treated as
-        // unexpected and we just close the connection.
-        case ApiKeys.VOTE => requestHelper.closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.BEGIN_QUORUM_EPOCH => requestHelper.closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.END_QUORUM_EPOCH => requestHelper.closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.DESCRIBE_QUORUM => requestHelper.closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.FETCH_SNAPSHOT => requestHelper.closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.BROKER_REGISTRATION => requestHelper.closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.BROKER_HEARTBEAT => requestHelper.closeConnection(request, util.Collections.emptyMap())
+        case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1686,39 +1681,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     // ApiVersionRequest is not available.
     def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
       val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion)
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
-      else if (!apiVersionRequest.isValid)
+      } else if (!apiVersionRequest.isValid) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
-      else {
-        val supportedFeatures = brokerFeatures.supportedFeatures
-        val finalizedFeaturesOpt = finalizedFeatureCache.get
-        val controllerApiVersions = metadataSupport.forwardingManager.flatMap(_.controllerApiVersions)
-
-        val apiVersionsResponse =
-          finalizedFeaturesOpt match {
-            case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-              requestThrottleMs,
-              config.interBrokerProtocolVersion.recordVersion.value,
-              supportedFeatures,
-              finalizedFeatures.features,
-              finalizedFeatures.epoch,
-              controllerApiVersions)
-            case None => ApiVersion.apiVersionsResponse(
-              requestThrottleMs,
-              config.interBrokerProtocolVersion.recordVersion.value,
-              supportedFeatures,
-              controllerApiVersions)
-          }
-        if (request.context.fromPrivilegedListener) {
-          apiVersionsResponse.data.apiKeys().add(
-            new ApiVersionsResponseData.ApiVersion()
-              .setApiKey(ApiKeys.ENVELOPE.id)
-              .setMinVersion(ApiKeys.ENVELOPE.oldestVersion())
-              .setMaxVersion(ApiKeys.ENVELOPE.latestVersion())
-          )
-        }
-        apiVersionsResponse
+      } else {
+        apiVersionManager.apiVersionResponse(requestThrottleMs)
       }
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index dc3fd16..7ec4622 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -74,8 +74,17 @@ class KafkaRaftServer(
   private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
 
   private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
-    Some(new BrokerServer(config, metaProps, metaLogShim, time, metrics, threadNamePrefix,
-      offlineDirs, controllerQuorumVotersFuture, Server.SUPPORTED_FEATURES))
+    Some(new BrokerServer(
+      config,
+      metaProps,
+      metaLogShim,
+      time,
+      metrics,
+      threadNamePrefix,
+      offlineDirs,
+      controllerQuorumVotersFuture,
+      Server.SUPPORTED_FEATURES
+    ))
   } else {
     None
   }
@@ -89,7 +98,7 @@ class KafkaRaftServer(
       time,
       metrics,
       threadNamePrefix,
-      CompletableFuture.completedFuture(config.quorumVoters)
+      controllerQuorumVotersFuture
     ))
   } else {
     None
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index df37026..9951d58 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -37,6 +37,7 @@ import kafka.utils._
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
@@ -157,7 +158,6 @@ class KafkaServer(
   private var _featureChangeListener: FinalizedFeatureChangeListener = null
 
   val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
-
   val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
 
   def clusterId: String = _clusterId
@@ -256,14 +256,32 @@ class KafkaServer(
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+        if (enableForwarding) {
+          this.forwardingManager = Some(ForwardingManager(
+            config,
+            metadataCache,
+            time,
+            metrics,
+            threadNamePrefix
+          ))
+          forwardingManager.foreach(_.start())
+        }
+
+        val apiVersionManager = ApiVersionManager(
+          ListenerType.ZK_BROKER,
+          config,
+          forwardingManager,
+          brokerFeatures,
+          featureCache
+        )
+
         // Create and start the socket server acceptor threads so that the bound port is known.
         // Delay starting processors until the end of the initialization sequence to ensure
         // that credentials have been loaded before processing authentications.
         //
         // Note that we allow the use of KIP-500 controller APIs when forwarding is enabled
         // so that the Envelope request is exposed. This is only used in testing currently.
-        socketServer = new SocketServer(config, metrics, time, credentialProvider,
-          allowControllerOnlyApis = enableForwarding)
+        socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
         socketServer.startup(startProcessingRequests = false)
 
         /* start replica manager */
@@ -300,18 +318,6 @@ class KafkaServer(
         kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
         kafkaController.startup()
 
-        /* start forwarding manager */
-        if (enableForwarding) {
-          this.forwardingManager = Some(ForwardingManager(
-            config,
-            metadataCache,
-            time,
-            metrics,
-            threadNamePrefix
-          ))
-          forwardingManager.foreach(_.start())
-        }
-
         adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
 
         /* start group coordinator */
@@ -363,7 +369,7 @@ class KafkaServer(
         val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
         dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
           autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-          fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+          fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
 
         dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
           config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
@@ -371,7 +377,7 @@ class KafkaServer(
         socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
           controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
             autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-            fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+            fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
 
           controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
             1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index ddfa1fa..e4dec2e 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import kafka.network.RequestChannel
 import kafka.network.RequestConvertToJson
 import kafka.raft.RaftManager
-import kafka.server.ApiRequestHandler
+import kafka.server.{ApiRequestHandler, ApiVersionManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, VoteResponseData}
@@ -38,6 +38,7 @@ class TestRaftRequestHandler(
   raftManager: RaftManager[_],
   requestChannel: RequestChannel,
   time: Time,
+  apiVersionManager: ApiVersionManager
 ) extends ApiRequestHandler with Logging {
 
   override def handle(request: RequestChannel.Request): Unit = {
@@ -45,6 +46,7 @@ class TestRaftRequestHandler(
       trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
         s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
       request.header.apiKey match {
+        case ApiKeys.API_VERSIONS => handleApiVersions(request)
         case ApiKeys.VOTE => handleVote(request)
         case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
         case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
@@ -62,6 +64,10 @@ class TestRaftRequestHandler(
     }
   }
 
+  private def handleApiVersions(request: RequestChannel.Request): Unit = {
+    sendResponse(request, Some(apiVersionManager.apiVersionResponse(throttleTimeMs = 0)))
+  }
+
   private def handleVote(request: RequestChannel.Request): Unit = {
     handle(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData]))
   }
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 2391ca4..ba6ab407 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -24,9 +24,10 @@ import joptsimple.OptionException
 import kafka.network.SocketServer
 import kafka.raft.{KafkaRaftManager, RaftManager}
 import kafka.security.CredentialProvider
-import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties}
+import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager}
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Logging, ShutdownableThread}
 import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
 import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles}
@@ -68,7 +69,8 @@ class TestRaftServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-    socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = true)
+    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)
+    socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
     socketServer.startup(startProcessingRequests = false)
 
     val metaProperties = MetaProperties(
@@ -96,7 +98,8 @@ class TestRaftServer(
     val requestHandler = new TestRaftRequestHandler(
       raftManager,
       socketServer.dataPlaneRequestChannel,
-      time
+      time,
+      apiVersionManager
     )
 
     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 6224591..2db694f 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -55,7 +55,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
     assertTrue(lineIter.hasNext)
     assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
     val nodeApiVersions = NodeApiVersions.create
-    val enabledApis = ApiKeys.brokerApis.asScala
+    val enabledApis = ApiKeys.zkBrokerApis.asScala
     for (apiKey <- enabledApis) {
       val apiVersion = nodeApiVersions.apiVersion(apiKey)
       assertNotNull(apiVersion)
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 46156f1..90454bb 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -29,7 +29,9 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.SaslAuthenticationException
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.network._
+import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
 import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
 import org.apache.kafka.common.security.kerberos.KerberosLogin
@@ -233,7 +235,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     val config = new TestSecurityConfig(clientConfig)
     val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values()))
     val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol,
-      null, false, kafkaClientSaslMechanism, true, null, null, null, time, new LogContext()) {
+      null, false, kafkaClientSaslMechanism, true, null, null, null, time, new LogContext(),
+      () => ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) {
       override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin]
     }
     channelBuilder.configure(config.values())
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 1e86876..6a3eb31 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -20,6 +20,7 @@ package kafka.api
 import java.util
 
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.{RecordBatch, RecordVersion}
 import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse}
@@ -179,9 +180,10 @@ class ApiVersionTest {
   def shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       10,
-      RecordBatch.MAGIC_VALUE_V1,
+      RecordVersion.V1,
       Features.emptySupportedFeatures,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
     verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
     assertEquals(10, response.throttleTimeMs)
@@ -194,13 +196,14 @@ class ApiVersionTest {
   def shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       10,
-      RecordBatch.MAGIC_VALUE_V1,
+      RecordVersion.V1,
       Features.supportedFeatures(
         Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange(1.toShort, 4.toShort)))),
       Features.finalizedFeatures(
         Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange(2.toShort, 3.toShort)))),
       10,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
 
     verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
@@ -228,11 +231,12 @@ class ApiVersionTest {
   def shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       AbstractResponse.DEFAULT_THROTTLE_TIME,
-      RecordBatch.CURRENT_MAGIC_VALUE,
+      RecordVersion.current(),
       Features.emptySupportedFeatures,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
-    assertEquals(new util.HashSet[ApiKeys](ApiKeys.brokerApis), apiKeysInResponse(response))
+    assertEquals(new util.HashSet[ApiKeys](ApiKeys.zkBrokerApis), apiKeysInResponse(response))
     assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs)
     assertTrue(response.data.supportedFeatures.isEmpty)
     assertTrue(response.data.finalizedFeatures.isEmpty)
@@ -243,9 +247,10 @@ class ApiVersionTest {
   def testMetadataQuorumApisAreDisabled(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       AbstractResponse.DEFAULT_THROTTLE_TIME,
-      RecordBatch.CURRENT_MAGIC_VALUE,
+      RecordVersion.current(),
       Features.emptySupportedFeatures,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
 
     // Ensure that APIs needed for the internal metadata quorum (KIP-500)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index dd93e43..2936144 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -31,10 +31,11 @@ import com.yammer.metrics.core.{Gauge, Meter}
 import javax.net.ssl._
 import kafka.metrics.KafkaYammerMetrics
 import kafka.security.CredentialProvider
-import kafka.server.{KafkaConfig, ThrottledChannel}
+import kafka.server.{KafkaConfig, SimpleApiVersionManager, ThrottledChannel}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.{ProduceRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData, VoteRequestData}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.KafkaChannel.ChannelMuteState
@@ -73,7 +74,8 @@ class SocketServerTest {
   // Clean-up any metrics left around by previous tests
   TestUtils.clearYammerMetrics()
 
-  val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider)
+  private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER)
+  val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
   server.startup()
   val sockets = new ArrayBuffer[Socket]
 
@@ -452,7 +454,8 @@ class SocketServerTest {
     val time = new MockTime()
     props.put(KafkaConfig.ConnectionsMaxIdleMsProp, idleTimeMs.toString)
     val serverMetrics = new Metrics
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics,
+      time, credentialProvider, apiVersionManager)
 
     try {
       overrideServer.startup()
@@ -504,12 +507,14 @@ class SocketServerTest {
     val serverMetrics = new Metrics
     @volatile var selector: TestableSelector = null
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) {
+    val overrideServer = new SocketServer(
+      KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider, apiVersionManager
+    ) {
       override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
-          credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+          credentialProvider, memoryPool, new LogContext(), Processor.ConnectionQueueSize, isPrivilegedListener, apiVersionManager) {
           override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
           override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
             val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
@@ -799,7 +804,8 @@ class SocketServerTest {
     val newProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     newProps.setProperty(KafkaConfig.MaxConnectionsPerIpProp, "0")
     newProps.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "%s:%s".format("127.0.0.1", "5"))
-    val server = new SocketServer(KafkaConfig.fromProps(newProps), new Metrics(), Time.SYSTEM, credentialProvider)
+    val server = new SocketServer(KafkaConfig.fromProps(newProps), new Metrics(),
+      Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
       server.startup()
       // make the maximum allowable number of connections
@@ -837,7 +843,8 @@ class SocketServerTest {
     val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")
     val serverMetrics = new Metrics()
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM, credentialProvider)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics,
+      Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
       overrideServer.startup()
       // make the maximum allowable number of connections
@@ -866,7 +873,8 @@ class SocketServerTest {
     overrideProps.put(KafkaConfig.NumQuotaSamplesProp, String.valueOf(2))
     val connectionRate = 5
     val time = new MockTime()
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(), time, credentialProvider)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(),
+      time, credentialProvider, apiVersionManager)
     // update the connection rate to 5
     overrideServer.connectionQuotas.updateIpConnectionRateQuota(None, Some(connectionRate))
     try {
@@ -916,7 +924,8 @@ class SocketServerTest {
     overrideProps.put(KafkaConfig.NumQuotaSamplesProp, String.valueOf(2))
     val connectionRate = 5
     val time = new MockTime()
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(), time, credentialProvider)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(),
+      time, credentialProvider, apiVersionManager)
     overrideServer.connectionQuotas.updateIpConnectionRateQuota(None, Some(connectionRate))
     overrideServer.startup()
     // make the maximum allowable number of connections
@@ -938,7 +947,8 @@ class SocketServerTest {
   @Test
   def testSslSocketServer(): Unit = {
     val serverMetrics = new Metrics
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(sslServerProps), serverMetrics, Time.SYSTEM, credentialProvider)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(sslServerProps), serverMetrics,
+      Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
       overrideServer.startup()
       val sslContext = SSLContext.getInstance(TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS)
@@ -1078,12 +1088,14 @@ class SocketServerTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     val serverMetrics = new Metrics
     var conn: Socket = null
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
+    val overrideServer = new SocketServer(
+      KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider, apiVersionManager
+    ) {
       override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
-          credentialProvider, MemoryPool.NONE, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+          credentialProvider, MemoryPool.NONE, new LogContext(), Processor.ConnectionQueueSize, isPrivilegedListener, apiVersionManager) {
           override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
             conn.close()
             super.sendResponse(response, responseSend)
@@ -1120,12 +1132,14 @@ class SocketServerTest {
   def testClientDisconnectionWithOutstandingReceivesProcessedUntilFailedSend(): Unit = {
     val serverMetrics = new Metrics
     @volatile var selector: TestableSelector = null
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
+    val overrideServer = new SocketServer(
+      KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider, apiVersionManager
+    ) {
       override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
-          credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+          credentialProvider, memoryPool, new LogContext(), Processor.ConnectionQueueSize, isPrivilegedListener, apiVersionManager) {
           override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
             val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
             selector = testableSelector
@@ -1161,7 +1175,8 @@ class SocketServerTest {
     props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "110")
     val serverMetrics = new Metrics
     var conn: Socket = null
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics,
+      Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
       overrideServer.startup()
       conn = connect(overrideServer)
@@ -1873,9 +1888,13 @@ class SocketServerTest {
     }
   }
 
-  class TestableSocketServer(config : KafkaConfig = KafkaConfig.fromProps(props), val connectionQueueSize: Int = 20,
-                             override val time: Time = Time.SYSTEM) extends SocketServer(config,
-    new Metrics, time, credentialProvider) {
+  class TestableSocketServer(
+    config : KafkaConfig = KafkaConfig.fromProps(props),
+    connectionQueueSize: Int = 20,
+    time: Time = Time.SYSTEM
+  ) extends SocketServer(
+    config, new Metrics, time, credentialProvider, apiVersionManager,
+  ) {
 
     @volatile var selector: Option[TestableSelector] = None
     @volatile var uncaughtExceptions = 0
@@ -1884,7 +1903,7 @@ class SocketServerTest {
                               protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
       new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
         config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider,
-        memoryPool, new LogContext(), connectionQueueSize, isPrivilegedListener) {
+        memoryPool, new LogContext(), connectionQueueSize, isPrivilegedListener, apiVersionManager) {
 
         override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
           val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index f7163ca..9c7b444 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -16,15 +16,17 @@
  */
 package kafka.server
 
+import java.util.Properties
+
 import integration.kafka.server.IntegrationTestUtils
 import kafka.test.ClusterInstance
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.junit.jupiter.api.Assertions._
 
-import java.util.Properties
 import scala.jdk.CollectionConverters._
 
 abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
@@ -53,14 +55,13 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
     } finally socket.close()
   }
 
-  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName): Unit = {
-    val expectedApis = ApiKeys.brokerApis()
-    if (listenerName == controlPlaneListenerName) {
-      expectedApis.add(ApiKeys.ENVELOPE)
-    }
+  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse): Unit = {
+    val expectedApis = ApiKeys.zkBrokerApis()
     assertEquals(expectedApis.size(), apiVersionsResponse.data.apiKeys().size(),
       "API keys in ApiVersionsResponse must match API keys supported by broker.")
-    for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.apiKeys().asScala) {
+
+    val defaultApiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.ZK_BROKER)
+    for (expectedApiVersion: ApiVersion <- defaultApiVersionsResponse.data.apiKeys().asScala) {
       val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
       assertNotNull(actualApiVersion, s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.")
       assertEquals(expectedApiVersion.apiKey, actualApiVersion.apiKey, "API key must be supported by the broker.")
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
new file mode 100644
index 0000000..a93cc90
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import org.apache.kafka.clients.NodeApiVersions
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+import org.mockito.Mockito
+
+import scala.jdk.CollectionConverters._
+
+class ApiVersionManagerTest {
+  private val brokerFeatures = BrokerFeatures.createDefault()
+  private val featureCache = new FinalizedFeatureCache(brokerFeatures)
+
+  @ParameterizedTest
+  @EnumSource(classOf[ListenerType])
+  def testApiScope(apiScope: ListenerType): Unit = {
+    val versionManager = new DefaultApiVersionManager(
+      listenerType = apiScope,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      forwardingManager = None,
+      features = brokerFeatures,
+      featureCache = featureCache
+    )
+    assertEquals(ApiKeys.apisForListener(apiScope).asScala, versionManager.enabledApis)
+    assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall(versionManager.isApiEnabled))
+  }
+
+  @Test
+  def testControllerApiIntersection(): Unit = {
+    val controllerMinVersion: Short = 1
+    val controllerMaxVersion: Short = 5
+
+    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+
+    Mockito.when(forwardingManager.controllerApiVersions).thenReturn(Some(NodeApiVersions.create(
+      ApiKeys.CREATE_TOPICS.id,
+      controllerMinVersion,
+      controllerMaxVersion
+    )))
+
+    val versionManager = new DefaultApiVersionManager(
+      listenerType = ListenerType.ZK_BROKER,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      forwardingManager = Some(forwardingManager),
+      features = brokerFeatures,
+      featureCache = featureCache
+    )
+
+    val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
+    val alterConfigVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.CREATE_TOPICS.id)
+    assertNotNull(alterConfigVersion)
+    assertEquals(controllerMinVersion, alterConfigVersion.minVersion)
+    assertEquals(controllerMaxVersion, alterConfigVersion.maxVersion)
+  }
+
+  @Test
+  def testEnvelopeEnabledWhenForwardingManagerPresent(): Unit = {
+    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+    Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)
+
+    val versionManager = new DefaultApiVersionManager(
+      listenerType = ListenerType.ZK_BROKER,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      forwardingManager = Some(forwardingManager),
+      features = brokerFeatures,
+      featureCache = featureCache
+    )
+    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
+    assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
+
+    val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
+    val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
+    assertNotNull(envelopeVersion)
+    assertEquals(ApiKeys.ENVELOPE.oldestVersion, envelopeVersion.minVersion)
+    assertEquals(ApiKeys.ENVELOPE.latestVersion, envelopeVersion.maxVersion)
+  }
+
+  @Test
+  def testEnvelopeDisabledWhenForwardingManagerEmpty(): Unit = {
+    val versionManager = new DefaultApiVersionManager(
+      listenerType = ListenerType.ZK_BROKER,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      forwardingManager = None,
+      features = brokerFeatures,
+      featureCache = featureCache
+    )
+    assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
+    assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
+
+    val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
+    assertNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index dc35bae..34ee74a0 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -40,14 +40,14 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
   def testApiVersionsRequest(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
     val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
-    validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
+    validateApiVersionsResponse(apiVersionsResponse)
   }
 
   @ClusterTest
   def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
     val apiVersionsResponse = sendApiVersionsRequest(request, super.controlPlaneListenerName)
-    validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
+    validateApiVersionsResponse(apiVersionsResponse)
   }
 
   @ClusterTest
@@ -66,14 +66,14 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
   def testApiVersionsRequestValidationV0(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
-    validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
+    validateApiVersionsResponse(apiVersionsResponse)
   }
 
   @ClusterTest
   def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, super.controlPlaneListenerName)
-    validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
+    validateApiVersionsResponse(apiVersionsResponse)
   }
 
   @ClusterTest
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index bb4f57b..9240af6 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.MockClient.RequestMatcher
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -195,7 +196,7 @@ class ForwardingManagerTest {
       startTimeNanos = time.nanoseconds(),
       memoryPool = MemoryPool.NONE,
       buffer = requestBuffer,
-      metrics = new RequestChannel.Metrics(allowControllerOnlyApis = true),
+      metrics = new RequestChannel.Metrics(ListenerType.CONTROLLER),
       envelope = None
     )
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5138bf6..e80c6eb 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -37,7 +37,6 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, RaftMetadataCache}
 import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common.acl.AclOperation
@@ -45,6 +44,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
 import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
@@ -129,8 +129,6 @@ class KafkaApisTest {
                       raftSupport: Boolean = false,
                       overrideProperties: Map[String, String] = Map.empty): KafkaApis = {
 
-    val brokerFeatures = BrokerFeatures.createDefault()
-    val cache = new FinalizedFeatureCache(brokerFeatures)
     val properties = if (raftSupport) {
       val properties = TestUtils.createBrokerConfig(brokerId, "")
       properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
@@ -163,6 +161,15 @@ class KafkaApisTest {
         case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
       }
     }
+
+    val listenerType = if (raftSupport) ListenerType.BROKER else ListenerType.ZK_BROKER
+    val enabledApis = if (enableForwarding) {
+      ApiKeys.apisForListener(listenerType).asScala ++ Set(ApiKeys.ENVELOPE)
+    } else {
+      ApiKeys.apisForListener(listenerType).asScala.toSet
+    }
+    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis)
+
     new KafkaApis(requestChannel,
       metadataSupport,
       replicaManager,
@@ -181,8 +188,7 @@ class KafkaApisTest {
       clusterId,
       time,
       null,
-      brokerFeatures,
-      cache)
+      apiVersionManager)
   }
 
   @Test
@@ -653,75 +659,6 @@ class KafkaApisTest {
   }
 
   @Test
-  def testHandleApiVersionsWithControllerApiVersions(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
-
-    val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
-
-    val permittedVersion: Short = 0
-    EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(
-      Some(NodeApiVersions.create(ApiKeys.ALTER_CONFIGS.id, permittedVersion, permittedVersion)))
-
-    val capturedResponse = expectNoThrottling()
-
-    val apiVersionsRequest = new ApiVersionsRequest.Builder()
-      .build(requestHeader.apiVersion)
-    val request = buildRequest(apiVersionsRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, forwardingManager,
-      requestChannel, authorizer, adminManager, controller)
-
-    createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handleApiVersionsRequest(request)
-
-    val expectedVersions = new ApiVersionsResponseData.ApiVersion()
-      .setApiKey(ApiKeys.ALTER_CONFIGS.id)
-      .setMaxVersion(permittedVersion)
-      .setMinVersion(permittedVersion)
-
-    val response = readResponse(apiVersionsRequest, capturedResponse)
-      .asInstanceOf[ApiVersionsResponse]
-    assertEquals(Errors.NONE, Errors.forCode(response.data().errorCode()))
-
-    val alterConfigVersions = response.data().apiKeys().find(ApiKeys.ALTER_CONFIGS.id)
-    assertEquals(expectedVersions, alterConfigVersions)
-
-    verify(authorizer, adminManager, forwardingManager)
-  }
-
-  @Test
-  def testGetUnsupportedVersionsWhenControllerApiVersionsNotAvailable(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
-
-    val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
-
-    EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(None)
-
-    val capturedResponse = expectNoThrottling()
-
-    val apiVersionsRequest = new ApiVersionsRequest.Builder()
-      .build(requestHeader.apiVersion)
-    val request = buildRequest(apiVersionsRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, forwardingManager,
-      requestChannel, authorizer, adminManager, controller)
-
-    createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handleApiVersionsRequest(request)
-
-    val response = readResponse(apiVersionsRequest, capturedResponse)
-      .asInstanceOf[ApiVersionsResponse]
-    assertEquals(Errors.NONE, Errors.forCode(response.data().errorCode()))
-
-    val expectedVersions = ApiVersionsResponse.toApiVersion(ApiKeys.ALTER_CONFIGS)
-
-    val alterConfigVersions = response.data().apiKeys().find(ApiKeys.ALTER_CONFIGS.id)
-    assertEquals(expectedVersions, alterConfigVersions)
-
-    verify(authorizer, adminManager, forwardingManager)
-  }
-
-  @Test
   def testCreateTopicsWithAuthorizer(): Unit = {
     val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5b1363f..37fda0f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -171,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest {
   def testUnauthorizedThrottle(): Unit = {
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    for (apiKey <- ApiKeys.brokerApis.asScala) {
+    for (apiKey <- ApiKeys.zkBrokerApis.asScala) {
       submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
     }
 
@@ -754,9 +754,9 @@ class RequestQuotaTest extends BaseRequestTest {
 }
 
 object RequestQuotaTest {
-  val ClusterActions = ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
+  val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet
   val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
-  val ClientActions = ApiKeys.brokerApis.asScala.toSet -- ClusterActions -- SaslActions
+  val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions
 
   val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
   // Principal used for all client connections. This is modified by tests which
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index bbc71ca..05c83d4 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -59,7 +59,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
     try {
       val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
+      validateApiVersionsResponse(apiVersionsResponse)
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
@@ -88,7 +88,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
       assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
       val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse2, cluster.clientListener())
+      validateApiVersionsResponse(apiVersionsResponse2)
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
diff --git a/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java b/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java
index 075ee48..408e1a7 100644
--- a/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java
@@ -19,14 +19,23 @@ package org.apache.kafka.message;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 public final class ApiMessageTypeGenerator implements TypeClassGenerator {
     private final HeaderGenerator headerGenerator;
     private final CodeBuffer buffer;
     private final TreeMap<Short, ApiData> apis;
+    private final EnumMap<RequestListenerType, List<ApiData>> apisByListener = new EnumMap<>(RequestListenerType.class);
 
     private static final class ApiData {
         short apiKey;
@@ -93,6 +102,13 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
                         "API key " + spec.apiKey().get());
                 }
                 data.requestSpec = spec;
+
+                if (spec.listeners() != null) {
+                    for (RequestListenerType listener : spec.listeners()) {
+                        apisByListener.putIfAbsent(listener, new ArrayList<>());
+                        apisByListener.get(listener).add(data);
+                    }
+                }
                 break;
             }
             case RESPONSE: {
@@ -140,6 +156,8 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("%n");
         generateAccessor("highestSupportedVersion", "short");
         buffer.printf("%n");
+        generateAccessor("listeners", "EnumSet<ListenerType>");
+        buffer.printf("%n");
         generateAccessor("apiKey", "short");
         buffer.printf("%n");
         generateAccessor("requestSchemas", "Schema[]");
@@ -151,18 +169,48 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         generateHeaderVersion("request");
         buffer.printf("%n");
         generateHeaderVersion("response");
+        buffer.printf("%n");
+        generateListenerTypesEnum();
+        buffer.printf("%n");
         buffer.decrementIndent();
         buffer.printf("}%n");
         headerGenerator.generate();
     }
 
+    private String generateListenerTypeEnumSet(Collection<String> values) {
+        if (values.isEmpty()) {
+            return "EnumSet.noneOf(ListenerType.class)";
+        }
+        StringBuilder bldr = new StringBuilder("EnumSet.of(");
+        Iterator<String> iter = values.iterator();
+        while (iter.hasNext()) {
+            bldr.append("ListenerType.");
+            bldr.append(iter.next());
+            if (iter.hasNext()) {
+                bldr.append(", ");
+            }
+        }
+        bldr.append(")");
+        return bldr.toString();
+    }
+
     private void generateEnumValues() {
         int numProcessed = 0;
         for (Map.Entry<Short, ApiData> entry : apis.entrySet()) {
             ApiData apiData = entry.getValue();
             String name = apiData.name();
             numProcessed++;
-            buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d)%s%n",
+
+            final Collection<String> listeners;
+            if (apiData.requestSpec.listeners() == null) {
+                listeners = Collections.emptyList();
+            } else {
+                listeners = apiData.requestSpec.listeners().stream()
+                    .map(RequestListenerType::name)
+                    .collect(Collectors.toList());
+            }
+
+            buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d, %s)%s%n",
                 MessageGenerator.toSnakeCase(name).toUpperCase(Locale.ROOT),
                 MessageGenerator.capitalizeFirst(name),
                 entry.getKey(),
@@ -170,6 +218,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
                 apiData.responseSchema(),
                 apiData.requestSpec.struct().versions().lowest(),
                 apiData.requestSpec.struct().versions().highest(),
+                generateListenerTypeEnumSet(listeners),
                 (numProcessed == apis.size()) ? ";" : ",");
         }
     }
@@ -181,13 +230,16 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("private final Schema[] responseSchemas;%n");
         buffer.printf("private final short lowestSupportedVersion;%n");
         buffer.printf("private final short highestSupportedVersion;%n");
+        buffer.printf("private final EnumSet<ListenerType> listeners;%n");
         headerGenerator.addImport(MessageGenerator.SCHEMA_CLASS);
+        headerGenerator.addImport(MessageGenerator.ENUM_SET_CLASS);
     }
 
     private void generateEnumConstructor() {
         buffer.printf("ApiMessageType(String name, short apiKey, " +
             "Schema[] requestSchemas, Schema[] responseSchemas, " +
-            "short lowestSupportedVersion, short highestSupportedVersion) {%n");
+            "short lowestSupportedVersion, short highestSupportedVersion, " +
+            "EnumSet<ListenerType> listeners) {%n");
         buffer.incrementIndent();
         buffer.printf("this.name = name;%n");
         buffer.printf("this.apiKey = apiKey;%n");
@@ -195,6 +247,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("this.responseSchemas = responseSchemas;%n");
         buffer.printf("this.lowestSupportedVersion = lowestSupportedVersion;%n");
         buffer.printf("this.highestSupportedVersion = highestSupportedVersion;%n");
+        buffer.printf("this.listeners = listeners;%n");
         buffer.decrementIndent();
         buffer.printf("}%n");
     }
@@ -338,6 +391,18 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("}%n");
     }
 
+    private void generateListenerTypesEnum() {
+        buffer.printf("public enum ListenerType {%n");
+        buffer.incrementIndent();
+        Iterator<RequestListenerType> listenerIter = Arrays.stream(RequestListenerType.values()).iterator();
+        while (listenerIter.hasNext()) {
+            RequestListenerType scope = listenerIter.next();
+            buffer.printf("%s%s%n", scope.name(), listenerIter.hasNext() ? "," : ";");
+        }
+        buffer.decrementIndent();
+        buffer.printf("}%n");
+    }
+
     private void write(BufferedWriter writer) throws IOException {
         headerGenerator.buffer().write(writer);
         buffer.write(writer);
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index b6fb0aa..a1b9728 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -51,6 +51,8 @@ public final class MessageGenerator {
 
     static final String API_MESSAGE_TYPE_JAVA = "ApiMessageType.java";
 
+    static final String API_SCOPE_JAVA = "ApiScope.java";
+
     static final String METADATA_RECORD_TYPE_JAVA = "MetadataRecordType.java";
 
     static final String METADATA_JSON_CONVERTERS_JAVA = "MetadataJsonConverters.java";
@@ -84,6 +86,8 @@ public final class MessageGenerator {
 
     static final String ITERATOR_CLASS = "java.util.Iterator";
 
+    static final String ENUM_SET_CLASS = "java.util.EnumSet";
+
     static final String TYPE_CLASS = "org.apache.kafka.common.protocol.types.Type";
 
     static final String FIELD_CLASS = "org.apache.kafka.common.protocol.types.Field";
@@ -94,8 +98,6 @@ public final class MessageGenerator {
 
     static final String COMPACT_ARRAYOF_CLASS = "org.apache.kafka.common.protocol.types.CompactArrayOf";
 
-    static final String STRUCT_CLASS = "org.apache.kafka.common.protocol.types.Struct";
-
     static final String BYTES_CLASS = "org.apache.kafka.common.utils.Bytes";
 
     static final String UUID_CLASS = "org.apache.kafka.common.Uuid";
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
index 0b53cb1..fdcd7cd 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
@@ -37,6 +37,8 @@ public final class MessageSpec {
 
     private final Versions flexibleVersions;
 
+    private final List<RequestListenerType> listeners;
+
     @JsonCreator
     public MessageSpec(@JsonProperty("name") String name,
                        @JsonProperty("validVersions") String validVersions,
@@ -44,7 +46,8 @@ public final class MessageSpec {
                        @JsonProperty("apiKey") Short apiKey,
                        @JsonProperty("type") MessageSpecType type,
                        @JsonProperty("commonStructs") List<StructSpec> commonStructs,
-                       @JsonProperty("flexibleVersions") String flexibleVersions) {
+                       @JsonProperty("flexibleVersions") String flexibleVersions,
+                       @JsonProperty("listeners") List<RequestListenerType> listeners) {
         this.struct = new StructSpec(name, validVersions, fields);
         this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
         this.type = Objects.requireNonNull(type);
@@ -57,6 +60,12 @@ public final class MessageSpec {
                 this.flexibleVersions + ", which is not open-ended.  flexibleVersions must " +
                 "be either none, or an open-ended range (that ends with a plus sign).");
         }
+
+        if (listeners != null && !listeners.isEmpty() && type != MessageSpecType.REQUEST) {
+            throw new RuntimeException("The `requestScope` property is only valid for " +
+                "messages with type `request`");
+        }
+        this.listeners = listeners;
     }
 
     public StructSpec struct() {
@@ -106,6 +115,11 @@ public final class MessageSpec {
         return flexibleVersions.toString();
     }
 
+    @JsonProperty("listeners")
+    public List<RequestListenerType> listeners() {
+        return listeners;
+    }
+
     public String dataClassName() {
         switch (type) {
             case HEADER:
diff --git a/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java b/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java
new file mode 100644
index 0000000..cefd40d
--- /dev/null
+++ b/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestListenerType {
+    @JsonProperty("zkBroker")
+    ZK_BROKER,
+
+    @JsonProperty("broker")
+    BROKER,
+
+    @JsonProperty("controller")
+    CONTROLLER;
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index c71c058..e59a6a1 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -23,13 +23,11 @@ import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.network.RequestConvertToJson;
 import kafka.server.AutoTopicCreationManager;
-import kafka.server.BrokerFeatures;
 import kafka.server.BrokerTopicStats;
 import kafka.server.ClientQuotaManager;
 import kafka.server.ClientRequestQuotaManager;
 import kafka.server.ControllerMutationQuotaManager;
 import kafka.server.FetchManager;
-import kafka.server.FinalizedFeatureCache;
 import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
@@ -38,11 +36,13 @@ import kafka.server.ZkMetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicationQuotaManager;
+import kafka.server.SimpleApiVersionManager;
 import kafka.server.ZkAdminManager;
 import kafka.server.ZkSupport;
 import kafka.server.metadata.CachedConfigRepository;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
@@ -172,7 +172,6 @@ public class MetadataRequestBenchmark {
         Properties kafkaProps =  new Properties();
         kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk");
         kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
-        BrokerFeatures brokerFeatures = BrokerFeatures.createDefault();
         return new KafkaApis(requestChannel,
             new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache),
             replicaManager,
@@ -191,8 +190,7 @@ public class MetadataRequestBenchmark {
             "clusterId",
             new SystemTime(),
             null,
-            brokerFeatures,
-            new FinalizedFeatureCache(brokerFeatures));
+            new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER));
     }
 
     @TearDown(Level.Trial)


[kafka] 01/02: Fixed README and added clearer error message. (#10133)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4af420a9f95aeaad810f9cedd86e62f4f78ac43e
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Tue Feb 16 18:12:29 2021 -0500

    Fixed README and added clearer error message. (#10133)
    
    The script `test-raft-server-start.sh` requires the config to be specified with `--config`. I've included this in the README and added an error message for this specific case.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/tools/TestRaftServer.scala | 4 ++++
 raft/README.md                                       | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 5db6f84..2391ca4 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -26,6 +26,7 @@ import kafka.raft.{KafkaRaftManager, RaftManager}
 import kafka.security.CredentialProvider
 import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties}
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Logging, ShutdownableThread}
+import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
 import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles}
@@ -419,6 +420,9 @@ object TestRaftServer extends Logging {
         "Standalone raft server for performance testing")
 
       val configFile = opts.options.valueOf(opts.configOpt)
+      if (configFile == null) {
+        throw new InvalidConfigurationException("Missing configuration file. Should specify with '--config'")
+      }
       val serverProps = Utils.loadProps(configFile)
 
       // KafkaConfig requires either `process.roles` or `zookeeper.connect`. Neither are
diff --git a/raft/README.md b/raft/README.md
index 4d51323..35d3969 100644
--- a/raft/README.md
+++ b/raft/README.md
@@ -9,7 +9,7 @@ we have a standalone test server which can be used for performance testing.
 Below we describe the details to set this up.
 
 ### Run Single Quorum ###
-    bin/test-raft-server-start.sh config/raft.properties
+    bin/test-raft-server-start.sh --config config/raft.properties
 
 ### Run Multi Node Quorum ###
 Create 3 separate raft quorum properties as the following: