You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/09/29 16:34:56 UTC
[kafka] branch 3.3 updated: KAFKA-14265: Prefix ACLs may shadow other prefix ACLs
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 1780f2660e4 KAFKA-14265: Prefix ACLs may shadow other prefix ACLs
1780f2660e4 is described below
commit 1780f2660e4b45ca8895f1614a27613f922aad22
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Wed Sep 28 17:02:04 2022 -0700
KAFKA-14265: Prefix ACLs may shadow other prefix ACLs
---
.../kafka/api/AuthorizerIntegrationTest.scala | 14 +++++-
.../authorizer/StandardAuthorizerData.java | 55 +++++++++++++++++-----
.../authorizer/StandardAuthorizerTest.java | 27 +++++++++++
3 files changed, 83 insertions(+), 13 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a109ae8ce4c..ff1b2f5934d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -25,7 +25,7 @@ import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.waitUntilTrue
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, NewTopic}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
@@ -2619,4 +2619,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
)
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testPrefixAcls(quorum: String): Unit = { // KAFKA-14265: longer prefix ACLs must not shadow a shorter prefix ACL.
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW)), // ALLOW CREATE for the client principal...
+ new ResourcePattern(TOPIC, "f", PREFIXED)) // ...on every topic whose name starts with "f".
+ addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, DENY)), // DENY CREATE for a different principal...
+ new ResourcePattern(TOPIC, "fooa", PREFIXED)) // ...on prefix "fooa", which is not a prefix of "foobar".
+ addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, ALLOW)), // ALLOW CREATE for that other principal...
+ new ResourcePattern(TOPIC, "foob", PREFIXED)) // ...on prefix "foob", which is a prefix of "foobar".
+ createAdminClient().createTopics(Collections.
+ singletonList(new NewTopic("foobar", 1, 1.toShort))).all().get() // get() surfaces any failure; presumably runs as the client principal - TODO confirm.
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index c6e3b74a2ab..6e9efc3cd5d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -352,8 +352,10 @@ public class StandardAuthorizerData {
// This code relies on the ordering of StandardAcl within the NavigableMap.
// Entries are sorted by resource type first, then REVERSE resource name.
// Therefore, we can find all the applicable ACLs by starting at
- // (resource_type, resource_name) and stepping forwards until we reach an ACL with
- // a resource name which is not a prefix of the current one.
+ // (resource_type, resource_name) and stepping forwards until we reach
+ // an ACL with a resource name which is not a prefix of the current one.
+ // At that point, we need to check whether there are any more ACLs at
+ // the first divergence point.
//
// For example, when trying to authorize a TOPIC resource named foobar, we would
// start at element 2, and continue on to 3 and 4 following map:
@@ -362,9 +364,12 @@ public class StandardAuthorizerData {
// 2. rs=TOPIC rn=foobar pt=PREFIX
// 3. rs=TOPIC rn=foob pt=LITERAL
// 4. rs=TOPIC rn=foo pt=PREFIX
- // 5. rs=TOPIC rn=eeee pt=LITERAL
+ // 5. rs=TOPIC rn=fb pt=PREFIX
+ // 6. rs=TOPIC rn=fa pt=PREFIX
+ // 7. rs=TOPIC rn=f pt=PREFIX
+ // 8. rs=TOPIC rn=eeee pt=LITERAL
//
- // Once we reached element 5, we would stop scanning.
+ // Once we reach element 5, whose name is not a prefix of foobar, we jump to element 7.
MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
StandardAcl exemplar = new StandardAcl(
action.resourcePattern().resourceType(),
@@ -394,6 +399,20 @@ public class StandardAuthorizerData {
return matchingAclBuilder.build();
}
+ static int matchesUpTo( // Returns the length of the longest common prefix of resource and pattern.
+ String resource,
+ String pattern
+ ) {
+ int i = 0; // Number of leading characters confirmed equal so far.
+ while (true) {
+ if (resource.length() == i) break; // resource is exhausted
+ if (pattern.length() == i) break; // pattern is exhausted
+ if (resource.charAt(i) != pattern.charAt(i)) break; // first differing character
+ i++;
+ }
+ return i; // Equals pattern.length() exactly when pattern is a prefix of resource.
+ }
+
private void checkSection(
Action action,
StandardAcl exemplar,
@@ -401,28 +420,40 @@ public class StandardAuthorizerData {
String host,
MatchingAclBuilder matchingAclBuilder
) {
- NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
String resourceName = action.resourcePattern().name();
- for (Iterator<StandardAcl> iterator = tailSet.iterator();
- iterator.hasNext(); ) {
+ NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
+ Iterator<StandardAcl> iterator = tailSet.iterator();
+ while (iterator.hasNext()) {
StandardAcl acl = iterator.next();
if (!acl.resourceType().equals(action.resourcePattern().resourceType())) {
// We've stepped outside the section for the resource type we care about and
// should stop scanning.
break;
}
- if (resourceName.startsWith(acl.resourceName())) {
- if (acl.patternType() == LITERAL && !resourceName.equals(acl.resourceName())) {
+ int matchesUpTo = matchesUpTo(resourceName, acl.resourceName());
+ if (matchesUpTo == acl.resourceName().length()) {
+ if (acl.patternType() == LITERAL && matchesUpTo != resourceName.length()) {
// This is a literal ACL whose name is a prefix of the resource name, but
// which doesn't match it exactly. We should skip over this ACL, but keep
// scanning in case there are any relevant PREFIX ACLs.
continue;
}
+
} else if (!(acl.resourceName().equals(WILDCARD) && acl.patternType() == LITERAL)) {
// If the ACL resource name is NOT a prefix of the current resource name,
// and we're not dealing with the special case of a wildcard ACL, we've
- // stepped outside of the section we care about and should stop scanning.
- break;
+ // stepped outside of the section we care about. Scan for any other potential
+ // prefix matches.
+ exemplar = new StandardAcl(exemplar.resourceType(),
+ exemplar.resourceName().substring(0, matchesUpTo),
+ exemplar.patternType(),
+ exemplar.principal(),
+ exemplar.host(),
+ exemplar.operation(),
+ exemplar.permissionType());
+ tailSet = aclsByResource.tailSet(exemplar, true);
+ iterator = tailSet.iterator();
+ continue;
}
AuthorizationResult result = findResult(action, matchingPrincipals, host, acl);
if (ALLOWED == result) {
@@ -625,4 +656,4 @@ public class StandardAuthorizerData {
HashMap<Uuid, StandardAcl> getAclsById() {
return aclsById;
}
-}
+}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 987c00155c4..a26eb3d50b5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -640,4 +640,31 @@ public class StandardAuthorizerTest {
assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
assertFalse(futures.get(CONTROLLER).toCompletableFuture().isCompletedExceptionally());
}
+
+ @Test
+ public void testPrefixAcls() throws Exception { // Checks that one principal's prefix ACL does not hide a shorter prefix ACL belonging to another.
+ StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
+ List<StandardAcl> acls = Arrays.asList(
+ new StandardAcl(TOPIC, "fooa", PREFIXED, "User:alice", "*", ALL, ALLOW), // alice: topics starting with "fooa"
+ new StandardAcl(TOPIC, "foobar", LITERAL, "User:bob", "*", ALL, ALLOW), // bob: exactly the topic "foobar"
+ new StandardAcl(TOPIC, "f", PREFIXED, "User:bob", "*", ALL, ALLOW) // bob: topics starting with "f"
+ );
+ acls.forEach(acl -> {
+ StandardAclWithId aclWithId = withId(acl);
+ authorizer.addAcl(aclWithId.id(), aclWithId.acl());
+ });
+ assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize(
+ newRequestContext("bob"),
+ Arrays.asList(
+ newAction(WRITE, TOPIC, "foobarr"), // "foobar" LITERAL is not an exact match; bob's "f" prefix ACL applies
+ newAction(READ, TOPIC, "goobar"), // no ACL name is a prefix of "goobar"
+ newAction(READ, TOPIC, "fooa")))); // the "fooa" prefix ACL is alice's; bob's "f" prefix ACL must still be found
+
+ assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize(
+ newRequestContext("alice"),
+ Arrays.asList(
+ newAction(DESCRIBE, TOPIC, "fooa"), // matched by alice's "fooa" prefix ACL
+ newAction(WRITE, TOPIC, "bar"), // no ACL name is a prefix of "bar"
+ newAction(READ, TOPIC, "baz")))); // no ACL name is a prefix of "baz"
+ }
}