You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/25 22:46:14 UTC
[kafka] branch trunk updated: KAFKA-6591;
Move super user check before ACL matching (#4618)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3cdc78e KAFKA-6591; Move super user check before ACL matching (#4618)
3cdc78e is described below
commit 3cdc78e6bb1f83973a14ce1550fe3874f7348b05
Author: Sönke Liebau <so...@opencore.com>
AuthorDate: Tue Jun 26 00:46:09 2018 +0200
KAFKA-6591; Move super user check before ACL matching (#4618)
Currently the check whether a user as a super user in `SimpleAclAuthorizer` is performed only after all other ACLs have been evaluated. Since all requests from a super user are granted we don't really need to apply the ACLs. This patch returns true if the user is a super user before checking ACLs, thus bypassing the needless evaluation effort.
Reviewers: Andy Coates <bi...@users.noreply.github.com>, Manikumar Reddy <ma...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/security/auth/SimpleAclAuthorizer.scala | 58 ++++++++++++----------
1 file changed, 33 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 504d71a..1dc3a1f 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -112,38 +112,46 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val principal = session.principal
val host = session.clientAddress.getHostAddress
- val acls = getMatchingAcls(resource.resourceType, resource.name)
-
- // Check if there is any Deny acl match that would disallow this operation.
- val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
-
- // Check if there are any Allow ACLs which would allow this operation.
- // Allowing read, write, delete, or alter implies allowing describe.
- // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
- val allowOps = operation match {
- case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
- case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
- case _ => Set[Operation](operation)
+
+ def isEmptyAclAndAuthorized(acls: Set[Acl]): Boolean = {
+ if (acls.isEmpty) {
+ // No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
+ authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
+ shouldAllowEveryoneIfNoAclIsFound
+ } else false
+ }
+
+ def denyAclExists(acls: Set[Acl]): Boolean = {
+ // Check if there are any Deny ACLs which would forbid this operation.
+ aclMatch(operation, resource, principal, host, Deny, acls)
}
- val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
- //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
- //when no acls are found or if no deny acls are found and at least one allow acls matches.
- val authorized = isSuperUser(operation, resource, principal, host) ||
- isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
- (!denyMatch && allowMatch)
+ def allowAclExists(acls: Set[Acl]): Boolean = {
+ // Check if there are any Allow ACLs which would allow this operation.
+ // Allowing read, write, delete, or alter implies allowing describe.
+ // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
+ val allowOps = operation match {
+ case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
+ case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
+ case _ => Set[Operation](operation)
+ }
+ allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
+ }
+
+ def aclsAllowAccess = {
+ //we allow an operation if no acls are found and user has configured to allow all users
+ //when no acls are found or if no deny acls are found and at least one allow acls matches.
+ val acls = getMatchingAcls(resource.resourceType, resource.name)
+ isEmptyAclAndAuthorized(acls) || (!denyAclExists(acls) && allowAclExists(acls))
+ }
+
+ // Evaluate if operation is allowed
+ val authorized = isSuperUser(operation, resource, principal, host) || aclsAllowAccess
logAuditMessage(principal, authorized, operation, resource, host)
authorized
}
- def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String, acls: Set[Acl]): Boolean = {
- if (acls.isEmpty) {
- authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
- shouldAllowEveryoneIfNoAclIsFound
- } else false
- }
-
def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
if (superUsers.contains(principal)) {
authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")