You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/09/29 16:34:56 UTC

[kafka] branch 3.3 updated: KAFKA-14265: Prefix ACLs may shadow other prefix ACLs

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 1780f2660e4 KAFKA-14265: Prefix ACLs may shadow other prefix ACLs
1780f2660e4 is described below

commit 1780f2660e4b45ca8895f1614a27613f922aad22
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Wed Sep 28 17:02:04 2022 -0700

    KAFKA-14265: Prefix ACLs may shadow other prefix ACLs
---
 .../kafka/api/AuthorizerIntegrationTest.scala      | 14 +++++-
 .../authorizer/StandardAuthorizerData.java         | 55 +++++++++++++++++-----
 .../authorizer/StandardAuthorizerTest.java         | 27 +++++++++++
 3 files changed, 83 insertions(+), 13 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a109ae8ce4c..ff1b2f5934d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -25,7 +25,7 @@ import kafka.security.authorizer.AclEntry.WildcardHost
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils.waitUntilTrue
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, NewTopic}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
@@ -2619,4 +2619,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     )
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testPrefixAcls(quorum: String): Unit = {
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW)),
+      new ResourcePattern(TOPIC, "f", PREFIXED))
+    addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, DENY)),
+      new ResourcePattern(TOPIC, "fooa", PREFIXED))
+    addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, ALLOW)),
+      new ResourcePattern(TOPIC, "foob", PREFIXED))
+    createAdminClient().createTopics(Collections.
+      singletonList(new NewTopic("foobar", 1, 1.toShort))).all().get()
+  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index c6e3b74a2ab..6e9efc3cd5d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -352,8 +352,10 @@ public class StandardAuthorizerData {
         // This code relies on the ordering of StandardAcl within the NavigableMap.
         // Entries are sorted by resource type first, then REVERSE resource name.
         // Therefore, we can find all the applicable ACLs by starting at
-        // (resource_type, resource_name) and stepping forwards until we reach an ACL with
-        // a resource name which is not a prefix of the current one.
+        // (resource_type, resource_name) and stepping forwards until we reach
+        // an ACL with a resource name which is not a prefix of the current one.
+        // At that point, we need to check whether any more ACLs match at
+        // the first point where the two names diverge.
         //
         // For example, when trying to authorize a TOPIC resource named foobar, we would
         // start at element 2, and continue on to 3 and 4 following map:
@@ -362,9 +364,12 @@ public class StandardAuthorizerData {
         // 2. rs=TOPIC rn=foobar pt=PREFIX
         // 3. rs=TOPIC rn=foob pt=LITERAL
         // 4. rs=TOPIC rn=foo pt=PREFIX
-        // 5. rs=TOPIC rn=eeee pt=LITERAL
+        // 5. rs=TOPIC rn=fb pt=PREFIX
+        // 6. rs=TOPIC rn=fa pt=PREFIX
+        // 7. rs=TOPIC rn=f  pt=PREFIX
+        // 8. rs=TOPIC rn=eeee pt=LITERAL
         //
-        // Once we reached element 5, we would stop scanning.
+        // Once we reach element 5, which diverges from foobar after "f", we jump to element 7.
         MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
         StandardAcl exemplar = new StandardAcl(
             action.resourcePattern().resourceType(),
@@ -394,6 +399,20 @@ public class StandardAuthorizerData {
         return matchingAclBuilder.build();
     }
 
+    static int matchesUpTo(
+        String resource,
+        String pattern
+    ) {
+        int i = 0;
+        while (true) {
+            if (resource.length() == i) break;
+            if (pattern.length() == i) break;
+            if (resource.charAt(i) != pattern.charAt(i)) break;
+            i++;
+        }
+        return i;
+    }
+
     private void checkSection(
         Action action,
         StandardAcl exemplar,
@@ -401,28 +420,40 @@ public class StandardAuthorizerData {
         String host,
         MatchingAclBuilder matchingAclBuilder
     ) {
-        NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
         String resourceName = action.resourcePattern().name();
-        for (Iterator<StandardAcl> iterator = tailSet.iterator();
-             iterator.hasNext(); ) {
+        NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
+        Iterator<StandardAcl> iterator = tailSet.iterator();
+        while (iterator.hasNext()) {
             StandardAcl acl = iterator.next();
             if (!acl.resourceType().equals(action.resourcePattern().resourceType())) {
                 // We've stepped outside the section for the resource type we care about and
                 // should stop scanning.
                 break;
             }
-            if (resourceName.startsWith(acl.resourceName())) {
-                if (acl.patternType() == LITERAL && !resourceName.equals(acl.resourceName())) {
+            int matchesUpTo = matchesUpTo(resourceName, acl.resourceName());
+            if (matchesUpTo == acl.resourceName().length()) {
+                if (acl.patternType() == LITERAL && matchesUpTo != resourceName.length()) {
                     // This is a literal ACL whose name is a prefix of the resource name, but
                     // which doesn't match it exactly. We should skip over this ACL, but keep
                     // scanning in case there are any relevant PREFIX ACLs.
                     continue;
                 }
+
             } else if (!(acl.resourceName().equals(WILDCARD) && acl.patternType() == LITERAL)) {
                 // If the ACL resource name is NOT a prefix of the current resource name,
                 // and we're not dealing with the special case of a wildcard ACL, we've
-                // stepped outside of the section we care about and should stop scanning.
-                break;
+                // stepped outside of the section we care about. Scan for any other potential
+                // prefix matches.
+                exemplar = new StandardAcl(exemplar.resourceType(),
+                    exemplar.resourceName().substring(0, matchesUpTo),
+                    exemplar.patternType(),
+                    exemplar.principal(),
+                    exemplar.host(),
+                    exemplar.operation(),
+                    exemplar.permissionType());
+                tailSet = aclsByResource.tailSet(exemplar, true);
+                iterator = tailSet.iterator();
+                continue;
             }
             AuthorizationResult result = findResult(action, matchingPrincipals, host, acl);
             if (ALLOWED == result) {
@@ -625,4 +656,4 @@ public class StandardAuthorizerData {
     HashMap<Uuid, StandardAcl> getAclsById() {
         return aclsById;
     }
-}
+}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 987c00155c4..a26eb3d50b5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -640,4 +640,31 @@ public class StandardAuthorizerTest {
         assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
         assertFalse(futures.get(CONTROLLER).toCompletableFuture().isCompletedExceptionally());
     }
+
+    @Test
+    public void testPrefixAcls() throws Exception {
+        StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
+        List<StandardAcl> acls = Arrays.asList(
+                new StandardAcl(TOPIC, "fooa", PREFIXED, "User:alice", "*", ALL, ALLOW),
+                new StandardAcl(TOPIC, "foobar", LITERAL, "User:bob", "*", ALL, ALLOW),
+                new StandardAcl(TOPIC, "f", PREFIXED, "User:bob", "*", ALL, ALLOW)
+        );
+        acls.forEach(acl -> {
+            StandardAclWithId aclWithId = withId(acl);
+            authorizer.addAcl(aclWithId.id(), aclWithId.acl());
+        });
+        assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize(
+                newRequestContext("bob"),
+                Arrays.asList(
+                        newAction(WRITE, TOPIC, "foobarr"),
+                        newAction(READ, TOPIC, "goobar"),
+                        newAction(READ, TOPIC, "fooa"))));
+
+        assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize(
+                newRequestContext("alice"),
+                Arrays.asList(
+                        newAction(DESCRIBE, TOPIC, "fooa"),
+                        newAction(WRITE, TOPIC, "bar"),
+                        newAction(READ, TOPIC, "baz"))));
+    }
 }