You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/04/07 13:27:08 UTC

[kafka] branch trunk updated: MINOR: Pass one action per unique resource name in KafkaApis.filterAuthorized (#8432)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cd1e46c  MINOR: Pass one action per unique resource name in KafkaApis.filterAuthorized (#8432)
cd1e46c is described below

commit cd1e46c8bb46f1e5303c51f476c74e33b522fce8
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Apr 7 15:26:18 2020 +0200

    MINOR: Pass one action per unique resource name in KafkaApis.filterAuthorized (#8432)
    
    90bbeedf52f introduced a regression: `KafkaApis.filterAuthorized` passed one action
    per resource name occurrence (duplicates included) to the `Authorizer` instead of
    one action per unique resource name. Fix this and refactor the signatures of both
    `filterAuthorized` and `authorize` to make them easier to test, adding a test for each.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 166 +++++++++++----------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  97 +++++++++++-
 2 files changed, 184 insertions(+), 79 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a7607a..4a6082e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -352,7 +352,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     // reject the request if not authorized to the group
-    if (!authorize(request, READ, GROUP, offsetCommitRequest.data.groupId)) {
+    if (!authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
       val error = Errors.GROUP_AUTHORIZATION_FAILED
       val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
         offsetCommitRequest.data.topics,
@@ -378,7 +378,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
 
-      val authorizedTopics = filterAuthorized(request, READ, TOPIC, offsetCommitRequest.data.topics.asScala.map(_.name))
+      val authorizedTopics = filterAuthorized(request.context, READ, TOPIC,
+        offsetCommitRequest.data.topics.asScala.map(_.name))
       for (topicData <- offsetCommitRequest.data.topics.asScala) {
         for (partitionData <- topicData.partitions.asScala) {
           val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
@@ -472,14 +473,14 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (produceRequest.hasTransactionalRecords) {
       val isAuthorizedTransactional = produceRequest.transactionalId != null &&
-        authorize(request, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
+        authorize(request.context, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
       if (!isAuthorizedTransactional) {
         sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
       // Note that authorization to a transactionalId implies ProducerId authorization
 
-    } else if (produceRequest.hasIdempotentRecords && !authorize(request, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
@@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
-    val authorizedTopics = filterAuthorized(request, WRITE, TOPIC,
+    val authorizedTopics = filterAuthorized(request.context, WRITE, TOPIC,
       produceRequest.partitionRecordsOrFail.asScala.toSeq.map(_._1.topic))
 
     for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
@@ -625,7 +626,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
-      if (authorize(request, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+      if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
         fetchContext.foreachPartition { (topicPartition, data) =>
           if (!metadataCache.contains(topicPartition))
             erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -641,7 +642,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // Regular Kafka consumers need READ permission on each partition they are fetching.
       val fetchTopics = new mutable.ArrayBuffer[String]
       fetchContext.foreachPartition { (topicPartition, _) => fetchTopics += topicPartition.topic }
-      val authorizedTopics = filterAuthorized(request, READ, TOPIC, fetchTopics)
+      val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, fetchTopics)
       fetchContext.foreachPartition { (topicPartition, data) =>
         if (!authorizedTopics.contains(topicPartition.topic))
           erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
@@ -887,7 +888,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
+    val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC,
+      offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
     val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
       case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
     }
@@ -927,7 +929,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
+    val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC,
+      offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic))
     val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
       case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
     }
@@ -1111,15 +1114,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     else
       metadataRequest.topics.asScala.toSet
 
-    val authorizedForDescribeTopics = filterAuthorized(request, DESCRIBE, TOPIC, topics.toSeq, logIfDenied = !metadataRequest.isAllTopics)
+    val authorizedForDescribeTopics = filterAuthorized(request.context, DESCRIBE, TOPIC,
+      topics.toSeq, logIfDenied = !metadataRequest.isAllTopics)
     var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains)
     var unauthorizedForCreateTopics = Set[String]()
 
     if (authorizedTopics.nonEmpty) {
       val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
       if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
-        if (!authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
-          val authorizedForCreateTopics = filterAuthorized(request, CREATE, TOPIC, nonExistingTopics.toSeq)
+        if (!authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
+          val authorizedForCreateTopics = filterAuthorized(request.context, CREATE, TOPIC,
+            nonExistingTopics.toSeq)
           unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
           authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
         }
@@ -1156,7 +1161,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (request.header.apiVersion >= 8) {
       // get cluster authorized operations
       if (metadataRequest.data.includeClusterAuthorizedOperations) {
-        if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
+        if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
           clusterAuthorizedOperations = authorizedOperations(request, Resource.CLUSTER)
         else
           clusterAuthorizedOperations = 0
@@ -1196,14 +1201,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetFetchRequest = request.body[OffsetFetchRequest]
 
     def partitionAuthorized[T](elements: List[T], topic: T => String): (Seq[T], Seq[T]) = {
-      val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, elements.map(topic))
+      val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, elements.map(topic))
       elements.partition(element => authorizedTopics.contains(topic.apply(element)))
     }
 
     def createResponse(requestThrottleMs: Int): AbstractResponse = {
       val offsetFetchResponse =
         // reject the request if not authorized to the group
-        if (!authorize(request, DESCRIBE, GROUP, offsetFetchRequest.groupId))
+        if (!authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId))
           offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
         else {
           if (header.apiVersion == 0) {
@@ -1270,10 +1275,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
     if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
-        !authorize(request, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
+        !authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
-        !authorize(request, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
+        !authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
       // get metadata (and create the topic if necessary)
@@ -1342,7 +1347,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val describeGroupsResponseData = new DescribeGroupsResponseData()
 
     describeRequest.data.groups.asScala.foreach { groupId =>
-      if (!authorize(request, DESCRIBE, GROUP, groupId)) {
+      if (!authorize(request.context, DESCRIBE, GROUP, groupId)) {
         describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
       } else {
         val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@@ -1379,7 +1384,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
     val (error, groups) = groupCoordinator.handleListGroups()
-    if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
+    if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
       // With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new ListGroupsResponse(new ListGroupsResponseData()
@@ -1391,7 +1396,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setThrottleTimeMs(requestThrottleMs)
         ))
     else {
-      val filteredGroups = groups.filter(group => authorize(request, DESCRIBE, GROUP, group.groupId))
+      val filteredGroups = groups.filter(group => authorize(request.context, DESCRIBE, GROUP, group.groupId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new ListGroupsResponse(new ListGroupsResponseData()
           .setErrorCode(error.code)
@@ -1439,7 +1444,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
       sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
-    } else if (!authorize(request, READ, GROUP, joinGroupRequest.data.groupId)) {
+    } else if (!authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
       sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
     } else {
       val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)
@@ -1490,7 +1495,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else if (!syncGroupRequest.areMandatoryProtocolTypeAndNamePresent()) {
       // Starting from version 5, ProtocolType and ProtocolName fields are mandatory.
       sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
-    } else if (!authorize(request, READ, GROUP, syncGroupRequest.data.groupId)) {
+    } else if (!authorize(request.context, READ, GROUP, syncGroupRequest.data.groupId)) {
       sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED))
     } else {
       val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
@@ -1516,7 +1521,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groups = deleteGroupsRequest.data.groupsNames.asScala.toSet
 
     val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
-      authorize(request, DELETE, GROUP, group)
+      authorize(request.context, DELETE, GROUP, group)
     }
 
     val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
@@ -1560,7 +1565,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
       sendResponseCallback(Errors.UNSUPPORTED_VERSION)
-    } else if (!authorize(request, READ, GROUP, heartbeatRequest.data.groupId)) {
+    } else if (!authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new HeartbeatResponse(
             new HeartbeatResponseData()
@@ -1582,7 +1587,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val members = leaveGroupRequest.members.asScala.toList
 
-    if (!authorize(request, READ, GROUP, leaveGroupRequest.data.groupId)) {
+    if (!authorize(request.context, READ, GROUP, leaveGroupRequest.data.groupId)) {
       sendResponseMaybeThrottle(request, requestThrottleMs => {
         new LeaveGroupResponse(new LeaveGroupResponseData()
           .setThrottleTimeMs(requestThrottleMs)
@@ -1673,11 +1678,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       createTopicsRequest.data.topics.asScala.foreach { topic =>
         results.add(new CreatableTopicResult().setName(topic.name))
       }
-      val hasClusterAuthorization = authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)
+      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
+        logIfDenied = false)
       val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics = if (hasClusterAuthorization) topics.toSet else filterAuthorized(request, CREATE, TOPIC, topics.toSeq)
-      val authorizedForDescribeConfigs = filterAuthorized(request, DESCRIBE_CONFIGS, TOPIC, topics.toSeq, logIfDenied = false)
-        .map(name => name -> results.find(name)).toMap
+      val authorizedTopics =
+        if (hasClusterAuthorization) topics.toSet
+        else filterAuthorized(request.context, CREATE, TOPIC, topics.toSeq)
+      val authorizedForDescribeConfigs = filterAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
+        topics.toSeq, logIfDenied = false).map(name => name -> results.find(name)).toMap
 
       results.asScala.foreach(topic => {
         if (results.findAll(topic.name).size > 1) {
@@ -1753,7 +1761,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         .filter { _._2.size > 1 }
         .keySet
       val notDuped = topics.filterNot(topic => dupes.contains(topic.name))
-      val authorizedTopics = filterAuthorized(request, ALTER, TOPIC, notDuped.map(_.name))
+      val authorizedTopics = filterAuthorized(request.context, ALTER, TOPIC, notDuped.map(_.name))
       val (authorized, unauthorized) = notDuped.partition { topic => authorizedTopics.contains(topic.name) }
 
       val (queuedForDeletion, valid) = authorized.partition { topic =>
@@ -1807,7 +1815,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         results.add(new DeletableTopicResult()
           .setName(topic))
       }
-      val authorizedTopics = filterAuthorized(request, DELETE, TOPIC, results.asScala.toSeq.map(_.name))
+      val authorizedTopics = filterAuthorized(request.context, DELETE, TOPIC,
+        results.asScala.toSeq.map(_.name))
       results.asScala.foreach(topic => {
          if (!authorizedTopics.contains(topic.name))
            topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
@@ -1845,7 +1854,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val nonExistingTopicResponses = mutable.Map[TopicPartition, DeleteRecordsPartitionResult]()
     val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
 
-    val authorizedTopics = filterAuthorized(request, DELETE, TOPIC,
+    val authorizedTopics = filterAuthorized(request.context, DELETE, TOPIC,
       deleteRecordsRequest.data.topics.asScala.map(_.name))
     val deleteTopicPartitions = deleteRecordsRequest.data.topics.asScala.flatMap(deleteTopic => {
       deleteTopic.partitions.asScala.map(deletePartition => {
@@ -1910,11 +1919,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     val transactionalId = initProducerIdRequest.data.transactionalId
 
     if (transactionalId != null) {
-      if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) {
+      if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
         sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
-    } else if (!authorize(request, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
@@ -1951,7 +1960,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val endTxnRequest = request.body[EndTxnRequest]
     val transactionalId = endTxnRequest.data.transactionalId
 
-    if (authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) {
+    if (authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
       def sendResponseCallback(error: Errors): Unit = {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
           val responseBody = new EndTxnResponse(new EndTxnResponseData()
@@ -2093,7 +2102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId))
+    if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
     else {
@@ -2101,7 +2110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
       val authorizedPartitions = mutable.Set[TopicPartition]()
 
-      val authorizedTopics = filterAuthorized(request, WRITE, TOPIC,
+      val authorizedTopics = filterAuthorized(request.context, WRITE, TOPIC,
         partitionsToAdd.map(_.topic).filterNot(org.apache.kafka.common.internals.Topic.isInternal))
       for (topicPartition <- partitionsToAdd) {
         if (!authorizedTopics.contains(topicPartition.topic))
@@ -2148,10 +2157,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupId = addOffsetsToTxnRequest.consumerGroupId
     val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
-    if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId))
+    if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
-    else if (!authorize(request, READ, GROUP, groupId))
+    else if (!authorize(request.context, READ, GROUP, groupId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     else {
@@ -2180,15 +2189,15 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
     // since it is implied by transactionalId authorization
-    if (!authorize(request, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId))
+    if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId))
       sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
-    else if (!authorize(request, READ, GROUP, txnOffsetCommitRequest.data.groupId))
+    else if (!authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else {
       val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
       val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
       val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
-      val authorizedTopics = filterAuthorized(request, READ, TOPIC, txnOffsetCommitRequest.offsets.keySet.asScala.toSeq.map(_.topic))
+      val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, txnOffsetCommitRequest.offsets.keySet.asScala.toSeq.map(_.topic))
 
       for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
         if (!authorizedTopics.contains(topicPartition.topic))
@@ -2358,10 +2367,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
     // cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
     // following a leader change, so we also allow topic describe permission.
-    val (authorizedPartitions, unauthorizedPartitions) = if (authorize(request, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
+    val (authorizedPartitions, unauthorizedPartitions) = if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
       (requestInfo, Map.empty[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData])
     } else {
-      val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, requestInfo.keySet.toSeq.map(_.topic))
+      val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, requestInfo.keySet.toSeq.map(_.topic))
       requestInfo.partition {
         case (tp, _) => authorizedTopics.contains(tp.topic)
       }
@@ -2385,9 +2394,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ConfigResource.Type.BROKER_LOGGER =>
           throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
-          authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+          authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authorize(request, ALTER_CONFIGS, TOPIC, resource.name)
+          authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
@@ -2505,9 +2514,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-          authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+          authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authorize(request, ALTER_CONFIGS, TOPIC, resource.name)
+          authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
@@ -2526,9 +2535,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>
       resource.`type` match {
         case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-          authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
+          authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authorize(request, DESCRIBE_CONFIGS, TOPIC, resource.name)
+          authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
       }
     }
@@ -2547,7 +2556,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit = {
     val alterReplicaDirsRequest = request.body[AlterReplicaLogDirsRequest]
     val responseMap = {
-      if (authorize(request, ALTER, CLUSTER, CLUSTER_NAME))
+      if (authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME))
         replicaManager.alterReplicaLogDirs(alterReplicaDirsRequest.partitionDirs.asScala)
       else
         alterReplicaDirsRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
@@ -2558,7 +2567,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit = {
     val describeLogDirsDirRequest = request.body[DescribeLogDirsRequest]
     val logDirInfos = {
-      if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
+      if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
         val partitions =
           if (describeLogDirsDirRequest.isAllTopicPartitions)
             replicaManager.logManager.allLogs.map(_.topicPartition).toSet
@@ -2691,7 +2700,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           None
         else
           Some(describeTokenRequest.data.owners.asScala.map(p => new KafkaPrincipal(p.principalType(), p.principalName)).toList)
-        def authorizeToken(tokenId: String) = authorize(request, DESCRIBE, DELEGATION_TOKEN, tokenId)
+        def authorizeToken(tokenId: String) = authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId)
         def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken)
         val tokens =  tokenManager.getTokens(eligible)
         sendResponseCallback(Errors.NONE, tokens)
@@ -2758,7 +2767,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       })
     }
 
-    if (!authorize(request, ALTER, CLUSTER, CLUSTER_NAME)) {
+    if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
       val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
       val partitionErrors: Map[TopicPartition, ApiError] =
         electionRequest.topicPartitions.iterator.map(partition => partition -> error).toMap
@@ -2785,8 +2794,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetDeleteRequest = request.body[OffsetDeleteRequest]
     val groupId = offsetDeleteRequest.data.groupId
 
-    if (authorize(request, DELETE, GROUP, groupId)) {
-      val authorizedTopics = filterAuthorized(request, READ, TOPIC,
+    if (authorize(request.context, DELETE, GROUP, groupId)) {
+      val authorizedTopics = filterAuthorized(request.context, READ, TOPIC,
         offsetDeleteRequest.data.topics.asScala.map(_.name).toSeq)
 
       val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
@@ -2842,7 +2851,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDescribeClientQuotasRequest(request: RequestChannel.Request): Unit = {
     val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest]
 
-    if (authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+    if (authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
       val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter).map { case (quotaEntity, quotaConfigs) =>
         quotaEntity -> quotaConfigs.map { case (key, value) => key -> Double.box(value) }.asJava
       }.asJava
@@ -2857,7 +2866,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
     val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
 
-    if (authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+    if (authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
       val result = adminManager.alterClientQuotas(alterClientQuotasRequest.entries().asScala.toSeq,
         alterClientQuotasRequest.validateOnly()).asJava
       sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -2868,45 +2877,48 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def authorize(request: RequestChannel.Request,
-                        operation: AclOperation,
-                        resourceType: ResourceType,
-                        resourceName: String,
-                        logIfAllowed: Boolean = true,
-                        logIfDenied: Boolean = true,
-                        refCount: Int = 1): Boolean = {
+  // package-private for testing
+  private[server] def authorize(requestContext: RequestContext,
+                                operation: AclOperation,
+                                resourceType: ResourceType,
+                                resourceName: String,
+                                logIfAllowed: Boolean = true,
+                                logIfDenied: Boolean = true,
+                                refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
       val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
       val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(request.context, actions).asScala.head == AuthorizationResult.ALLOWED
+      authZ.authorize(requestContext, actions).asScala.head == AuthorizationResult.ALLOWED
     }
   }
 
-  private def filterAuthorized(request: RequestChannel.Request,
-                               operation: AclOperation,
-                               resourceType: ResourceType,
-                               resourceNames: Seq[String],
-                               logIfAllowed: Boolean = true,
-                               logIfDenied: Boolean = true): Set[String] = {
+  // package-private for testing
+  private[server] def filterAuthorized(requestContext: RequestContext,
+                                       operation: AclOperation,
+                                       resourceType: ResourceType,
+                                       resourceNames: Seq[String],
+                                       logIfAllowed: Boolean = true,
+                                       logIfDenied: Boolean = true): Set[String] = {
+    val uniqueResourceNames = resourceNames.distinct
     authorizer match {
       case Some(authZ) =>
         val groupedResourceNames = resourceNames.groupBy(identity)
-        val actions = resourceNames.map { resourceName =>
+        val actions = uniqueResourceNames.map { resourceName =>
           val count = groupedResourceNames(resourceName).size
           val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
           new Action(operation, resource, count, logIfAllowed, logIfDenied)
         }
-        authZ.authorize(request.context, actions.asJava).asScala
-          .zip(resourceNames)
+        authZ.authorize(requestContext, actions.asJava).asScala
+          .zip(uniqueResourceNames)
           .filter { case (authzResult, _) => authzResult == AuthorizationResult.ALLOWED }
           .map { case (_, resourceName) => resourceName }.toSet
       case None =>
-        resourceNames.toSet
+        uniqueResourceNames.toSet
     }
   }
 
   private def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Unit = {
-    if (!authorize(request, operation, CLUSTER, CLUSTER_NAME))
+    if (!authorize(request.context, operation, CLUSTER, CLUSTER_NAME))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1b6c328..a4e9145 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -40,6 +40,7 @@ import kafka.network.RequestChannel.SendResponse
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.acl.AclOperation
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
@@ -59,7 +60,12 @@ import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
 import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
+import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.Action
+import org.apache.kafka.server.authorizer.AuthorizationResult
 import org.apache.kafka.server.authorizer.Authorizer
 import org.easymock.EasyMock._
 import org.easymock.{Capture, EasyMock, IAnswer}
@@ -82,7 +88,6 @@ class KafkaApisTest {
   private val metrics = new Metrics()
   private val brokerId = 1
   private val metadataCache = new MetadataCache(brokerId)
-  private val authorizer: Option[Authorizer] = None
   private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
   private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
   private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
@@ -101,7 +106,8 @@ class KafkaApisTest {
     metrics.close()
   }
 
-  def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): KafkaApis = {
+  def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
+                      authorizer: Option[Authorizer] = None): KafkaApis = {
     val properties = TestUtils.createBrokerConfig(brokerId, "zk")
     properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
     properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
@@ -127,6 +133,93 @@ class KafkaApisTest {
   }
 
   @Test
+  def testAuthorize(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val operation = AclOperation.WRITE
+    val resourceType = ResourceType.TOPIC
+    val resourceName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
+      clientId, 0)
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
+
+    val expectedActions = Seq(
+      new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
+        1, true, true)
+    )
+
+    EasyMock.expect(authorizer.authorize(
+      requestContext, expectedActions.asJava
+    )).andReturn(
+      Seq(AuthorizationResult.ALLOWED).asJava
+    ).once()
+
+    EasyMock.replay(authorizer)
+
+    val result = createKafkaApis(authorizer = Some(authorizer)).authorize(
+      requestContext,
+      operation,
+      resourceType,
+      resourceName,
+    )
+
+    verify(authorizer)
+
+    assertEquals(true, result)
+  }
+
+  @Test
+  def testFilterAuthorized(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val operation = AclOperation.WRITE
+    val resourceType = ResourceType.TOPIC
+    val resourceName1 = "topic-1"
+    val resourceName2 = "topic-2"
+    val resourceName3 = "topic-3"
+    val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
+      clientId, 0)
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
+
+    val expectedActions = Seq(
+      new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
+        2, true, true),
+      new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL),
+        1, true, true),
+      new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL),
+        1, true, true),
+    )
+
+    EasyMock.expect(authorizer.authorize(
+      requestContext, expectedActions.asJava
+    )).andReturn(
+      Seq(
+        AuthorizationResult.ALLOWED,
+        AuthorizationResult.DENIED,
+        AuthorizationResult.ALLOWED
+      ).asJava
+    ).once()
+
+    EasyMock.replay(authorizer)
+
+    val result = createKafkaApis(authorizer = Some(authorizer)).filterAuthorized(
+      requestContext,
+      operation,
+      resourceType,
+      // Duplicate resource names must yield one action per unique name, not one action per occurrence
+      Seq(resourceName1, resourceName2, resourceName1, resourceName3)
+    )
+
+    verify(authorizer)
+
+    assertEquals(Set(resourceName1, resourceName3), result)
+  }
+
+  @Test
   def testOffsetCommitWithInvalidPartition(): Unit = {
     val topic = "topic"
     setupBasicMetadataCache(topic, numPartitions = 1)