You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/02/26 03:45:40 UTC
[kafka] branch 2.8 updated: KAFKA-12365;
Disable APIs not supported by KIP-500 broker/controller (#10194)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 0bbb83d KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller (#10194)
0bbb83d is described below
commit 0bbb83dae24e69dae1313099a0610f2b794f3595
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 25 19:38:21 2021 -0800
KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller (#10194)
This patch updates request `listeners` tags to be in line with what the KIP-500 broker/controller support today. We will re-enable these APIs as needed once we have added the support.
We have also updated `ControllerApis` to use `ApiVersionManager` and simplified the envelope handling logic.
Reviewers: Ron Dagostino <rd...@confluent.io>, Colin P. McCabe <cm...@apache.org>
---
.../common/message/AddOffsetsToTxnRequest.json | 2 +-
.../common/message/AddPartitionsToTxnRequest.json | 2 +-
.../common/message/AlterConfigsRequest.json | 2 +-
.../AlterPartitionReassignmentsRequest.json | 2 +-
.../message/AlterUserScramCredentialsRequest.json | 2 +-
.../common/message/CreateAclsRequest.json | 2 +-
.../message/CreateDelegationTokenRequest.json | 2 +-
.../common/message/CreatePartitionsRequest.json | 2 +-
.../common/message/DeleteAclsRequest.json | 2 +-
.../common/message/DescribeAclsRequest.json | 2 +-
.../message/DescribeClientQuotasRequest.json | 2 +-
.../common/message/DescribeConfigsRequest.json | 2 +-
.../message/DescribeDelegationTokenRequest.json | 2 +-
.../DescribeUserScramCredentialsRequest.json | 2 +-
.../common/message/ElectLeadersRequest.json | 2 +-
.../resources/common/message/EndTxnRequest.json | 2 +-
.../message/ExpireDelegationTokenRequest.json | 2 +-
.../common/message/InitProducerIdRequest.json | 2 +-
.../message/ListPartitionReassignmentsRequest.json | 2 +-
.../resources/common/message/MetadataRequest.json | 2 +-
.../message/RenewDelegationTokenRequest.json | 2 +-
.../common/message/TxnOffsetCommitRequest.json | 2 +-
.../common/message/UpdateFeaturesRequest.json | 2 +-
.../main/scala/kafka/server/ControllerApis.scala | 141 ++++++---------------
.../main/scala/kafka/server/ControllerServer.scala | 3 +-
.../unit/kafka/server/ControllerApisTest.scala | 6 +-
.../tests/core/group_mode_transactions_test.py | 2 +-
tests/kafkatest/tests/core/transactions_test.py | 3 +-
28 files changed, 67 insertions(+), 134 deletions(-)
diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index ade3fc7..7212a02 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 25,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "AddOffsetsToTxnRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 4920da1..99e72a9 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 24,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "AddPartitionsToTxnRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json b/clients/src/main/resources/common/message/AlterConfigsRequest.json
index 31057e3..fa46656 100644
--- a/clients/src/main/resources/common/message/AlterConfigsRequest.json
+++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 33,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "AlterConfigsRequest",
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
index 2e12441..ee05b42 100644
--- a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 45,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
index 8937394..70e1483 100644
--- a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 51,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "AlterUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/CreateAclsRequest.json b/clients/src/main/resources/common/message/CreateAclsRequest.json
index 5b3bfed..109b444 100644
--- a/clients/src/main/resources/common/message/CreateAclsRequest.json
+++ b/clients/src/main/resources/common/message/CreateAclsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 30,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "CreateAclsRequest",
// Version 1 adds resource pattern type.
// Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
index 0c31d32..d65d490 100644
--- a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 38,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "CreateDelegationTokenRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/CreatePartitionsRequest.json b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
index 6e24949..8053628 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 37,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "CreatePartitionsRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/DeleteAclsRequest.json b/clients/src/main/resources/common/message/DeleteAclsRequest.json
index fd7c152..1e7aa9a 100644
--- a/clients/src/main/resources/common/message/DeleteAclsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteAclsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 31,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "DeleteAclsRequest",
// Version 1 adds the pattern type.
// Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/DescribeAclsRequest.json b/clients/src/main/resources/common/message/DescribeAclsRequest.json
index 58886da..e551ca4 100644
--- a/clients/src/main/resources/common/message/DescribeAclsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeAclsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 29,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "DescribeAclsRequest",
// Version 1 adds resource pattern type.
// Version 2 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
index d14cfc9..5ada552 100644
--- a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 48,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "DescribeClientQuotasRequest",
// Version 1 enables flexible versions.
"validVersions": "0-1",
diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
index 23be19c..f48b168 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 32,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker", "broker"],
"name": "DescribeConfigsRequest",
// Version 1 adds IncludeSynonyms.
// Version 2 is the same as version 1.
diff --git a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
index da5bbd0..79c342e 100644
--- a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 41,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "DescribeDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
index 2f7a111..cef8929 100644
--- a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 50,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "DescribeUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index 8a095e1..6f0be57 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 43,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "ElectLeadersRequest",
// Version 1 implements multiple leader election types, as described by KIP-460.
//
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json b/clients/src/main/resources/common/message/EndTxnRequest.json
index f16ef76..7e7d41d 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 26,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "EndTxnRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
index c830a93..736f1df 100644
--- a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 40,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "ExpireDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index e8795e6..9e34505 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 22,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "InitProducerIdRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
index f013e3f..75e8744 100644
--- a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 46,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "ListPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
index 6690810..e5083a8 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 3,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker", "broker", "controller"],
"name": "MetadataRequest",
"validVersions": "0-11",
"flexibleVersions": "9+",
diff --git a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
index 182682e..7240ac3 100644
--- a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 39,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "RenewDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index a832ef7..127ff3d 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 28,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker"],
"name": "TxnOffsetCommitRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
index 21e1bf6..41be8cf 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 57,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker"],
"name": "UpdateFeaturesRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index abd8506..7bba474 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -24,15 +24,15 @@ import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.common.Node
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.ApiException
+import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException}
import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.BaseRecords
import org.apache.kafka.common.requests._
@@ -40,9 +40,8 @@ import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.Node
import org.apache.kafka.controller.Controller
-import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange}
+import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange}
import org.apache.kafka.server.authorizer.Authorizer
import scala.collection.mutable
@@ -60,77 +59,14 @@ class ControllerApis(val requestChannel: RequestChannel,
val raftManager: RaftManager[ApiMessageAndVersion],
val config: KafkaConfig,
val metaProperties: MetaProperties,
- val controllerNodes: Seq[Node]) extends ApiRequestHandler with Logging {
+ val controllerNodes: Seq[Node],
+ val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
val authHelper = new AuthHelper(authorizer)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time, s"[ControllerApis id=${config.nodeId}] ")
- var supportedApiKeys = Set(
- ApiKeys.FETCH,
- ApiKeys.METADATA,
- //ApiKeys.SASL_HANDSHAKE
- ApiKeys.API_VERSIONS,
- ApiKeys.CREATE_TOPICS,
- //ApiKeys.DELETE_TOPICS,
- //ApiKeys.DESCRIBE_ACLS,
- //ApiKeys.CREATE_ACLS,
- //ApiKeys.DELETE_ACLS,
- //ApiKeys.DESCRIBE_CONFIGS,
- //ApiKeys.ALTER_CONFIGS,
- //ApiKeys.SASL_AUTHENTICATE,
- //ApiKeys.CREATE_PARTITIONS,
- //ApiKeys.CREATE_DELEGATION_TOKEN
- //ApiKeys.RENEW_DELEGATION_TOKEN
- //ApiKeys.EXPIRE_DELEGATION_TOKEN
- //ApiKeys.DESCRIBE_DELEGATION_TOKEN
- //ApiKeys.ELECT_LEADERS
- ApiKeys.INCREMENTAL_ALTER_CONFIGS,
- //ApiKeys.ALTER_PARTITION_REASSIGNMENTS
- //ApiKeys.LIST_PARTITION_REASSIGNMENTS
- ApiKeys.ALTER_CLIENT_QUOTAS,
- //ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS
- //ApiKeys.ALTER_USER_SCRAM_CREDENTIALS
- //ApiKeys.UPDATE_FEATURES
- ApiKeys.ENVELOPE,
- ApiKeys.VOTE,
- ApiKeys.BEGIN_QUORUM_EPOCH,
- ApiKeys.END_QUORUM_EPOCH,
- ApiKeys.DESCRIBE_QUORUM,
- ApiKeys.ALTER_ISR,
- ApiKeys.BROKER_REGISTRATION,
- ApiKeys.BROKER_HEARTBEAT,
- ApiKeys.UNREGISTER_BROKER,
- )
-
- private def maybeHandleInvalidEnvelope(
- envelope: RequestChannel.Request,
- forwardedApiKey: ApiKeys
- ): Boolean = {
- def sendEnvelopeError(error: Errors): Unit = {
- requestHelper.sendErrorResponseMaybeThrottle(envelope, error.exception)
- }
-
- if (!authHelper.authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
- // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
- sendEnvelopeError(Errors.CLUSTER_AUTHORIZATION_FAILED)
- true
- } else if (!forwardedApiKey.forwardable) {
- sendEnvelopeError(Errors.INVALID_REQUEST)
- true
- } else {
- false
- }
- }
-
override def handle(request: RequestChannel.Request): Unit = {
try {
- val handled = request.envelope.exists(envelope => {
- maybeHandleInvalidEnvelope(envelope, request.header.apiKey)
- })
-
- if (handled)
- return
-
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.METADATA => handleMetadataRequest(request)
@@ -146,7 +82,9 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
- case ApiKeys.ENVELOPE => EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+ case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
+ case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
+ case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
}
} catch {
@@ -155,6 +93,27 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
+ def handleEnvelopeRequest(request: RequestChannel.Request): Unit = {
+ if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+ requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
+ s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
+ } else {
+ EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+ }
+ }
+
+ def handleSaslHandshakeRequest(request: RequestChannel.Request): Unit = {
+ val responseData = new SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
+ requestHelper.sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
+ }
+
+ def handleSaslAuthenticateRequest(request: RequestChannel.Request): Unit = {
+ val responseData = new SaslAuthenticateResponseData()
+ .setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
+ .setErrorMessage("SaslAuthenticate request received after successful authentication")
+ requestHelper.sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(responseData))
+ }
+
private def handleFetch(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new FetchResponse[BaseRecords](response.asInstanceOf[FetchResponseData]))
@@ -251,45 +210,17 @@ class ControllerApis(val requestChannel: RequestChannel,
// If this is considered to leak information about the broker version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
- def createResponseCallback(features: FeatureMapAndEpoch,
- requestThrottleMs: Int): ApiVersionsResponse = {
+ def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
val apiVersionRequest = request.body[ApiVersionsRequest]
- if (apiVersionRequest.hasUnsupportedRequestVersion)
+ if (apiVersionRequest.hasUnsupportedRequestVersion) {
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
- else if (!apiVersionRequest.isValid)
+ } else if (!apiVersionRequest.isValid) {
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
- else {
- val data = new ApiVersionsResponseData().
- setErrorCode(0.toShort).
- setThrottleTimeMs(requestThrottleMs).
- setFinalizedFeaturesEpoch(features.epoch())
- supportedFeatures.foreach {
- case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
- setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
- }
- // features.finalizedFeatures().asScala.foreach {
- // case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
- // setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
- // }
- ApiKeys.values().foreach {
- key =>
- if (supportedApiKeys.contains(key)) {
- data.apiKeys().add(new ApiVersion().
- setApiKey(key.id).
- setMaxVersion(key.latestVersion()).
- setMinVersion(key.oldestVersion()))
- }
- }
- new ApiVersionsResponse(data)
+ } else {
+ apiVersionManager.apiVersionResponse(requestThrottleMs)
}
}
- // FutureConverters.toScala(controller.finalizedFeatures()).onComplete {
- // case Success(features) =>
- requestHelper.sendResponseMaybeThrottle(request,
- requestThrottleMs => createResponseCallback(new FeatureMapAndEpoch(
- new FeatureMap(new util.HashMap()), 0), requestThrottleMs))
- // case Failure(e) => requestHelper.handleError(request, e)
- // }
+ requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
}
private def handleVote(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 50fc6e2..18ea2c4 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -168,7 +168,8 @@ class ControllerServer(
raftManager,
config,
metaProperties,
- controllerNodes.toSeq)
+ controllerNodes.toSeq,
+ apiVersionManager)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel,
controllerApis,
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 3533fcf..26960d6 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -23,11 +23,12 @@ import java.util.Properties
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager}
+import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager, SimpleApiVersionManager}
import kafka.utils.MockTime
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.Errors
@@ -81,7 +82,8 @@ class ControllerApisTest {
// FIXME: Would make more sense to set controllerId here
MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
- Seq.empty
+ Seq.empty,
+ new SimpleApiVersionManager(ListenerType.CONTROLLER)
)
}
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index e9638d5..18e6d97 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -269,7 +269,7 @@ class GroupModeTransactionsTest(Test):
@cluster(num_nodes=10)
@matrix(failure_mode=["hard_bounce", "clean_bounce"],
- bounce_target=["brokers", "clients"], metadata_quorum=quorum.all_non_upgrade)
+ bounce_target=["brokers", "clients"])
def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 2891e70..8989d83 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -244,8 +244,7 @@ class TransactionsTest(Test):
@matrix(failure_mode=["hard_bounce", "clean_bounce"],
bounce_target=["brokers", "clients"],
check_order=[True, False],
- use_group_metadata=[True, False],
- metadata_quorum=quorum.all_non_upgrade)
+ use_group_metadata=[True, False])
def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol