You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/25 22:46:14 UTC

[kafka] branch trunk updated: KAFKA-6591; Move super user check before ACL matching (#4618)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3cdc78e  KAFKA-6591; Move super user check before ACL matching  (#4618)
3cdc78e is described below

commit 3cdc78e6bb1f83973a14ce1550fe3874f7348b05
Author: Sönke Liebau <so...@opencore.com>
AuthorDate: Tue Jun 26 00:46:09 2018 +0200

    KAFKA-6591; Move super user check before ACL matching  (#4618)
    
    Currently the check whether a user is a super user in `SimpleAclAuthorizer` is performed only after all other ACLs have been evaluated. Since all requests from a super user are granted, we don't really need to apply the ACLs. This patch returns true if the user is a super user before checking ACLs, thus bypassing the needless evaluation effort.
    
    Reviewers: Andy Coates <bi...@users.noreply.github.com>, Manikumar Reddy <ma...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 58 ++++++++++++----------
 1 file changed, 33 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 504d71a..1dc3a1f 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -112,38 +112,46 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
     val principal = session.principal
     val host = session.clientAddress.getHostAddress
-    val acls = getMatchingAcls(resource.resourceType, resource.name)
-
-    // Check if there is any Deny acl match that would disallow this operation.
-    val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
-
-    // Check if there are any Allow ACLs which would allow this operation.
-    // Allowing read, write, delete, or alter implies allowing describe.
-    // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
-    val allowOps = operation match {
-      case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
-      case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
-      case _ => Set[Operation](operation)
+
+    def isEmptyAclAndAuthorized(acls: Set[Acl]): Boolean = {
+      if (acls.isEmpty) {
+        // No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
+        authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
+        shouldAllowEveryoneIfNoAclIsFound
+      } else false
+    }
+
+    def denyAclExists(acls: Set[Acl]): Boolean = {
+      // Check if there are any Deny ACLs which would forbid this operation.
+      aclMatch(operation, resource, principal, host, Deny, acls)
     }
-    val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
 
-    //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
-    //when no acls are found or if no deny acls are found and at least one allow acls matches.
-    val authorized = isSuperUser(operation, resource, principal, host) ||
-      isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
-      (!denyMatch && allowMatch)
+    def allowAclExists(acls: Set[Acl]): Boolean = {
+      // Check if there are any Allow ACLs which would allow this operation.
+      // Allowing read, write, delete, or alter implies allowing describe.
+      // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
+      val allowOps = operation match {
+        case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
+        case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
+        case _ => Set[Operation](operation)
+      }
+      allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
+    }
+
+    def aclsAllowAccess = {
+      //we allow an operation if no acls are found and user has configured to allow all users
+      //when no acls are found or if no deny acls are found and at least one allow acls matches.
+      val acls = getMatchingAcls(resource.resourceType, resource.name)
+      isEmptyAclAndAuthorized(acls) || (!denyAclExists(acls) && allowAclExists(acls))
+    }
+
+    // Evaluate if operation is allowed
+    val authorized = isSuperUser(operation, resource, principal, host) || aclsAllowAccess
 
     logAuditMessage(principal, authorized, operation, resource, host)
     authorized
   }
 
-  def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String, acls: Set[Acl]): Boolean = {
-    if (acls.isEmpty) {
-      authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
-      shouldAllowEveryoneIfNoAclIsFound
-    } else false
-  }
-
   def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
     if (superUsers.contains(principal)) {
       authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")