You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/26 06:49:45 UTC

[kafka] branch trunk updated: KAFKA-2951; Add a test to verify produce, consume with ACLs for topic/group wildcard resources (#5054)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d45d7ec  KAFKA-2951; Add a test to verify produce, consume with ACLs for topic/group wildcard resources (#5054)
d45d7ec is described below

commit d45d7ec781c7ab6d481d8e7495a08459de29d264
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Sat May 26 12:19:31 2018 +0530

    KAFKA-2951; Add a test to verify produce, consume with ACLs for topic/group wildcard resources (#5054)
---
 .../kafka/api/EndToEndAuthorizationTest.scala      | 33 ++++++++++++++++++++--
 1 file changed, 30 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 4500d49..a2e5fd9 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -67,7 +67,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   val numRecords = 1
   val group = "group"
   val topic = "e2etopic"
-  val topicWildcard = "*"
+  val wildcard = "*"
   val part = 0
   val tp = new TopicPartition(topic, part)
   val topicAndPartition = TopicAndPartition(topic, part)
@@ -79,6 +79,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   val topicResource = new Resource(Topic, topic)
   val groupResource = new Resource(Group, group)
   val clusterResource = Resource.ClusterResource
+  val wildcardTopicResource = new Resource(Topic, wildcard)
+  val wildcardGroupResource = new Resource(Group, wildcard)
 
   // Arguments to AclCommand to set ACLs. There are three definitions here:
   // 1- Provides read and write access to topic
@@ -93,7 +95,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
                                                     s"zookeeper.connect=$zkConnect",
                                                     s"--add",
-                                                    s"--topic=$topicWildcard",
+                                                    s"--topic=$wildcard",
                                                     s"--operation=Read",
                                                     s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
   def produceAclArgs: Array[String] = Array("--authorizer-properties",
@@ -135,6 +137,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
                                           s"--group=$group",
                                           s"--operation=Read",
                                           s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+  def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties",
+                                            s"zookeeper.connect=$zkConnect",
+                                            s"--add",
+                                            s"--topic=$wildcard",
+                                            s"--group=$wildcard",
+                                            s"--consumer",
+                                            s"--producer",
+                                            s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+
   def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
   def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read))
   def GroupReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read))
@@ -173,7 +184,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
                                 saslProperties = this.clientSaslProperties,
                                 props = Some(producerConfig))
   }
-  
+
   /**
     * Closes MiniKDC last when tearing down.
     */
@@ -201,6 +212,22 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumeRecords(this.consumers.head, numRecords)
   }
 
+  @Test
+  def testProduceConsumeWithWildcardAcls(): Unit = {
+    setWildcardResourceAcls()
+    sendRecords(numRecords, tp)
+    consumers.head.subscribe(List(topic).asJava)
+    consumeRecords(this.consumers.head, numRecords)
+  }
+
+  private def setWildcardResourceAcls() {
+    AclCommand.main(produceConsumeWildcardAclArgs)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
+    }
+  }
+
   protected def setAclsAndProduce() {
     AclCommand.main(produceAclArgs)
     AclCommand.main(consumeAclArgs)

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.