You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/29 13:24:33 UTC

[kafka] branch 2.0 updated: KAFKA-7028: Properly authorize custom principal objects (#5311)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 6ab9d16  KAFKA-7028: Properly authorize custom principal objects (#5311)
6ab9d16 is described below

commit 6ab9d161e665b681b2982c80e265a4edcfff594e
Author: Stanislav Kozlovski <fa...@windowslive.com>
AuthorDate: Fri Jun 29 14:13:45 2018 +0100

    KAFKA-7028: Properly authorize custom principal objects (#5311)
    
    Use KafkaPrincipal objects for authorization in `SimpleAclAuthorizer` so that comparisons with super.users and with ACLs instantiated from Strings work. Previously, it would compare instances of two different classes, `KafkaPrincipal` and the custom class, which would always return false because of the implementation of `KafkaPrincipal#equals`.
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  |  8 ++++-
 .../security/auth/SimpleAclAuthorizerTest.scala    | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 504d71a..ceb9015 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -110,7 +110,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.patternType)
     }
 
-    val principal = session.principal
+    // ensure we compare identical classes
+    val sessionPrincipal = session.principal
+    val principal = if (classOf[KafkaPrincipal] != sessionPrincipal.getClass)
+      new KafkaPrincipal(sessionPrincipal.getPrincipalType, sessionPrincipal.getName)
+    else
+      sessionPrincipal
+
     val host = session.clientAddress.getHostAddress
     val acls = getMatchingAcls(resource.resourceType, resource.name)
 
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 7ab3c0a..3d1ceb6 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -54,6 +54,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   private var config: KafkaConfig = _
   private var zooKeeperClient: ZooKeeperClient = _
 
+  class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
+    override def equals(o: scala.Any): Boolean = false
+  }
+
   @Before
   override def setUp() {
     super.setUp()
@@ -139,6 +143,29 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertTrue("User3 should have WRITE access from host2", simpleAclAuthorizer.authorize(user3Session, Write, resource))
   }
 
+  /**
+    CustomPrincipals should be compared with their principal type and name
+   */
+  @Test
+  def testAllowAccessWithCustomPrincipal() {
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val host2 = InetAddress.getByName("192.168.1.2")
+
+    // user has READ access from host2 but not from host1
+    val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
+    val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
+    val acls = Set[Acl](acl1, acl2)
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    val host1Session = Session(customUserPrincipal, host1)
+    val host2Session = Session(customUserPrincipal, host2)
+
+    assertTrue("User1 should have READ access from host2", simpleAclAuthorizer.authorize(host2Session, Read, resource))
+    assertFalse("User1 should not have READ access from host1 due to denyAcl", simpleAclAuthorizer.authorize(host1Session, Read, resource))
+  }
+
   @Test
   def testDenyTakesPrecedence() {
     val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
@@ -177,6 +204,19 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Read, resource))
   }
 
+  /**
+    CustomPrincipals should be compared with their principal type and name
+   */
+  @Test
+  def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
+    val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
+
+    val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
+
+    assertTrue("superuser with custom principal always has access, no matter what acls.", simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
   @Test
   def testWildCardAcls(): Unit = {
     assertFalse("when acls = [],  authorizer should fail close.", simpleAclAuthorizer.authorize(session, Read, resource))