You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/02/26 03:45:40 UTC

[kafka] branch 2.8 updated: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller (#10194)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 0bbb83d  KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller (#10194)
0bbb83d is described below

commit 0bbb83dae24e69dae1313099a0610f2b794f3595
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 25 19:38:21 2021 -0800

    KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller (#10194)
    
    This patch updates request `listeners` tags to be in line with what the KIP-500 broker/controller support today. We will re-enable these APIs as needed once we have added the support.
    
    We have also updated `ControllerApis` to use `ApiVersionManager` and simplified the envelope handling logic.
    
    Reviewers: Ron Dagostino <rd...@confluent.io>, Colin P. McCabe <cm...@apache.org>
---
 .../common/message/AddOffsetsToTxnRequest.json     |   2 +-
 .../common/message/AddPartitionsToTxnRequest.json  |   2 +-
 .../common/message/AlterConfigsRequest.json        |   2 +-
 .../AlterPartitionReassignmentsRequest.json        |   2 +-
 .../message/AlterUserScramCredentialsRequest.json  |   2 +-
 .../common/message/CreateAclsRequest.json          |   2 +-
 .../message/CreateDelegationTokenRequest.json      |   2 +-
 .../common/message/CreatePartitionsRequest.json    |   2 +-
 .../common/message/DeleteAclsRequest.json          |   2 +-
 .../common/message/DescribeAclsRequest.json        |   2 +-
 .../message/DescribeClientQuotasRequest.json       |   2 +-
 .../common/message/DescribeConfigsRequest.json     |   2 +-
 .../message/DescribeDelegationTokenRequest.json    |   2 +-
 .../DescribeUserScramCredentialsRequest.json       |   2 +-
 .../common/message/ElectLeadersRequest.json        |   2 +-
 .../resources/common/message/EndTxnRequest.json    |   2 +-
 .../message/ExpireDelegationTokenRequest.json      |   2 +-
 .../common/message/InitProducerIdRequest.json      |   2 +-
 .../message/ListPartitionReassignmentsRequest.json |   2 +-
 .../resources/common/message/MetadataRequest.json  |   2 +-
 .../message/RenewDelegationTokenRequest.json       |   2 +-
 .../common/message/TxnOffsetCommitRequest.json     |   2 +-
 .../common/message/UpdateFeaturesRequest.json      |   2 +-
 .../main/scala/kafka/server/ControllerApis.scala   | 141 ++++++---------------
 .../main/scala/kafka/server/ControllerServer.scala |   3 +-
 .../unit/kafka/server/ControllerApisTest.scala     |   6 +-
 .../tests/core/group_mode_transactions_test.py     |   2 +-
 tests/kafkatest/tests/core/transactions_test.py    |   3 +-
 28 files changed, 67 insertions(+), 134 deletions(-)

diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index ade3fc7..7212a02 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 25,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "AddOffsetsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 4920da1..99e72a9 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 24,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "AddPartitionsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json b/clients/src/main/resources/common/message/AlterConfigsRequest.json
index 31057e3..fa46656 100644
--- a/clients/src/main/resources/common/message/AlterConfigsRequest.json
+++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 33,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "AlterConfigsRequest",
   // Version 1 is the same as version 0.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
index 2e12441..ee05b42 100644
--- a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 45,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "AlterPartitionReassignmentsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
index 8937394..70e1483 100644
--- a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 51,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "AlterUserScramCredentialsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/CreateAclsRequest.json b/clients/src/main/resources/common/message/CreateAclsRequest.json
index 5b3bfed..109b444 100644
--- a/clients/src/main/resources/common/message/CreateAclsRequest.json
+++ b/clients/src/main/resources/common/message/CreateAclsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 30,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "CreateAclsRequest",
   // Version 1 adds resource pattern type.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
index 0c31d32..d65d490 100644
--- a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 38,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "CreateDelegationTokenRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/CreatePartitionsRequest.json b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
index 6e24949..8053628 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 37,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "CreatePartitionsRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/DeleteAclsRequest.json b/clients/src/main/resources/common/message/DeleteAclsRequest.json
index fd7c152..1e7aa9a 100644
--- a/clients/src/main/resources/common/message/DeleteAclsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteAclsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "DeleteAclsRequest",
   // Version 1 adds the pattern type.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/DescribeAclsRequest.json b/clients/src/main/resources/common/message/DescribeAclsRequest.json
index 58886da..e551ca4 100644
--- a/clients/src/main/resources/common/message/DescribeAclsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeAclsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 29,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "DescribeAclsRequest",
   // Version 1 adds resource pattern type.
   // Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
index d14cfc9..5ada552 100644
--- a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 48,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "DescribeClientQuotasRequest",
   // Version 1 enables flexible versions.
   "validVersions": "0-1",
diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
index 23be19c..f48b168 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 32,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker", "broker"],
   "name": "DescribeConfigsRequest",
   // Version 1 adds IncludeSynonyms.
   // Version 2 is the same as version 1.
diff --git a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
index da5bbd0..79c342e 100644
--- a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 41,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "DescribeDelegationTokenRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
index 2f7a111..cef8929 100644
--- a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 50,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "DescribeUserScramCredentialsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index 8a095e1..6f0be57 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 43,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "ElectLeadersRequest",
   // Version 1 implements multiple leader election types, as described by KIP-460.
   //
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json b/clients/src/main/resources/common/message/EndTxnRequest.json
index f16ef76..7e7d41d 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 26,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "EndTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
index c830a93..736f1df 100644
--- a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 40,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "ExpireDelegationTokenRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index e8795e6..9e34505 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 22,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "InitProducerIdRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
index f013e3f..75e8744 100644
--- a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 46,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "ListPartitionReassignmentsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
index 6690810..e5083a8 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 3,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "MetadataRequest",
   "validVersions": "0-11",
   "flexibleVersions": "9+",
diff --git a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
index 182682e..7240ac3 100644
--- a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 39,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "RenewDelegationTokenRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index a832ef7..127ff3d 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 28,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],
   "name": "TxnOffsetCommitRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
index 21e1bf6..41be8cf 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 57,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],
   "name": "UpdateFeaturesRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index abd8506..7bba474 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -24,15 +24,15 @@ import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Logging
 import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.ApiException
+import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException}
 import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record.BaseRecords
 import org.apache.kafka.common.requests._
@@ -40,9 +40,8 @@ import org.apache.kafka.common.resource.Resource
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.Node
 import org.apache.kafka.controller.Controller
-import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange}
+import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange}
 import org.apache.kafka.server.authorizer.Authorizer
 
 import scala.collection.mutable
@@ -60,77 +59,14 @@ class ControllerApis(val requestChannel: RequestChannel,
                      val raftManager: RaftManager[ApiMessageAndVersion],
                      val config: KafkaConfig,
                      val metaProperties: MetaProperties,
-                     val controllerNodes: Seq[Node]) extends ApiRequestHandler with Logging {
+                     val controllerNodes: Seq[Node],
+                     val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
 
   val authHelper = new AuthHelper(authorizer)
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time, s"[ControllerApis id=${config.nodeId}] ")
 
-  var supportedApiKeys = Set(
-    ApiKeys.FETCH,
-    ApiKeys.METADATA,
-    //ApiKeys.SASL_HANDSHAKE
-    ApiKeys.API_VERSIONS,
-    ApiKeys.CREATE_TOPICS,
-    //ApiKeys.DELETE_TOPICS,
-    //ApiKeys.DESCRIBE_ACLS,
-    //ApiKeys.CREATE_ACLS,
-    //ApiKeys.DELETE_ACLS,
-    //ApiKeys.DESCRIBE_CONFIGS,
-    //ApiKeys.ALTER_CONFIGS,
-    //ApiKeys.SASL_AUTHENTICATE,
-    //ApiKeys.CREATE_PARTITIONS,
-    //ApiKeys.CREATE_DELEGATION_TOKEN
-    //ApiKeys.RENEW_DELEGATION_TOKEN
-    //ApiKeys.EXPIRE_DELEGATION_TOKEN
-    //ApiKeys.DESCRIBE_DELEGATION_TOKEN
-    //ApiKeys.ELECT_LEADERS
-    ApiKeys.INCREMENTAL_ALTER_CONFIGS,
-    //ApiKeys.ALTER_PARTITION_REASSIGNMENTS
-    //ApiKeys.LIST_PARTITION_REASSIGNMENTS
-    ApiKeys.ALTER_CLIENT_QUOTAS,
-    //ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS
-    //ApiKeys.ALTER_USER_SCRAM_CREDENTIALS
-    //ApiKeys.UPDATE_FEATURES
-    ApiKeys.ENVELOPE,
-    ApiKeys.VOTE,
-    ApiKeys.BEGIN_QUORUM_EPOCH,
-    ApiKeys.END_QUORUM_EPOCH,
-    ApiKeys.DESCRIBE_QUORUM,
-    ApiKeys.ALTER_ISR,
-    ApiKeys.BROKER_REGISTRATION,
-    ApiKeys.BROKER_HEARTBEAT,
-    ApiKeys.UNREGISTER_BROKER,
-  )
-
-  private def maybeHandleInvalidEnvelope(
-                                          envelope: RequestChannel.Request,
-                                          forwardedApiKey: ApiKeys
-                                        ): Boolean = {
-    def sendEnvelopeError(error: Errors): Unit = {
-      requestHelper.sendErrorResponseMaybeThrottle(envelope, error.exception)
-    }
-
-    if (!authHelper.authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-      // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
-      sendEnvelopeError(Errors.CLUSTER_AUTHORIZATION_FAILED)
-      true
-    } else if (!forwardedApiKey.forwardable) {
-      sendEnvelopeError(Errors.INVALID_REQUEST)
-      true
-    } else {
-      false
-    }
-  }
-
   override def handle(request: RequestChannel.Request): Unit = {
     try {
-      val handled = request.envelope.exists(envelope => {
-        maybeHandleInvalidEnvelope(envelope, request.header.apiKey)
-      })
-
-      if (handled)
-        return
-
       request.header.apiKey match {
         case ApiKeys.FETCH => handleFetch(request)
         case ApiKeys.METADATA => handleMetadataRequest(request)
@@ -146,7 +82,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
         case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
-        case ApiKeys.ENVELOPE => EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+        case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
+        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
+        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
       }
     } catch {
@@ -155,6 +93,27 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleEnvelopeRequest(request: RequestChannel.Request): Unit = {
+    if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+      requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
+        s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
+    } else {
+      EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+    }
+  }
+
+  def handleSaslHandshakeRequest(request: RequestChannel.Request): Unit = {
+    val responseData = new SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
+    requestHelper.sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
+  }
+
+  def handleSaslAuthenticateRequest(request: RequestChannel.Request): Unit = {
+    val responseData = new SaslAuthenticateResponseData()
+      .setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
+      .setErrorMessage("SaslAuthenticate request received after successful authentication")
+    requestHelper.sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(responseData))
+  }
+
   private def handleFetch(request: RequestChannel.Request): Unit = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     handleRaftRequest(request, response => new FetchResponse[BaseRecords](response.asInstanceOf[FetchResponseData]))
@@ -251,45 +210,17 @@ class ControllerApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def createResponseCallback(features: FeatureMapAndEpoch,
-                               requestThrottleMs: Int): ApiVersionsResponse = {
+    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
       val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion)
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
-      else if (!apiVersionRequest.isValid)
+      } else if (!apiVersionRequest.isValid) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
-      else {
-        val data = new ApiVersionsResponseData().
-          setErrorCode(0.toShort).
-          setThrottleTimeMs(requestThrottleMs).
-          setFinalizedFeaturesEpoch(features.epoch())
-        supportedFeatures.foreach {
-          case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
-            setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
-        }
-        //        features.finalizedFeatures().asScala.foreach {
-        //          case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
-        //            setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
-        //        }
-        ApiKeys.values().foreach {
-          key =>
-            if (supportedApiKeys.contains(key)) {
-              data.apiKeys().add(new ApiVersion().
-                setApiKey(key.id).
-                setMaxVersion(key.latestVersion()).
-                setMinVersion(key.oldestVersion()))
-            }
-        }
-        new ApiVersionsResponse(data)
+      } else {
+        apiVersionManager.apiVersionResponse(requestThrottleMs)
       }
     }
-    //    FutureConverters.toScala(controller.finalizedFeatures()).onComplete {
-    //      case Success(features) =>
-    requestHelper.sendResponseMaybeThrottle(request,
-      requestThrottleMs => createResponseCallback(new FeatureMapAndEpoch(
-        new FeatureMap(new util.HashMap()), 0), requestThrottleMs))
-    //      case Failure(e) => requestHelper.handleError(request, e)
-    //    }
+    requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
   }
 
   private def handleVote(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 50fc6e2..18ea2c4 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -168,7 +168,8 @@ class ControllerServer(
         raftManager,
         config,
         metaProperties,
-        controllerNodes.toSeq)
+        controllerNodes.toSeq,
+        apiVersionManager)
       controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
         socketServer.dataPlaneRequestChannel,
         controllerApis,
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 3533fcf..26960d6 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -23,11 +23,12 @@ import java.util.Properties
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager}
+import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager, SimpleApiVersionManager}
 import kafka.utils.MockTime
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.ClusterAuthorizationException
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.BrokerRegistrationRequestData
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.Errors
@@ -81,7 +82,8 @@ class ControllerApisTest {
 
       // FIXME: Would make more sense to set controllerId here
       MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
-      Seq.empty
+      Seq.empty,
+      new SimpleApiVersionManager(ListenerType.CONTROLLER)
     )
   }
 
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index e9638d5..18e6d97 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -269,7 +269,7 @@ class GroupModeTransactionsTest(Test):
 
     @cluster(num_nodes=10)
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
-            bounce_target=["brokers", "clients"], metadata_quorum=quorum.all_non_upgrade)
+            bounce_target=["brokers", "clients"])
     def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 2891e70..8989d83 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -244,8 +244,7 @@ class TransactionsTest(Test):
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
             bounce_target=["brokers", "clients"],
             check_order=[True, False],
-            use_group_metadata=[True, False],
-            metadata_quorum=quorum.all_non_upgrade)
+            use_group_metadata=[True, False])
     def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol