You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/04 00:26:06 UTC

kafka git commit: KAFKA-2949; Make EndToEndAuthorizationTest replicated.

Repository: kafka
Updated Branches:
  refs/heads/trunk e11946b09 -> b905d4891


KAFKA-2949; Make EndToEndAuthorizationTest replicated.

Author: Flavio Junqueira <fp...@apache.org>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #631 from fpj/KAFKA-2949


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b905d489
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b905d489
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b905d489

Branch: refs/heads/trunk
Commit: b905d489188768ba1c55226857db9713b9272918
Parents: e11946b
Author: Flavio Junqueira <fp...@apache.org>
Authored: Sun Jan 3 15:25:59 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Jan 3 15:25:59 2016 -0800

----------------------------------------------------------------------
 .../kafka/api/EndToEndAuthorizationTest.scala   | 40 +++++++++++++++-----
 .../kafka/api/IntegrationTestHarness.scala      |  3 +-
 2 files changed, 31 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b905d489/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 59cff14..f14149f 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -65,12 +65,15 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   override val serverCount = 3
   override val setClusterAcl = Some(() =>
     { AclCommand.main(clusterAclArgs)
-      TestUtils.waitAndVerifyAcls(ClusterActionAcl, servers.head.apis.authorizer.get, clusterResource)
+      servers.foreach( s =>
+        TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
+      )
     } : Unit
   )
   val numRecords = 1
   val group = "group"
   val topic = "e2etopic"
+  val topicWildcard = "*"
   val part = 0
   val tp = new TopicPartition(topic, part)
   val topicAndPartition = new TopicAndPartition(topic, part)
@@ -93,6 +96,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
                                             s"--cluster",
                                             s"--operation=ClusterAction",
                                             s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
+  def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
+                                                    s"zookeeper.connect=$zkConnect",
+                                                    s"--add",
+                                                    s"--topic=$topicWildcard",
+                                                    s"--operation=Read",
+                                                    s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
   def produceAclArgs: Array[String] = Array("--authorizer-properties",
                                           s"zookeeper.connect=$zkConnect",
                                           s"--add",
@@ -106,13 +115,14 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
                                                s"--group=$group",
                                                s"--consumer",
                                                s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
-  def groupAclArgs: Array[String] = Array("--authorizer-properties", 
+  def groupAclArgs: Array[String] = Array("--authorizer-properties",
                                           s"zookeeper.connect=$zkConnect",
                                           s"--add",
                                           s"--group=$group",
                                           s"--operation=Read",
                                           s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
-  def ClusterActionAcl:Set[Acl] =  Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
+  def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
+  def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read))
   def GroupReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read))
   def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read))
   def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write))
@@ -124,7 +134,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   // Some needed configuration for brokers, producers, and consumers
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
+  this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
 
   /**
@@ -139,8 +149,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
         startSasl(Both)
     }
     super.setUp
+    AclCommand.main(topicBrokerReadAclArgs)
+    servers.foreach( s =>
+      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
+    )
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
+    TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers)
   }
 
   /**
@@ -159,8 +173,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   def testProduceConsume {
     AclCommand.main(produceAclArgs)
     AclCommand.main(consumeAclArgs)
-    TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource)
-    TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+    servers.foreach(s => {
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+    })
     //Produce records
     debug("Starting to send records")
     sendRecords(numRecords, tp)
@@ -195,8 +211,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   def testNoConsumeAcl {
     AclCommand.main(produceAclArgs)
     AclCommand.main(groupAclArgs)
-    TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource)
-    TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+    servers.foreach(s => {
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+    })
     //Produce records
     debug("Starting to send records")
     sendRecords(numRecords, tp)
@@ -218,7 +236,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   @Test
   def testNoGroupAcl {
     AclCommand.main(produceAclArgs)
-    TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource)
+    servers.foreach(s =>
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+    )
     //Produce records
     debug("Starting to send records")
     sendRecords(numRecords, tp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b905d489/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 7aaa185..b4f31c4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -63,8 +63,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.putAll(consumerSecurityProps)
     for (i <- 0 until producerCount)
-      producers += TestUtils.createNewProducer(brokerList, 
-                                               acks = 1,
+      producers += TestUtils.createNewProducer(brokerList,
                                                securityProtocol = this.securityProtocol,
                                                trustStoreFile = this.trustStoreFile,
                                                props = Some(producerConfig))