You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2022/12/08 19:27:07 UTC
[kafka] branch 3.3 updated: KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 4154a1ca4cc KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer
4154a1ca4cc is described below
commit 4154a1ca4cc154a020869f423be6ea08cfa98e3c
Author: Purshotam Chauhan <pc...@confluent.io>
AuthorDate: Tue Dec 6 00:13:04 2022 +0530
KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer
Reviewers: Manikumar Reddy <ma...@gmail.com>, Colin Patrick McCabe <cm...@apache.org>
---
.../authorizer/StandardAuthorizerData.java | 58 +++++++++++----------
.../authorizer/StandardAuthorizerTest.java | 60 ++++++++++++++++++++++
2 files changed, 90 insertions(+), 28 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index 6e9efc3cd5d..f456ff4d772 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -105,7 +105,7 @@ public class StandardAuthorizerData {
/**
* The result to return if no ACLs match.
*/
- private final DefaultRule defaultRule;
+ private final DefaultRule noAclRule;
/**
* Contains all of the current ACLs sorted by (resource type, resource name).
@@ -146,7 +146,7 @@ public class StandardAuthorizerData {
this.aclMutator = aclMutator;
this.loadingComplete = loadingComplete;
this.superUsers = superUsers;
- this.defaultRule = new DefaultRule(defaultResult);
+ this.noAclRule = new DefaultRule(defaultResult);
this.aclsByResource = aclsByResource;
this.aclsById = aclsById;
}
@@ -157,7 +157,7 @@ public class StandardAuthorizerData {
newAclMutator,
loadingComplete,
superUsers,
- defaultRule.result,
+ noAclRule.result,
aclsByResource,
aclsById);
}
@@ -167,7 +167,7 @@ public class StandardAuthorizerData {
aclMutator,
newLoadingComplete,
superUsers,
- defaultRule.result,
+ noAclRule.result,
aclsByResource,
aclsById);
}
@@ -192,7 +192,7 @@ public class StandardAuthorizerData {
aclMutator,
loadingComplete,
superUsers,
- defaultRule.result,
+ noAclRule.result,
aclsByResource,
aclsById);
log.info("Initialized with {} acl(s).", aclsById.size());
@@ -239,7 +239,7 @@ public class StandardAuthorizerData {
}
AuthorizationResult defaultResult() {
- return defaultRule.result;
+ return noAclRule.result;
}
int aclCount() {
@@ -268,20 +268,12 @@ public class StandardAuthorizerData {
} else if (!loadingComplete) {
throw new AuthorizerNotReadyException();
} else {
- MatchingAclRule aclRule = findAclRule(
+ rule = findAclRule(
matchingPrincipals(requestContext),
requestContext.clientAddress().getHostAddress(),
action
);
-
- if (aclRule != null) {
- rule = aclRule;
- } else {
- // If nothing matched, we return the default result.
- rule = defaultRule;
- }
}
-
logAuditMessage(principal, requestContext, action, rule);
return rule.result();
}
@@ -344,7 +336,7 @@ public class StandardAuthorizerData {
}
}
- private MatchingAclRule findAclRule(
+ private MatchingRule findAclRule(
Set<KafkaPrincipal> matchingPrincipals,
String host,
Action action
@@ -370,7 +362,7 @@ public class StandardAuthorizerData {
// 8. rs=TOPIC rn=eeee pt=LITERAL
//
// Once we reached element 5, we would jump to element 7.
- MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
+ MatchingRuleBuilder matchingRuleBuilder = new MatchingRuleBuilder(noAclRule);
StandardAcl exemplar = new StandardAcl(
action.resourcePattern().resourceType(),
action.resourcePattern().name(),
@@ -379,9 +371,9 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
- checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
- if (matchingAclBuilder.foundDeny()) {
- return matchingAclBuilder.build();
+ checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
+ if (matchingRuleBuilder.foundDeny()) {
+ return matchingRuleBuilder.build();
}
// In addition to ACLs for this specific resource name, there can also be wildcard
@@ -395,8 +387,8 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
- checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
- return matchingAclBuilder.build();
+ checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
+ return matchingRuleBuilder.build();
}
static int matchesUpTo(
@@ -418,7 +410,7 @@ public class StandardAuthorizerData {
StandardAcl exemplar,
Set<KafkaPrincipal> matchingPrincipals,
String host,
- MatchingAclBuilder matchingAclBuilder
+ MatchingRuleBuilder matchingRuleBuilder
) {
String resourceName = action.resourcePattern().name();
NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
@@ -455,11 +447,12 @@ public class StandardAuthorizerData {
iterator = tailSet.iterator();
continue;
}
+ matchingRuleBuilder.hasResourceAcls = true;
AuthorizationResult result = findResult(action, matchingPrincipals, host, acl);
if (ALLOWED == result) {
- matchingAclBuilder.allowAcl = acl;
+ matchingRuleBuilder.allowAcl = acl;
} else if (DENIED == result) {
- matchingAclBuilder.denyAcl = acl;
+ matchingRuleBuilder.denyAcl = acl;
return;
}
}
@@ -630,21 +623,30 @@ public class StandardAuthorizerData {
}
}
- private static class MatchingAclBuilder {
+ private static class MatchingRuleBuilder {
+ private static final DefaultRule DENY_RULE = new DefaultRule(DENIED);
+ private final DefaultRule noAclRule;
private StandardAcl denyAcl;
private StandardAcl allowAcl;
+ private boolean hasResourceAcls;
+
+ public MatchingRuleBuilder(DefaultRule noAclRule) {
+ this.noAclRule = noAclRule;
+ }
boolean foundDeny() {
return denyAcl != null;
}
- MatchingAclRule build() {
+ MatchingRule build() {
if (denyAcl != null) {
return new MatchingAclRule(denyAcl, DENIED);
} else if (allowAcl != null) {
return new MatchingAclRule(allowAcl, ALLOWED);
+ } else if (!hasResourceAcls) {
+ return noAclRule;
} else {
- return null;
+ return DENY_RULE;
}
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index a26eb3d50b5..3966054e744 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -167,6 +167,66 @@ public class StandardAuthorizerTest {
ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false")));
}
+ @Test
+ public void testAllowEveryoneIfNoAclFoundConfigEnabled() throws Exception {
+ StandardAuthorizer authorizer = new StandardAuthorizer();
+ HashMap<String, Object> configs = new HashMap<>();
+ configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
+ configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
+ authorizer.configure(configs);
+ authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+ authorizer.completeInitialLoad();
+
+ List<StandardAclWithId> acls = singletonList(
+ withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
+ );
+ acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
+ assertEquals(singletonList(DENIED),
+ authorizer.authorize(
+ new MockAuthorizableRequestContext.Builder()
+ .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+ .build(),
+ singletonList(newAction(READ, TOPIC, "topic1"))
+ ));
+ assertEquals(singletonList(ALLOWED),
+ authorizer.authorize(
+ new MockAuthorizableRequestContext.Builder()
+ .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+ .build(),
+ singletonList(newAction(READ, TOPIC, "topic2"))
+ ));
+ }
+
+ @Test
+ public void testAllowEveryoneIfNoAclFoundConfigDisabled() throws Exception {
+ StandardAuthorizer authorizer = new StandardAuthorizer();
+ HashMap<String, Object> configs = new HashMap<>();
+ configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
+ configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
+ authorizer.configure(configs);
+ authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+ authorizer.completeInitialLoad();
+
+ List<StandardAclWithId> acls = singletonList(
+ withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
+ );
+ acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
+ assertEquals(singletonList(DENIED),
+ authorizer.authorize(
+ new MockAuthorizableRequestContext.Builder()
+ .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+ .build(),
+ singletonList(newAction(READ, TOPIC, "topic1"))
+ ));
+ assertEquals(singletonList(DENIED),
+ authorizer.authorize(
+ new MockAuthorizableRequestContext.Builder()
+ .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
+ .build(),
+ singletonList(newAction(READ, TOPIC, "topic2"))
+ ));
+ }
+
@Test
public void testConfigure() {
StandardAuthorizer authorizer = new StandardAuthorizer();