You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/04 00:26:06 UTC
kafka git commit: KAFKA-2949;
Make EndToEndAuthorizationTest replicated.
Repository: kafka
Updated Branches:
refs/heads/trunk e11946b09 -> b905d4891
KAFKA-2949; Make EndToEndAuthorizationTest replicated.
Author: Flavio Junqueira <fp...@apache.org>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #631 from fpj/KAFKA-2949
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b905d489
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b905d489
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b905d489
Branch: refs/heads/trunk
Commit: b905d489188768ba1c55226857db9713b9272918
Parents: e11946b
Author: Flavio Junqueira <fp...@apache.org>
Authored: Sun Jan 3 15:25:59 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Jan 3 15:25:59 2016 -0800
----------------------------------------------------------------------
.../kafka/api/EndToEndAuthorizationTest.scala | 40 +++++++++++++++-----
.../kafka/api/IntegrationTestHarness.scala | 3 +-
2 files changed, 31 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b905d489/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 59cff14..f14149f 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -65,12 +65,15 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
override val serverCount = 3
override val setClusterAcl = Some(() =>
{ AclCommand.main(clusterAclArgs)
- TestUtils.waitAndVerifyAcls(ClusterActionAcl, servers.head.apis.authorizer.get, clusterResource)
+ servers.foreach( s =>
+ TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
+ )
} : Unit
)
val numRecords = 1
val group = "group"
val topic = "e2etopic"
+ val topicWildcard = "*"
val part = 0
val tp = new TopicPartition(topic, part)
val topicAndPartition = new TopicAndPartition(topic, part)
@@ -93,6 +96,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
s"--cluster",
s"--operation=ClusterAction",
s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
+ def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
+ s"zookeeper.connect=$zkConnect",
+ s"--add",
+ s"--topic=$topicWildcard",
+ s"--operation=Read",
+ s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
def produceAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
@@ -106,13 +115,14 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
s"--group=$group",
s"--consumer",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
- def groupAclArgs: Array[String] = Array("--authorizer-properties",
+ def groupAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
s"--group=$group",
s"--operation=Read",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
- def ClusterActionAcl:Set[Acl] = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
+ def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
+ def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read))
def GroupReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read))
def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read))
def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write))
@@ -124,7 +134,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
// Some needed configuration for brokers, producers, and consumers
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
- this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
+ this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
/**
@@ -139,8 +149,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
startSasl(Both)
}
super.setUp
+ AclCommand.main(topicBrokerReadAclArgs) // grant the broker principal Read on the wildcard topic
+ servers.foreach( s =>
+ TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, topicWildcard)) // same name the ACL args use
+ )
// create the test topic with all the brokers as replicas
- TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
+ TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers)
}
/**
@@ -159,8 +173,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
def testProduceConsume {
AclCommand.main(produceAclArgs)
AclCommand.main(consumeAclArgs)
- TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource)
- TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+ servers.foreach(s => {
+ TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+ })
//Produce records
debug("Starting to send records")
sendRecords(numRecords, tp)
@@ -195,8 +211,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
def testNoConsumeAcl {
AclCommand.main(produceAclArgs)
AclCommand.main(groupAclArgs)
- TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource)
- TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+ servers.foreach(s => {
+ TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) // check each broker's authorizer, not only servers.head
+ })
//Produce records
debug("Starting to send records")
sendRecords(numRecords, tp)
@@ -218,7 +236,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
@Test
def testNoGroupAcl {
AclCommand.main(produceAclArgs)
- TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource)
+ servers.foreach(s =>
+ TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+ )
//Produce records
debug("Starting to send records")
sendRecords(numRecords, tp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b905d489/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 7aaa185..b4f31c4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -63,8 +63,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.putAll(consumerSecurityProps)
for (i <- 0 until producerCount)
- producers += TestUtils.createNewProducer(brokerList,
- acks = 1,
+ producers += TestUtils.createNewProducer(brokerList,
securityProtocol = this.securityProtocol,
trustStoreFile = this.trustStoreFile,
props = Some(producerConfig))