You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/01 21:18:43 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533510477



##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType eq ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType eq ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op eq AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op eq AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")

Review comment:
       We should probably move this common code to SecurityUtils and use it both here and in the default implementation.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType eq ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType eq ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op eq AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op eq AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+
+    val denyPatterns = matchingPatterns(
+      principal,
+      requestContext.clientAddress().getHostAddress,
+      op,
+      resourceType,
+      AclPermissionType.DENY
+    )
+
+    if (denyAll(denyPatterns)) {
+      logAuditMessage(requestContext, new Action(op, null,0, true, true), false, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, new Action(op, null, 0, true, true), true, false)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowPatterns = matchingPatterns(
+      principal,
+      requestContext.clientAddress().getHostAddress,
+      op,
+      resourceType,
+      AclPermissionType.ALLOW
+    )
+
+    if (allowAny(allowPatterns, denyPatterns)) {
+      logAuditMessage(requestContext, new Action(op,null, 0, true, true), true, false)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, new Action(op, null, 0, true, true), false, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingPatterns(principal: String, host: String, op: AclOperation,
+                       resourceType: ResourceType,
+                       permission: AclPermissionType): Set[ResourcePattern] = {
+    var resources = Set[ResourcePattern]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val ace = new AccessControlEntry(p, h, o, permission)
+          resourceCache.get(ace) match {
+            case Some(r) => resources ++= r.filter(r => r.resourceType() == resourceType)
+            case None =>
+          }
+        }
+      }
+    }
+    resources
+  }
+
+  private def denyAll(denyResources: Set[ResourcePattern]): Boolean =
+    denyResources.exists(rp => denyAll(rp))
+
+  private def denyAll(rp: ResourcePattern): Boolean =
+    rp.patternType() == PatternType.LITERAL && rp.name() == ResourcePattern.WILDCARD_RESOURCE
+
+  private def allowAny(allowPatterns: Set[ResourcePattern], denyPatterns: Set[ResourcePattern]): Boolean =
+    allowPatterns.exists(pattern => allow(pattern, denyPatterns))
+
+  private def allow(pattern: ResourcePattern, denyPatterns: Set[ResourcePattern]): Boolean = {

Review comment:
       ok, I seem to have forgotten this. Why is this code different from the one in the default implementation?

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,125 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * @param requestContext Request context including request resourceType, security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+     *                       given ACL operation on at least one resource of the given type.
+     *                       Return {@link AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+        if (resourceType == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for authorizeByResourceType");
+        }
+
+        if (resourceType == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        final int typeLiteral = 0;
+        final int typePrefix = 1;
+
+        List<Set<String>> deny = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+        List<Set<String>> allow = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+        boolean hasWildCardAllow = false;
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+                    && !binding.entry().host().equals("*"))
+                continue;
+
+            KafkaPrincipal principal = new KafkaPrincipal(
+                requestContext.principal().getPrincipalType(),
+                requestContext.principal().getName());

Review comment:
       Why is this inside the for loop? We could just create one principal and use it inside the loop.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,125 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * @param requestContext Request context including request resourceType, security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+     *                       given ACL operation on at least one resource of the given type.
+     *                       Return {@link AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+        if (resourceType == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for authorizeByResourceType");
+        }
+
+        if (resourceType == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        final int typeLiteral = 0;
+        final int typePrefix = 1;
+
+        List<Set<String>> deny = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+        List<Set<String>> allow = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+        boolean hasWildCardAllow = false;
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())

Review comment:
       We could get host address and store in a variable outside the loop.

##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-    @Param({"10000", "50000", "200000"})
+    @Param({"10000", "40000", "80000"})

Review comment:
       I am not sure why we would make this change. If we need the change because we have become slower, we need to understand why.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2109,7 +2104,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
-    } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME, true, false)
+        || !authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) {

Review comment:
       Should this be `&&`, since we only need one of the two checks to pass?

##########
File path: core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.{lang, util}
+import java.util.concurrent.CompletionStage
+
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
+import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, AuthorizerServerInfo}
+
+object MockAuthorizer {
+    val authorizer = new AclAuthorizer
+}
+
+/**
+ * A mock authorizer for testing the interface default
+ */
+class MockAuthorizer extends Authorizer {

Review comment:
       This should perhaps be called DelegatingAuthorizer rather than MockAuthorizer since it is not a mock and requires ZK.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,125 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * @param requestContext Request context including request resourceType, security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+     *                       given ACL operation on at least one resource of the given type.
+     *                       Return {@link AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+        if (resourceType == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for authorizeByResourceType");
+        }
+
+        if (resourceType == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        final int typeLiteral = 0;
+        final int typePrefix = 1;
+
+        List<Set<String>> deny = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));

Review comment:
       Why do we create ArrayList(Arrays.asList)?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -456,8 +576,12 @@ class AclAuthorizer extends Authorizer with Logging {
       val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
       val refCount = action.resourceReferenceCount
 
-      s"Principal = $principal is $authResult Operation = $operation " +
-        s"from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
+      if (byResourceType)
+        s"Principal = $principal is $authResult Operation = $operation " +
+          s"from host = $host on at least one resource of type $resourceType for request = $apiKey"

Review comment:
       We should try to preserve the format for this for compatibility with scripts that parse these logs.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -550,6 +660,31 @@ class AclAuthorizer extends Authorizer with Logging {
   }
 
   private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
+    val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match {

Review comment:
       Do we have a benchmark for updates (not authorize)?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -130,6 +130,11 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
+
+  @volatile
+  private var resourceCache = new scala.collection.immutable.HashMap[AccessControlEntry,
+    scala.collection.mutable.HashSet[ResourcePattern]]()

Review comment:
       I wasn't sure what the result shows (not that familiar with the output format, sorry). The useful comparisons would be:
   1) For authorizeByResourceType, what is the performance advantage we get by using this duplicate cache versus just using `aclCache`?
   2) What is the impact on updates which hold a lock for maintaining two caches (without the PR vs with this PR)?
   3) Does this PR impact regular authorize() calls? I think the answer is no.
   
   In any case, it seems unnecessary to maintain a second cache with all ACLs. We never use authorizeByResourceType for anything other than topics, so it seems a waste to store ACLs for other resource types here. We could just use `super.authorizeByResourceType` for other types.

##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -115,45 +122,62 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex
 
             Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
 
-            for (int aclId = 0; aclId < aclCount; aclId++) {
-                AccessControlEntry ace = new AccessControlEntry(principal.toString() + aclId,
-                    "*", AclOperation.READ, AclPermissionType.ALLOW);
-                entries.add(new AclEntry(ace));
+            for (int aclId = 0; aclId < aclCount / 2; aclId++) {
+                String acePrinciple = principal.toString() + (aclId == 0 ? "" : aclId);
+                AccessControlEntry allowAce = new AccessControlEntry(
+                    acePrinciple,

Review comment:
       spelling: principal

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,125 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * @param requestContext Request context including request resourceType, security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+     *                       given ACL operation on at least one resource of the given type.
+     *                       Return {@link AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+        if (resourceType == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for authorizeByResourceType");
+        }
+
+        if (resourceType == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        final int typeLiteral = 0;
+        final int typePrefix = 1;

Review comment:
       This looks odd, do we really need these to index into arrays?

##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -115,45 +122,62 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex
 
             Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
 
-            for (int aclId = 0; aclId < aclCount; aclId++) {
-                AccessControlEntry ace = new AccessControlEntry(principal.toString() + aclId,
-                    "*", AclOperation.READ, AclPermissionType.ALLOW);
-                entries.add(new AclEntry(ace));
+            for (int aclId = 0; aclId < aclCount / 2; aclId++) {
+                String acePrinciple = principal.toString() + (aclId == 0 ? "" : aclId);
+                AccessControlEntry allowAce = new AccessControlEntry(
+                    acePrinciple,
+                    "*", AclOperation.WRITE, AclPermissionType.ALLOW);
+                AccessControlEntry denyAce = new AccessControlEntry(
+                    acePrinciple,
+                    "*", AclOperation.WRITE, AclPermissionType.DENY);

Review comment:
       We probably want to retain the old benchmark as-is and add a different one for `authorizeByResourceType`. We were testing a common pattern before, but now we seem to be testing a very unlikely scenario. While this may be useful for testing `authorizeByResourceType`, it is not what we want for regression testing the authorizer.

##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -175,4 +179,73 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType == ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType == ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op == AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op == AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")
+
+    if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, op, resourceType)) {
+      AuthorizationResult.ALLOWED
+    } else {
+      super.authorizeByResourceType(requestContext, op, resourceType)
+    }
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+                      op: AclOperation,
+                      resourceType: ResourceType): Boolean = {
+    val resourceTypeFilter = new ResourcePatternFilter(
+      resourceType, null, PatternType.ANY)
+    val accessControlEntry = new AccessControlEntryFilter(
+      null, null, null, AclPermissionType.DENY)
+    val aclFilter = new AclBindingFilter(resourceTypeFilter, accessControlEntry)
+
+    for (binding <- acls(aclFilter).asScala) {
+      if (aceMatched(requestContext, op, binding) && canDenyAll(binding.pattern()))
+        return true
+    }
+    false
+  }
+
+  @inline
+  private def aceMatched(requestContext: AuthorizableRequestContext,
+                 op: AclOperation,
+                 binding: AclBinding): Boolean = {
+    (hostMatched(requestContext, binding) && principleMatched(requestContext, binding)
+      && operationMatched(op, binding))
+  }
+
+  @inline
+  private def hostMatched(requestContext: AuthorizableRequestContext,
+                  binding: AclBinding): Boolean =
+    (binding.entry().host().equals(requestContext.clientAddress().getHostAddress)
+      || binding.entry().host().equals(AclEntry.WildcardHost))
+
+  @inline
+  private def principleMatched(requestContext: AuthorizableRequestContext,

Review comment:
       spelling: principal (multiple places)

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,125 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * @param requestContext Request context including request resourceType, security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+     *                       given ACL operation on at least one resource of the given type.
+     *                       Return {@link AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+        if (resourceType == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for authorizeByResourceType");
+        }
+
+        if (resourceType == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        final int typeLiteral = 0;
+        final int typePrefix = 1;
+
+        List<Set<String>> deny = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+        List<Set<String>> allow = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+        boolean hasWildCardAllow = false;
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+                    && !binding.entry().host().equals("*"))
+                continue;
+
+            KafkaPrincipal principal = new KafkaPrincipal(
+                requestContext.principal().getPrincipalType(),
+                requestContext.principal().getName());
+
+            if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+                    && !binding.entry().principal().equals("User:*"))
+                continue;
+
+            if (binding.entry().operation() != op
+                    && binding.entry().operation() != AclOperation.ALL)
+                continue;
+
+            if (binding.entry().permissionType() == AclPermissionType.DENY) {
+                switch (binding.pattern().patternType()) {
+                    case LITERAL:
+                        if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+                            return AuthorizationResult.DENIED;
+                        deny.get(typeLiteral).add(binding.pattern().name());
+                        break;
+                    case PREFIXED:
+                        deny.get(typePrefix).add(binding.pattern().name());
+                        break;
+                }
+                continue;
+            }
+
+            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+                continue;
+
+            switch (binding.pattern().patternType()) {
+                case LITERAL:
+                    if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+                        hasWildCardAllow = true;
+                        continue;
+                    }
+                    allow.get(typeLiteral).add(binding.pattern().name());
+                    break;
+                case PREFIXED:
+                    allow.get(typePrefix).add(binding.pattern().name());
+                    break;
+            }
+        }
+
+        if (hasWildCardAllow) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        for (int allowType : Arrays.asList(typePrefix, typeLiteral)) {

Review comment:
       An EnumMap may be neater.

##########
File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##########
@@ -82,16 +86,17 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   override def setUp(): Unit = {
     super.setUp()
 
+    val authorizers = Seq(aclAuthorizer, aclAuthorizer2, interfaceDefaultAuthorizer.authorizer)

Review comment:
       Can we move testing of `interfaceDefaultAuthorizer.authorizer` into another class? This is `AclAuthorizerTest` and testing of `interfaceDefaultAuthorizer` seems unrelated to this test. 

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType eq ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType eq ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op eq AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op eq AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+
+    val denyPatterns = matchingPatterns(
+      principal,
+      requestContext.clientAddress().getHostAddress,
+      op,
+      resourceType,
+      AclPermissionType.DENY
+    )
+
+    if (denyAll(denyPatterns)) {
+      logAuditMessage(requestContext, new Action(op, null,0, true, true), false, false)

Review comment:
       We have lost the resource type for auditing; we should include a resource pattern with an empty name or something similar.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType eq ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType eq ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op eq AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op eq AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")

Review comment:
       We should probably move this common code to SecurityUtils and use it both here and in the default implementation.

##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -175,4 +179,73 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType == ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType == ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op == AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op == AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")
+
+    if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, op, resourceType)) {
+      AuthorizationResult.ALLOWED
+    } else {
+      super.authorizeByResourceType(requestContext, op, resourceType)
+    }
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+                      op: AclOperation,
+                      resourceType: ResourceType): Boolean = {
+    val resourceTypeFilter = new ResourcePatternFilter(
+      resourceType, null, PatternType.ANY)
+    val accessControlEntry = new AccessControlEntryFilter(
+      null, null, null, AclPermissionType.DENY)
+    val aclFilter = new AclBindingFilter(resourceTypeFilter, accessControlEntry)
+
+    for (binding <- acls(aclFilter).asScala) {
+      if (aceMatched(requestContext, op, binding) && canDenyAll(binding.pattern()))
+        return true
+    }
+    false
+  }
+
+  @inline
+  private def aceMatched(requestContext: AuthorizableRequestContext,
+                 op: AclOperation,
+                 binding: AclBinding): Boolean = {
+    (hostMatched(requestContext, binding) && principleMatched(requestContext, binding)
+      && operationMatched(op, binding))
+  }
+
+  @inline
+  private def hostMatched(requestContext: AuthorizableRequestContext,
+                  binding: AclBinding): Boolean =
+    (binding.entry().host().equals(requestContext.clientAddress().getHostAddress)
+      || binding.entry().host().equals(AclEntry.WildcardHost))
+
+  @inline
+  private def principleMatched(requestContext: AuthorizableRequestContext,
+                  binding: AclBinding): Boolean =
+    (binding.entry().principal().equals(requestContext.principal().toString)
+      || binding.entry().principal().equals(AclEntry.WildcardPrincipal.toString))
+
+  @inline
+  private def operationMatched(op: AclOperation,
+                       binding: AclBinding): Boolean =
+    (binding.entry().operation() == op
+      || binding.entry().operation() == AclOperation.ALL)
+
+  @inline
+  private def canDenyAll(pattern: ResourcePattern): Boolean =
+    pattern.patternType() == PatternType.LITERAL && pattern.name().equals(ResourcePattern.WILDCARD_RESOURCE)
+

Review comment:
       Looks like a lot of duplicate code here. We should see how to share code for all this. Can we move the default implementation into SecurityUtils and share some of the matching implementation across the classes?

##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -175,4 +179,73 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType == ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType == ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op == AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op == AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")
+
+    if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, op, resourceType)) {

Review comment:
       If `denyAllResource` is true, can we just return DENIED?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org