You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/29 13:24:33 UTC
[kafka] branch 2.0 updated: KAFKA-7028: Properly authorize custom
principal objects (#5311)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 6ab9d16 KAFKA-7028: Properly authorize custom principal objects (#5311)
6ab9d16 is described below
commit 6ab9d161e665b681b2982c80e265a4edcfff594e
Author: Stanislav Kozlovski <fa...@windowslive.com>
AuthorDate: Fri Jun 29 14:13:45 2018 +0100
KAFKA-7028: Properly authorize custom principal objects (#5311)
Use KafkaPrincipal objects for authorization in `SimpleAclAuthorizer` so that comparison with super.users and ACLs instantiated from Strings work. Previously, it would compare two different classes `KafkaPrincipal` and the custom class, which would always return false because of the implementation of `KafkaPrincipal#equals`.
---
.../kafka/security/auth/SimpleAclAuthorizer.scala | 8 ++++-
.../security/auth/SimpleAclAuthorizerTest.scala | 40 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 504d71a..ceb9015 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -110,7 +110,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.patternType)
}
- val principal = session.principal
+ // ensure we compare identical classes
+ val sessionPrincipal = session.principal
+ val principal = if (classOf[KafkaPrincipal] != sessionPrincipal.getClass)
+ new KafkaPrincipal(sessionPrincipal.getPrincipalType, sessionPrincipal.getName)
+ else
+ sessionPrincipal
+
val host = session.clientAddress.getHostAddress
val acls = getMatchingAcls(resource.resourceType, resource.name)
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 7ab3c0a..3d1ceb6 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -54,6 +54,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
private var config: KafkaConfig = _
private var zooKeeperClient: ZooKeeperClient = _
+ class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
+ override def equals(o: scala.Any): Boolean = false
+ }
+
@Before
override def setUp() {
super.setUp()
@@ -139,6 +143,29 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
assertTrue("User3 should have WRITE access from host2", simpleAclAuthorizer.authorize(user3Session, Write, resource))
}
+ /**
+ CustomPrincipals should be compared with their principal type and name
+ */
+ @Test
+ def testAllowAccessWithCustomPrincipal() {
+ val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+ val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val host2 = InetAddress.getByName("192.168.1.2")
+
+ // user has READ access from host2 but not from host1
+ val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
+ val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
+ val acls = Set[Acl](acl1, acl2)
+ changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+ val host1Session = Session(customUserPrincipal, host1)
+ val host2Session = Session(customUserPrincipal, host2)
+
+ assertTrue("User1 should have READ access from host2", simpleAclAuthorizer.authorize(host2Session, Read, resource))
+ assertFalse("User1 should not have READ access from host1 due to denyAcl", simpleAclAuthorizer.authorize(host1Session, Read, resource))
+ }
+
@Test
def testDenyTakesPrecedence() {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
@@ -177,6 +204,19 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Read, resource))
}
+ /**
+ CustomPrincipals should be compared with their principal type and name
+ */
+ @Test
+ def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
+ val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
+ changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
+
+ val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
+
+ assertTrue("superuser with custom principal always has access, no matter what acls.", simpleAclAuthorizer.authorize(session, Read, resource))
+ }
+
@Test
def testWildCardAcls(): Unit = {
assertFalse("when acls = [], authorizer should fail close.", simpleAclAuthorizer.authorize(session, Read, resource))