You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2022/12/08 19:27:07 UTC

[kafka] branch 3.3 updated: KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 4154a1ca4cc KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer
4154a1ca4cc is described below

commit 4154a1ca4cc154a020869f423be6ea08cfa98e3c
Author: Purshotam Chauhan <pc...@confluent.io>
AuthorDate: Tue Dec 6 00:13:04 2022 +0530

    KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Colin Patrick McCabe <cm...@apache.org>
---
 .../authorizer/StandardAuthorizerData.java         | 58 +++++++++++----------
 .../authorizer/StandardAuthorizerTest.java         | 60 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 28 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index 6e9efc3cd5d..f456ff4d772 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -105,7 +105,7 @@ public class StandardAuthorizerData {
     /**
      * The result to return if no ACLs match.
      */
-    private final DefaultRule defaultRule;
+    private final DefaultRule noAclRule;
 
     /**
      * Contains all of the current ACLs sorted by (resource type, resource name).
@@ -146,7 +146,7 @@ public class StandardAuthorizerData {
         this.aclMutator = aclMutator;
         this.loadingComplete = loadingComplete;
         this.superUsers = superUsers;
-        this.defaultRule = new DefaultRule(defaultResult);
+        this.noAclRule = new DefaultRule(defaultResult);
         this.aclsByResource = aclsByResource;
         this.aclsById = aclsById;
     }
@@ -157,7 +157,7 @@ public class StandardAuthorizerData {
             newAclMutator,
             loadingComplete,
             superUsers,
-            defaultRule.result,
+            noAclRule.result,
             aclsByResource,
             aclsById);
     }
@@ -167,7 +167,7 @@ public class StandardAuthorizerData {
             aclMutator,
             newLoadingComplete,
             superUsers,
-            defaultRule.result,
+            noAclRule.result,
             aclsByResource,
             aclsById);
     }
@@ -192,7 +192,7 @@ public class StandardAuthorizerData {
             aclMutator,
             loadingComplete,
             superUsers,
-            defaultRule.result,
+            noAclRule.result,
             aclsByResource,
             aclsById);
         log.info("Initialized with {} acl(s).", aclsById.size());
@@ -239,7 +239,7 @@ public class StandardAuthorizerData {
     }
 
     AuthorizationResult defaultResult() {
-        return defaultRule.result;
+        return noAclRule.result;
     }
 
     int aclCount() {
@@ -268,20 +268,12 @@ public class StandardAuthorizerData {
         } else if (!loadingComplete) {
             throw new AuthorizerNotReadyException();
         } else {
-            MatchingAclRule aclRule = findAclRule(
+            rule = findAclRule(
                 matchingPrincipals(requestContext),
                 requestContext.clientAddress().getHostAddress(),
                 action
             );
-
-            if (aclRule != null) {
-                rule = aclRule;
-            } else {
-                // If nothing matched, we return the default result.
-                rule = defaultRule;
-            }
         }
-
         logAuditMessage(principal, requestContext, action, rule);
         return rule.result();
     }
@@ -344,7 +336,7 @@ public class StandardAuthorizerData {
         }
     }
 
-    private MatchingAclRule findAclRule(
+    private MatchingRule findAclRule(
         Set<KafkaPrincipal> matchingPrincipals,
         String host,
         Action action
@@ -370,7 +362,7 @@ public class StandardAuthorizerData {
         // 8. rs=TOPIC rn=eeee pt=LITERAL
         //
         // Once we reached element 5, we would jump to element 7.
-        MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
+        MatchingRuleBuilder matchingRuleBuilder = new MatchingRuleBuilder(noAclRule);
         StandardAcl exemplar = new StandardAcl(
             action.resourcePattern().resourceType(),
             action.resourcePattern().name(),
@@ -379,9 +371,9 @@ public class StandardAuthorizerData {
             "",
             AclOperation.UNKNOWN,
             AclPermissionType.UNKNOWN);
-        checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
-        if (matchingAclBuilder.foundDeny()) {
-            return matchingAclBuilder.build();
+        checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
+        if (matchingRuleBuilder.foundDeny()) {
+            return matchingRuleBuilder.build();
         }
 
         // In addition to ACLs for this specific resource name, there can also be wildcard
@@ -395,8 +387,8 @@ public class StandardAuthorizerData {
             "",
             AclOperation.UNKNOWN,
             AclPermissionType.UNKNOWN);
-        checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
-        return matchingAclBuilder.build();
+        checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
+        return matchingRuleBuilder.build();
     }
 
     static int matchesUpTo(
@@ -418,7 +410,7 @@ public class StandardAuthorizerData {
         StandardAcl exemplar,
         Set<KafkaPrincipal> matchingPrincipals,
         String host,
-        MatchingAclBuilder matchingAclBuilder
+        MatchingRuleBuilder matchingRuleBuilder
     ) {
         String resourceName = action.resourcePattern().name();
         NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
@@ -455,11 +447,12 @@ public class StandardAuthorizerData {
                 iterator = tailSet.iterator();
                 continue;
             }
+            matchingRuleBuilder.hasResourceAcls = true;
             AuthorizationResult result = findResult(action, matchingPrincipals, host, acl);
             if (ALLOWED == result) {
-                matchingAclBuilder.allowAcl = acl;
+                matchingRuleBuilder.allowAcl = acl;
             } else if (DENIED == result) {
-                matchingAclBuilder.denyAcl = acl;
+                matchingRuleBuilder.denyAcl = acl;
                 return;
             }
         }
@@ -630,21 +623,30 @@ public class StandardAuthorizerData {
         }
     }
 
-    private static class MatchingAclBuilder {
+    private static class MatchingRuleBuilder {
+        private static final DefaultRule DENY_RULE = new DefaultRule(DENIED);
+        private final DefaultRule noAclRule;
         private StandardAcl denyAcl;
         private StandardAcl allowAcl;
+        private boolean hasResourceAcls;
+
+        public MatchingRuleBuilder(DefaultRule noAclRule) {
+            this.noAclRule = noAclRule;
+        }
 
         boolean foundDeny() {
             return denyAcl != null;
         }
 
-        MatchingAclRule build() {
+        MatchingRule build() {
             if (denyAcl != null) {
                 return new MatchingAclRule(denyAcl, DENIED);
             } else if (allowAcl != null) {
                 return new MatchingAclRule(allowAcl, ALLOWED);
+            } else if (!hasResourceAcls) {
+                return noAclRule;
             } else {
-                return null;
+                return DENY_RULE;
             }
         }
     }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index a26eb3d50b5..3966054e744 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -167,6 +167,66 @@ public class StandardAuthorizerTest {
             ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false")));
     }
 
+    @Test
+    public void testAllowEveryoneIfNoAclFoundConfigEnabled() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        HashMap<String, Object> configs = new HashMap<>();
+        configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
+        configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
+        authorizer.configure(configs);
+        authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+        authorizer.completeInitialLoad();
+
+        List<StandardAclWithId> acls = singletonList(
+                withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
+        );
+        acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
+        assertEquals(singletonList(DENIED),
+                authorizer.authorize(
+                        new MockAuthorizableRequestContext.Builder()
+                                .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+                                .build(),
+                        singletonList(newAction(READ, TOPIC, "topic1"))
+                ));
+        assertEquals(singletonList(ALLOWED),
+                authorizer.authorize(
+                        new MockAuthorizableRequestContext.Builder()
+                                .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+                                .build(),
+                        singletonList(newAction(READ, TOPIC, "topic2"))
+                ));
+    }
+
+    @Test
+    public void testAllowEveryoneIfNoAclFoundConfigDisabled() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        HashMap<String, Object> configs = new HashMap<>();
+        configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
+        configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
+        authorizer.configure(configs);
+        authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+        authorizer.completeInitialLoad();
+
+        List<StandardAclWithId> acls = singletonList(
+                withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
+        );
+        acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
+        assertEquals(singletonList(DENIED),
+                authorizer.authorize(
+                        new MockAuthorizableRequestContext.Builder()
+                                .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+                                .build(),
+                        singletonList(newAction(READ, TOPIC, "topic1"))
+                ));
+        assertEquals(singletonList(DENIED),
+                authorizer.authorize(
+                        new MockAuthorizableRequestContext.Builder()
+                                .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+                                .build(),
+                        singletonList(newAction(READ, TOPIC, "topic2"))
+                ));
+    }
+
     @Test
     public void testConfigure() {
         StandardAuthorizer authorizer = new StandardAuthorizer();