You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/06/03 01:28:51 UTC
[kafka] branch trunk updated: KAFKA-6912; Add test for authorization with custom principal types
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f6d7377 KAFKA-6912; Add test for authorization with custom principal types
f6d7377 is described below
commit f6d7377f95b80686e51f8b60de57f2769685945a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sat Jun 2 18:28:16 2018 -0700
KAFKA-6912; Add test for authorization with custom principal types
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Dong Lin <li...@gmail.com>
Closes #5030 from rajinisivaram/MINOR-group-test
---
.../kafka/api/AuthorizerIntegrationTest.scala | 251 +++++++++++----------
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 41 ++++
.../kafka/api/GroupEndToEndAuthorizationTest.scala | 46 ++++
.../SaslScramSslEndToEndAuthorizationTest.scala | 3 +-
4 files changed, 214 insertions(+), 127 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 681497e..4af9b83 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -53,6 +53,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
override def numBrokers: Int = 1
val brokerId: Integer = 0
+ def userPrincipal = KafkaPrincipal.ANONYMOUS
val topic = "topic"
val topicPattern = "topic.*"
@@ -73,23 +74,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val deleteTopicResource = new Resource(Topic, deleteTopic)
val transactionalIdResource = new Resource(TransactionalId, transactionalId)
- val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
- val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
- val groupDeleteAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
- val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
- val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
- val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
- val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
- val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)))
- val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
- val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
- val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
- val topicAlterAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
- val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
- val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
- val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
- val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
- val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+ val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
+ val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+ val groupDeleteAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)))
+ val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)))
+ val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
+ val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
+ val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+ val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
+ val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
+ val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
+ val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+ val topicAlterAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
+ val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)))
+ val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, DescribeConfigs)))
+ val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, AlterConfigs)))
+ val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
+ val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
@@ -231,7 +232,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
override def setUp() {
super.setUp()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
for (_ <- 0 until producerCount)
producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
@@ -375,12 +376,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createAclsRequest = new CreateAclsRequest.Builder(
Collections.singletonList(new AclCreation(new AclBinding(
new AdminResource(AdminResourceType.TOPIC, "mytopic"),
- new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
+ new AccessControlEntry(userPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
private def deleteAclsRequest = new DeleteAclsRequest.Builder(
Collections.singletonList(new AclBindingFilter(
new ResourceFilter(AdminResourceType.TOPIC, null),
- new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
+ new AccessControlEntryFilter(userPrincipal.toString, "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
@@ -521,7 +522,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testProduceWithTopicDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
try {
sendRecords(numRecords, tp)
fail("should have thrown exception")
@@ -533,7 +534,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testProduceWithTopicRead() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
try {
sendRecords(numRecords, tp)
fail("should have thrown exception")
@@ -545,7 +546,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testProduceWithTopicWrite() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(numRecords, tp)
}
@@ -553,7 +554,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
def testCreatePermissionNeededForWritingToNonExistentTopic() {
val newTopic = "newTopic"
val topicPartition = new TopicPartition(newTopic, 0)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
try {
sendRecords(numRecords, topicPartition)
Assert.fail("should have thrown exception")
@@ -561,13 +562,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
}
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
sendRecords(numRecords, topicPartition)
}
@Test(expected = classOf[TopicAuthorizationException])
def testConsumeUsingAssignWithNoAccess(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
this.consumers.head.assign(List(tp).asJava)
@@ -576,11 +577,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
try {
// note this still depends on group access because we haven't set offsets explicitly, which means
// they will first be fetched from the consumer coordinator (which requires group access)
@@ -594,11 +595,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
// in this case, we do an explicit seek, so there should be no need to query the coordinator at all
this.consumers.head.assign(List(tp).asJava)
@@ -608,11 +609,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test(expected = classOf[KafkaException])
def testConsumeWithoutTopicDescribeAccess() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.assign(List(tp).asJava)
// the consumer should raise an exception if it receives UNKNOWN_TOPIC_OR_PARTITION
@@ -622,12 +623,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testConsumeWithTopicDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
try {
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
@@ -639,12 +640,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testConsumeWithTopicWrite() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
try {
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
@@ -657,23 +658,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testConsumeWithTopicAndGroupRead() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
}
@Test
def testPatternSubscriptionWithNoTopicAccess() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
this.consumers.head.poll(50)
assertTrue(this.consumers.head.subscription.isEmpty)
@@ -681,12 +682,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
val consumer = consumers.head
consumer.subscribe(Pattern.compile(topicPattern))
try {
@@ -699,25 +700,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testPatternSubscriptionWithTopicAndGroupRead() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
// create an unmatched topic
val unmatchedTopic = "unmatched"
createTopic(unmatchedTopic)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic))
sendRecords(1, new TopicPartition(unmatchedTopic, part))
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
val consumer = consumers.head
consumer.subscribe(Pattern.compile(topicPattern))
consumeRecords(consumer)
// set the subscription pattern to an internal topic that the consumer has read permission to. Since
// internal topics are not included, we should not be assigned any partitions from this topic
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), new Resource(Topic,
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), new Resource(Topic,
GROUP_METADATA_TOPIC_NAME))
consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
@@ -727,12 +728,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testPatternSubscriptionMatchingInternalTopic() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
val consumerConfig = new Properties
consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -745,7 +746,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Set(topic).asJava, consumer.subscription)
// now authorize the user for the internal topic and verify that we can subscribe
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic,
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
GROUP_METADATA_TOPIC_NAME))
consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
@@ -755,14 +756,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
val consumerConfig = new Properties
consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -781,12 +782,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testPatternSubscriptionNotMatchingInternalTopic() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
val consumerConfig = new Properties
consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -803,7 +804,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val newTopic = "newTopic"
val topicPartition = new TopicPartition(newTopic, 0)
val newTopicResource = new Resource(Topic, newTopic)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource)
try {
@@ -815,8 +816,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
}
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
sendRecords(numRecords, topicPartition)
consumeRecords(this.consumers.head, topic = newTopic, part = 0)
@@ -829,34 +830,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test(expected = classOf[KafkaException])
def testCommitWithNoTopicAccess() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[TopicAuthorizationException])
def testCommitWithTopicWrite() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[TopicAuthorizationException])
def testCommitWithTopicDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[GroupAuthorizationException])
def testCommitWithNoGroupAccess() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test
def testCommitWithTopicAndGroupRead() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@@ -868,14 +869,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test(expected = classOf[GroupAuthorizationException])
def testOffsetFetchWithNoGroupAccess() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@Test(expected = classOf[KafkaException])
def testOffsetFetchWithNoTopicAccess() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@@ -883,13 +884,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testFetchAllOffsetsTopicAuthorization() {
val offset = 15L
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(offset)).asJava)
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
// send offset fetch requests directly since the consumer does not expose an API to do so
// note there's only one broker, so no need to lookup the group coordinator
@@ -901,7 +902,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertTrue(offsetFetchResponse.responseData.isEmpty)
// now add describe permission on the topic and verify that the offset can be fetched
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, anySocketServer)
assertEquals(Errors.NONE, offsetFetchResponse.error)
assertTrue(offsetFetchResponse.responseData.containsKey(tp))
@@ -910,16 +911,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testOffsetFetchTopicDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@Test
def testOffsetFetchWithTopicAndGroupRead() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@@ -931,27 +932,27 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testListOffsetsWithTopicDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
this.consumers.head.partitionsFor(topic)
}
@Test(expected = classOf[GroupAuthorizationException])
def testDescribeGroupApiWithNoGroupAcl() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
}
@Test
def testDescribeGroupApiWithGroupDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
}
@Test
def testDescribeGroupCliWithGroupDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val opts = new ConsumerGroupCommandOptions(cgcArgs)
@@ -962,9 +963,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteGroupApiWithDeleteGroupAcl() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
@@ -973,8 +974,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteGroupApiWithNoDeleteGroupAcl() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
@@ -997,7 +998,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testUnauthorizedDeleteTopicsWithDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1007,7 +1008,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteTopicsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1025,7 +1026,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testUnauthorizedDeleteRecordsWithDescribe() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
val version = ApiKeys.DELETE_RECORDS.latestVersion
val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1034,7 +1035,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteRecordsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
val version = ApiKeys.DELETE_RECORDS.latestVersion
val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1052,7 +1053,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testCreatePartitionsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
val version = ApiKeys.CREATE_PARTITIONS.latestVersion
val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
@@ -1061,7 +1062,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test(expected = classOf[TransactionalIdAuthorizationException])
def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), transactionalIdResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
}
@@ -1074,9 +1075,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testSendOffsetsWithNoConsumerGroupDescribeAccess(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
@@ -1090,8 +1091,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testSendOffsetsWithNoConsumerGroupWriteAccess(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
@@ -1105,7 +1106,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
val producer = buildIdempotentProducer()
try {
// the InitProducerId is sent asynchronously, so we expect the error either in the callback
@@ -1129,8 +1130,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
val producer = buildIdempotentProducer()
@@ -1139,7 +1140,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// revoke the IdempotentWrite permission
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
try {
// the send should now fail with a cluster auth error
@@ -1162,16 +1163,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def shouldInitTransactionsWhenAclSet(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
}
@Test
def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
// add describe access so that we can fetch metadata
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
@@ -1191,9 +1192,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testTransactionalProducerTopicAuthorizationExceptionInCommit(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
// add describe access so that we can fetch metadata
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
@@ -1209,11 +1210,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
removeAllAcls()
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
try {
producer.beginTransaction()
producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
@@ -1226,8 +1227,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
@@ -1243,9 +1244,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
@@ -1263,8 +1264,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), groupResource)
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
@@ -1280,8 +1281,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = {
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
- addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
val producer = buildIdempotentProducer()
producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
}
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
new file mode 100644
index 0000000..27c3f31
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import java.util.Properties
+
+import kafka.api.GroupAuthorizerIntegrationTest._
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
+
+
+object GroupAuthorizerIntegrationTest {
+ val GroupPrincipalType = "Group"
+ val TestGroupPrincipal = new KafkaPrincipal(GroupPrincipalType, "testGroup")
+ class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+ override def build(context: AuthenticationContext): KafkaPrincipal = {
+ TestGroupPrincipal
+ }
+ }
+}
+
+class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest {
+ override val kafkaPrincipalType = GroupPrincipalType
+ override def userPrincipal = TestGroupPrincipal
+
+ override def propertyOverrides(properties: Properties): Unit = {
+ properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
+ classOf[GroupPrincipalBuilder].getName)
+ super.propertyOverrides(properties)
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..8dea0fc
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.api.GroupEndToEndAuthorizationTest._
+import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
+
+object GroupEndToEndAuthorizationTest {
+ val GroupPrincipalType = "Group"
+ val ClientGroup = "testGroup"
+ class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+ override def build(context: AuthenticationContext): KafkaPrincipal = {
+ context match {
+ case ctx: SaslAuthenticationContext =>
+ if (ctx.server.getAuthorizationID == JaasTestUtils.KafkaScramUser)
+ new KafkaPrincipal(GroupPrincipalType, ClientGroup)
+ else
+ new KafkaPrincipal(GroupPrincipalType, ctx.server.getAuthorizationID)
+ case _ =>
+ KafkaPrincipal.ANONYMOUS
+ }
+ }
+ }
+}
+
+class GroupEndToEndAuthorizationTest extends SaslScramSslEndToEndAuthorizationTest {
+ override val kafkaPrincipalType = GroupPrincipalType
+ override val clientPrincipal = ClientGroup
+ this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index d304ffc..dd1e705 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -28,7 +28,6 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
override val clientPrincipal = JaasTestUtils.KafkaScramUser
override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
- private val clientPassword = JaasTestUtils.KafkaScramPassword
private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
override def configureSecurityBeforeServersStart() {
@@ -42,7 +41,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override def setUp() {
super.setUp()
// Create client credentials after starting brokers so that dynamic credential creation is also tested
- createScramCredentials(zkConnect, clientPrincipal, clientPassword)
+ createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
}
}
--
To stop receiving notification emails like this one, please contact
lindong@apache.org.