Posted to commits@kafka.apache.org by ij...@apache.org on 2020/04/07 13:27:08 UTC
[kafka] branch trunk updated: MINOR: Pass one action per unique resource name in KafkaApis.filterAuthorized (#8432)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cd1e46c MINOR: Pass one action per unique resource name in KafkaApis.filterAuthorized (#8432)
cd1e46c is described below
commit cd1e46c8bb46f1e5303c51f476c74e33b522fce8
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Apr 7 15:26:18 2020 +0200
MINOR: Pass one action per unique resource name in KafkaApis.filterAuthorized (#8432)
90bbeedf52f introduced a regression that resulted in one action being passed to the
`Authorizer` per resource name instead of one per unique resource name. Refactor
the signatures of both `filterAuthorized` and `authorize` to make them easier to test,
and add a test for each.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 166 +++++++++++----------
.../scala/unit/kafka/server/KafkaApisTest.scala | 97 +++++++++++-
2 files changed, 184 insertions(+), 79 deletions(-)
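In short: where the regression built one authorization action per occurrence of a topic name, the patch below builds one action per unique resource name and carries the number of occurrences as the action's reference count. The following is a minimal, self-contained sketch of that approach; it uses hypothetical, simplified stand-in types (Action, Result, a stub authorize) rather than the real Kafka Authorizer API, purely to illustrate the dedup-plus-refCount idea.

object FilterAuthorizedSketch {
  sealed trait Result
  case object Allowed extends Result
  case object Denied extends Result

  // Hypothetical action: one per unique resource name, with how often it appeared.
  final case class Action(resourceName: String, refCount: Int)

  // Hypothetical authorizer stub: denies "topic-2", allows everything else.
  def authorize(actions: Seq[Action]): Seq[Result] =
    actions.map(a => if (a.resourceName == "topic-2") Denied else Allowed)

  def filterAuthorized(resourceNames: Seq[String]): Set[String] = {
    val uniqueResourceNames = resourceNames.distinct      // one action per unique name
    val counts = resourceNames.groupBy(identity)          // remember how often each name appeared
    val actions = uniqueResourceNames.map(name => Action(name, counts(name).size))
    authorize(actions)
      .zip(uniqueResourceNames)                           // results line up with the unique names
      .collect { case (Allowed, name) => name }
      .toSet
  }

  def main(args: Array[String]): Unit = {
    // "topic-1" appears twice but yields a single action with refCount = 2.
    println(filterAuthorized(Seq("topic-1", "topic-2", "topic-1", "topic-3")))
    // prints: Set(topic-1, topic-3)
  }
}

The test added in KafkaApisTest below exercises exactly this behaviour against the real API: duplicate names in the input must not produce duplicate actions.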
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a7607a..4a6082e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -352,7 +352,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// reject the request if not authorized to the group
- if (!authorize(request, READ, GROUP, offsetCommitRequest.data.groupId)) {
+ if (!authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
val error = Errors.GROUP_AUTHORIZATION_FAILED
val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
offsetCommitRequest.data.topics,
@@ -378,7 +378,8 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
- val authorizedTopics = filterAuthorized(request, READ, TOPIC, offsetCommitRequest.data.topics.asScala.map(_.name))
+ val authorizedTopics = filterAuthorized(request.context, READ, TOPIC,
+ offsetCommitRequest.data.topics.asScala.map(_.name))
for (topicData <- offsetCommitRequest.data.topics.asScala) {
for (partitionData <- topicData.partitions.asScala) {
val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
@@ -472,14 +473,14 @@ class KafkaApis(val requestChannel: RequestChannel,
if (produceRequest.hasTransactionalRecords) {
val isAuthorizedTransactional = produceRequest.transactionalId != null &&
- authorize(request, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
+ authorize(request.context, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
if (!isAuthorizedTransactional) {
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
// Note that authorization to a transactionalId implies ProducerId authorization
- } else if (produceRequest.hasIdempotentRecords && !authorize(request, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+ } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}
@@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
- val authorizedTopics = filterAuthorized(request, WRITE, TOPIC,
+ val authorizedTopics = filterAuthorized(request.context, WRITE, TOPIC,
produceRequest.partitionRecordsOrFail.asScala.toSeq.map(_._1.topic))
for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
@@ -625,7 +626,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to fetch partition data.
- if (authorize(request, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+ if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
fetchContext.foreachPartition { (topicPartition, data) =>
if (!metadataCache.contains(topicPartition))
erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -641,7 +642,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// Regular Kafka consumers need READ permission on each partition they are fetching.
val fetchTopics = new mutable.ArrayBuffer[String]
fetchContext.foreachPartition { (topicPartition, _) => fetchTopics += topicPartition.topic }
- val authorizedTopics = filterAuthorized(request, READ, TOPIC, fetchTopics)
+ val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, fetchTopics)
fetchContext.foreachPartition { (topicPartition, data) =>
if (!authorizedTopics.contains(topicPartition.topic))
erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
@@ -887,7 +888,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetRequest]
- val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
+ val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC,
+ offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
}
@@ -927,7 +929,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetRequest]
- val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
+ val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC,
+ offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
}
@@ -1111,15 +1114,17 @@ class KafkaApis(val requestChannel: RequestChannel,
else
metadataRequest.topics.asScala.toSet
- val authorizedForDescribeTopics = filterAuthorized(request, DESCRIBE, TOPIC, topics.toSeq, logIfDenied = !metadataRequest.isAllTopics)
+ val authorizedForDescribeTopics = filterAuthorized(request.context, DESCRIBE, TOPIC,
+ topics.toSeq, logIfDenied = !metadataRequest.isAllTopics)
var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains)
var unauthorizedForCreateTopics = Set[String]()
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
- if (!authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
- val authorizedForCreateTopics = filterAuthorized(request, CREATE, TOPIC, nonExistingTopics.toSeq)
+ if (!authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
+ val authorizedForCreateTopics = filterAuthorized(request.context, CREATE, TOPIC,
+ nonExistingTopics.toSeq)
unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
}
@@ -1156,7 +1161,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (request.header.apiVersion >= 8) {
// get cluster authorized operations
if (metadataRequest.data.includeClusterAuthorizedOperations) {
- if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
+ if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
clusterAuthorizedOperations = authorizedOperations(request, Resource.CLUSTER)
else
clusterAuthorizedOperations = 0
@@ -1196,14 +1201,14 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetFetchRequest = request.body[OffsetFetchRequest]
def partitionAuthorized[T](elements: List[T], topic: T => String): (Seq[T], Seq[T]) = {
- val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, elements.map(topic))
+ val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, elements.map(topic))
elements.partition(element => authorizedTopics.contains(topic.apply(element)))
}
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse =
// reject the request if not authorized to the group
- if (!authorize(request, DESCRIBE, GROUP, offsetFetchRequest.groupId))
+ if (!authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId))
offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
else {
if (header.apiVersion == 0) {
@@ -1270,10 +1275,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val findCoordinatorRequest = request.body[FindCoordinatorRequest]
if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
- !authorize(request, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
+ !authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
- !authorize(request, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
+ !authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
else {
// get metadata (and create the topic if necessary)
@@ -1342,7 +1347,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val describeGroupsResponseData = new DescribeGroupsResponseData()
describeRequest.data.groups.asScala.foreach { groupId =>
- if (!authorize(request, DESCRIBE, GROUP, groupId)) {
+ if (!authorize(request.context, DESCRIBE, GROUP, groupId)) {
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@@ -1379,7 +1384,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
val (error, groups) = groupCoordinator.handleListGroups()
- if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
+ if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
// With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
sendResponseMaybeThrottle(request, requestThrottleMs =>
new ListGroupsResponse(new ListGroupsResponseData()
@@ -1391,7 +1396,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setThrottleTimeMs(requestThrottleMs)
))
else {
- val filteredGroups = groups.filter(group => authorize(request, DESCRIBE, GROUP, group.groupId))
+ val filteredGroups = groups.filter(group => authorize(request.context, DESCRIBE, GROUP, group.groupId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(error.code)
@@ -1439,7 +1444,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
- } else if (!authorize(request, READ, GROUP, joinGroupRequest.data.groupId)) {
+ } else if (!authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)
@@ -1490,7 +1495,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} else if (!syncGroupRequest.areMandatoryProtocolTypeAndNamePresent()) {
// Starting from version 5, ProtocolType and ProtocolName fields are mandatory.
sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
- } else if (!authorize(request, READ, GROUP, syncGroupRequest.data.groupId)) {
+ } else if (!authorize(request.context, READ, GROUP, syncGroupRequest.data.groupId)) {
sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
@@ -1516,7 +1521,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groups = deleteGroupsRequest.data.groupsNames.asScala.toSet
val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
- authorize(request, DELETE, GROUP, group)
+ authorize(request.context, DELETE, GROUP, group)
}
val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
@@ -1560,7 +1565,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponseCallback(Errors.UNSUPPORTED_VERSION)
- } else if (!authorize(request, READ, GROUP, heartbeatRequest.data.groupId)) {
+ } else if (!authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new HeartbeatResponse(
new HeartbeatResponseData()
@@ -1582,7 +1587,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val members = leaveGroupRequest.members.asScala.toList
- if (!authorize(request, READ, GROUP, leaveGroupRequest.data.groupId)) {
+ if (!authorize(request.context, READ, GROUP, leaveGroupRequest.data.groupId)) {
sendResponseMaybeThrottle(request, requestThrottleMs => {
new LeaveGroupResponse(new LeaveGroupResponseData()
.setThrottleTimeMs(requestThrottleMs)
@@ -1673,11 +1678,14 @@ class KafkaApis(val requestChannel: RequestChannel,
createTopicsRequest.data.topics.asScala.foreach { topic =>
results.add(new CreatableTopicResult().setName(topic.name))
}
- val hasClusterAuthorization = authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)
+ val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
+ logIfDenied = false)
val topics = createTopicsRequest.data.topics.asScala.map(_.name)
- val authorizedTopics = if (hasClusterAuthorization) topics.toSet else filterAuthorized(request, CREATE, TOPIC, topics.toSeq)
- val authorizedForDescribeConfigs = filterAuthorized(request, DESCRIBE_CONFIGS, TOPIC, topics.toSeq, logIfDenied = false)
- .map(name => name -> results.find(name)).toMap
+ val authorizedTopics =
+ if (hasClusterAuthorization) topics.toSet
+ else filterAuthorized(request.context, CREATE, TOPIC, topics.toSeq)
+ val authorizedForDescribeConfigs = filterAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
+ topics.toSeq, logIfDenied = false).map(name => name -> results.find(name)).toMap
results.asScala.foreach(topic => {
if (results.findAll(topic.name).size > 1) {
@@ -1753,7 +1761,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.filter { _._2.size > 1 }
.keySet
val notDuped = topics.filterNot(topic => dupes.contains(topic.name))
- val authorizedTopics = filterAuthorized(request, ALTER, TOPIC, notDuped.map(_.name))
+ val authorizedTopics = filterAuthorized(request.context, ALTER, TOPIC, notDuped.map(_.name))
val (authorized, unauthorized) = notDuped.partition { topic => authorizedTopics.contains(topic.name) }
val (queuedForDeletion, valid) = authorized.partition { topic =>
@@ -1807,7 +1815,8 @@ class KafkaApis(val requestChannel: RequestChannel,
results.add(new DeletableTopicResult()
.setName(topic))
}
- val authorizedTopics = filterAuthorized(request, DELETE, TOPIC, results.asScala.toSeq.map(_.name))
+ val authorizedTopics = filterAuthorized(request.context, DELETE, TOPIC,
+ results.asScala.toSeq.map(_.name))
results.asScala.foreach(topic => {
if (!authorizedTopics.contains(topic.name))
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
@@ -1845,7 +1854,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicResponses = mutable.Map[TopicPartition, DeleteRecordsPartitionResult]()
val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
- val authorizedTopics = filterAuthorized(request, DELETE, TOPIC,
+ val authorizedTopics = filterAuthorized(request.context, DELETE, TOPIC,
deleteRecordsRequest.data.topics.asScala.map(_.name))
val deleteTopicPartitions = deleteRecordsRequest.data.topics.asScala.flatMap(deleteTopic => {
deleteTopic.partitions.asScala.map(deletePartition => {
@@ -1910,11 +1919,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val transactionalId = initProducerIdRequest.data.transactionalId
if (transactionalId != null) {
- if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) {
+ if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
- } else if (!authorize(request, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+ } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}
@@ -1951,7 +1960,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val endTxnRequest = request.body[EndTxnRequest]
val transactionalId = endTxnRequest.data.transactionalId
- if (authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) {
+ if (authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
def sendResponseCallback(error: Errors): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new EndTxnResponse(new EndTxnResponseData()
@@ -2093,7 +2102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
val transactionalId = addPartitionsToTxnRequest.transactionalId
val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
- if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId))
+ if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
else {
@@ -2101,7 +2110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
val authorizedPartitions = mutable.Set[TopicPartition]()
- val authorizedTopics = filterAuthorized(request, WRITE, TOPIC,
+ val authorizedTopics = filterAuthorized(request.context, WRITE, TOPIC,
partitionsToAdd.map(_.topic).filterNot(org.apache.kafka.common.internals.Topic.isInternal))
for (topicPartition <- partitionsToAdd) {
if (!authorizedTopics.contains(topicPartition.topic))
@@ -2148,10 +2157,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupId = addOffsetsToTxnRequest.consumerGroupId
val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
- if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId))
+ if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
- else if (!authorize(request, READ, GROUP, groupId))
+ else if (!authorize(request.context, READ, GROUP, groupId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
else {
@@ -2180,15 +2189,15 @@ class KafkaApis(val requestChannel: RequestChannel,
// authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
// since it is implied by transactionalId authorization
- if (!authorize(request, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId))
+ if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
- else if (!authorize(request, READ, GROUP, txnOffsetCommitRequest.data.groupId))
+ else if (!authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else {
val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
- val authorizedTopics = filterAuthorized(request, READ, TOPIC, txnOffsetCommitRequest.offsets.keySet.asScala.toSeq.map(_.topic))
+ val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, txnOffsetCommitRequest.offsets.keySet.asScala.toSeq.map(_.topic))
for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
if (!authorizedTopics.contains(topicPartition.topic))
@@ -2358,10 +2367,10 @@ class KafkaApis(val requestChannel: RequestChannel,
// The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
// cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
// following a leader change, so we also allow topic describe permission.
- val (authorizedPartitions, unauthorizedPartitions) = if (authorize(request, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
+ val (authorizedPartitions, unauthorizedPartitions) = if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
(requestInfo, Map.empty[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData])
} else {
- val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, requestInfo.keySet.toSeq.map(_.topic))
+ val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, requestInfo.keySet.toSeq.map(_.topic))
requestInfo.partition {
case (tp, _) => authorizedTopics.contains(tp.topic)
}
@@ -2385,9 +2394,9 @@ class KafkaApis(val requestChannel: RequestChannel,
case ConfigResource.Type.BROKER_LOGGER =>
throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
case ConfigResource.Type.BROKER =>
- authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+ authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
- authorize(request, ALTER_CONFIGS, TOPIC, resource.name)
+ authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
}
}
@@ -2505,9 +2514,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
resource.`type` match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
- authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+ authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
- authorize(request, ALTER_CONFIGS, TOPIC, resource.name)
+ authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
}
}
@@ -2526,9 +2535,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>
resource.`type` match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
- authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
+ authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
- authorize(request, DESCRIBE_CONFIGS, TOPIC, resource.name)
+ authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name)
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
}
}
@@ -2547,7 +2556,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit = {
val alterReplicaDirsRequest = request.body[AlterReplicaLogDirsRequest]
val responseMap = {
- if (authorize(request, ALTER, CLUSTER, CLUSTER_NAME))
+ if (authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME))
replicaManager.alterReplicaLogDirs(alterReplicaDirsRequest.partitionDirs.asScala)
else
alterReplicaDirsRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
@@ -2558,7 +2567,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit = {
val describeLogDirsDirRequest = request.body[DescribeLogDirsRequest]
val logDirInfos = {
- if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
+ if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
val partitions =
if (describeLogDirsDirRequest.isAllTopicPartitions)
replicaManager.logManager.allLogs.map(_.topicPartition).toSet
@@ -2691,7 +2700,7 @@ class KafkaApis(val requestChannel: RequestChannel,
None
else
Some(describeTokenRequest.data.owners.asScala.map(p => new KafkaPrincipal(p.principalType(), p.principalName)).toList)
- def authorizeToken(tokenId: String) = authorize(request, DESCRIBE, DELEGATION_TOKEN, tokenId)
+ def authorizeToken(tokenId: String) = authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId)
def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken)
val tokens = tokenManager.getTokens(eligible)
sendResponseCallback(Errors.NONE, tokens)
@@ -2758,7 +2767,7 @@ class KafkaApis(val requestChannel: RequestChannel,
})
}
- if (!authorize(request, ALTER, CLUSTER, CLUSTER_NAME)) {
+ if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
val partitionErrors: Map[TopicPartition, ApiError] =
electionRequest.topicPartitions.iterator.map(partition => partition -> error).toMap
@@ -2785,8 +2794,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetDeleteRequest = request.body[OffsetDeleteRequest]
val groupId = offsetDeleteRequest.data.groupId
- if (authorize(request, DELETE, GROUP, groupId)) {
- val authorizedTopics = filterAuthorized(request, READ, TOPIC,
+ if (authorize(request.context, DELETE, GROUP, groupId)) {
+ val authorizedTopics = filterAuthorized(request.context, READ, TOPIC,
offsetDeleteRequest.data.topics.asScala.map(_.name).toSeq)
val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
@@ -2842,7 +2851,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeClientQuotasRequest(request: RequestChannel.Request): Unit = {
val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest]
- if (authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+ if (authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter).map { case (quotaEntity, quotaConfigs) =>
quotaEntity -> quotaConfigs.map { case (key, value) => key -> Double.box(value) }.asJava
}.asJava
@@ -2857,7 +2866,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
- if (authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+ if (authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
val result = adminManager.alterClientQuotas(alterClientQuotasRequest.entries().asScala.toSeq,
alterClientQuotasRequest.validateOnly()).asJava
sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -2868,45 +2877,48 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- private def authorize(request: RequestChannel.Request,
- operation: AclOperation,
- resourceType: ResourceType,
- resourceName: String,
- logIfAllowed: Boolean = true,
- logIfDenied: Boolean = true,
- refCount: Int = 1): Boolean = {
+ // package private for testing
+ private[server] def authorize(requestContext: RequestContext,
+ operation: AclOperation,
+ resourceType: ResourceType,
+ resourceName: String,
+ logIfAllowed: Boolean = true,
+ logIfDenied: Boolean = true,
+ refCount: Int = 1): Boolean = {
authorizer.forall { authZ =>
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
- authZ.authorize(request.context, actions).asScala.head == AuthorizationResult.ALLOWED
+ authZ.authorize(requestContext, actions).asScala.head == AuthorizationResult.ALLOWED
}
}
- private def filterAuthorized(request: RequestChannel.Request,
- operation: AclOperation,
- resourceType: ResourceType,
- resourceNames: Seq[String],
- logIfAllowed: Boolean = true,
- logIfDenied: Boolean = true): Set[String] = {
+ // package private for testing
+ private[server] def filterAuthorized(requestContext: RequestContext,
+ operation: AclOperation,
+ resourceType: ResourceType,
+ resourceNames: Seq[String],
+ logIfAllowed: Boolean = true,
+ logIfDenied: Boolean = true): Set[String] = {
+ val uniqueResourceNames = resourceNames.distinct
authorizer match {
case Some(authZ) =>
val groupedResourceNames = resourceNames.groupBy(identity)
- val actions = resourceNames.map { resourceName =>
+ val actions = uniqueResourceNames.map { resourceName =>
val count = groupedResourceNames(resourceName).size
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied)
}
- authZ.authorize(request.context, actions.asJava).asScala
- .zip(resourceNames)
+ authZ.authorize(requestContext, actions.asJava).asScala
+ .zip(uniqueResourceNames)
.filter { case (authzResult, _) => authzResult == AuthorizationResult.ALLOWED }
.map { case (_, resourceName) => resourceName }.toSet
case None =>
- resourceNames.toSet
+ uniqueResourceNames.toSet
}
}
private def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Unit = {
- if (!authorize(request, operation, CLUSTER, CLUSTER_NAME))
+ if (!authorize(request.context, operation, CLUSTER, CLUSTER_NAME))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1b6c328..a4e9145 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -40,6 +40,7 @@ import kafka.network.RequestChannel.SendResponse
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
@@ -59,7 +60,12 @@ import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
+import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.Action
+import org.apache.kafka.server.authorizer.AuthorizationResult
import org.apache.kafka.server.authorizer.Authorizer
import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer}
@@ -82,7 +88,6 @@ class KafkaApisTest {
private val metrics = new Metrics()
private val brokerId = 1
private val metadataCache = new MetadataCache(brokerId)
- private val authorizer: Option[Authorizer] = None
private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
@@ -101,7 +106,8 @@ class KafkaApisTest {
metrics.close()
}
- def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): KafkaApis = {
+ def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
+ authorizer: Option[Authorizer] = None): KafkaApis = {
val properties = TestUtils.createBrokerConfig(brokerId, "zk")
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
@@ -127,6 +133,93 @@ class KafkaApisTest {
}
@Test
+ def testAuthorize(): Unit = {
+ val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+ val operation = AclOperation.WRITE
+ val resourceType = ResourceType.TOPIC
+ val resourceName = "topic-1"
+ val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
+ clientId, 0)
+ val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+ KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
+
+ val expectedActions = Seq(
+ new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
+ 1, true, true)
+ )
+
+ EasyMock.expect(authorizer.authorize(
+ requestContext, expectedActions.asJava
+ )).andReturn(
+ Seq(AuthorizationResult.ALLOWED).asJava
+ ).once()
+
+ EasyMock.replay(authorizer)
+
+ val result = createKafkaApis(authorizer = Some(authorizer)).authorize(
+ requestContext,
+ operation,
+ resourceType,
+ resourceName,
+ )
+
+ verify(authorizer)
+
+ assertEquals(true, result)
+ }
+
+ @Test
+ def testFilterAuthorized(): Unit = {
+ val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+ val operation = AclOperation.WRITE
+ val resourceType = ResourceType.TOPIC
+ val resourceName1 = "topic-1"
+ val resourceName2 = "topic-2"
+ val resourceName3 = "topic-3"
+ val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
+ clientId, 0)
+ val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+ KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
+
+ val expectedActions = Seq(
+ new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
+ 2, true, true),
+ new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL),
+ 1, true, true),
+ new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL),
+ 1, true, true),
+ )
+
+ EasyMock.expect(authorizer.authorize(
+ requestContext, expectedActions.asJava
+ )).andReturn(
+ Seq(
+ AuthorizationResult.ALLOWED,
+ AuthorizationResult.DENIED,
+ AuthorizationResult.ALLOWED
+ ).asJava
+ ).once()
+
+ EasyMock.replay(authorizer)
+
+ val result = createKafkaApis(authorizer = Some(authorizer)).filterAuthorized(
+ requestContext,
+ operation,
+ resourceType,
+ // Duplicate resource names should not trigger multiple calls to authorize
+ Seq(resourceName1, resourceName2, resourceName1, resourceName3)
+ )
+
+ verify(authorizer)
+
+ assertEquals(Set(resourceName1, resourceName3), result)
+ }
+
+ @Test
def testOffsetCommitWithInvalidPartition(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 1)