You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/06/03 01:28:51 UTC

[kafka] branch trunk updated: KAFKA-6912; Add test for authorization with custom principal types

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6d7377  KAFKA-6912; Add test for authorization with custom principal types
f6d7377 is described below

commit f6d7377f95b80686e51f8b60de57f2769685945a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sat Jun 2 18:28:16 2018 -0700

    KAFKA-6912; Add test for authorization with custom principal types
    
    Author: Rajini Sivaram <ra...@googlemail.com>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Dong Lin <li...@gmail.com>
    
    Closes #5030 from rajinisivaram/MINOR-group-test
---
 .../kafka/api/AuthorizerIntegrationTest.scala      | 251 +++++++++++----------
 .../kafka/api/GroupAuthorizerIntegrationTest.scala |  41 ++++
 .../kafka/api/GroupEndToEndAuthorizationTest.scala |  46 ++++
 .../SaslScramSslEndToEndAuthorizationTest.scala    |   3 +-
 4 files changed, 214 insertions(+), 127 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 681497e..4af9b83 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -53,6 +53,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   override def numBrokers: Int = 1
   val brokerId: Integer = 0
+  def userPrincipal = KafkaPrincipal.ANONYMOUS
 
   val topic = "topic"
   val topicPattern = "topic.*"
@@ -73,23 +74,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val deleteTopicResource = new Resource(Topic, deleteTopic)
   val transactionalIdResource = new Resource(TransactionalId, transactionalId)
 
-  val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val groupDeleteAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
-  val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
-  val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
-  val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
-  val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)))
-  val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
-  val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val topicAlterAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
-  val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
-  val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
-  val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
-  val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
-  val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
+  val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+  val groupDeleteAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)))
+  val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)))
+  val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
+  val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
+  val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+  val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
+  val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
+  val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
+  val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+  val topicAlterAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
+  val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)))
+  val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, DescribeConfigs)))
+  val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, AlterConfigs)))
+  val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
+  val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
 
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
@@ -231,7 +232,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   override def setUp() {
     super.setUp()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
 
     for (_ <- 0 until producerCount)
       producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
@@ -375,12 +376,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def createAclsRequest = new CreateAclsRequest.Builder(
     Collections.singletonList(new AclCreation(new AclBinding(
       new AdminResource(AdminResourceType.TOPIC, "mytopic"),
-      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
+      new AccessControlEntry(userPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
 
   private def deleteAclsRequest = new DeleteAclsRequest.Builder(
     Collections.singletonList(new AclBindingFilter(
       new ResourceFilter(AdminResourceType.TOPIC, null),
-      new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
+      new AccessControlEntryFilter(userPrincipal.toString, "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
 
   private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
 
@@ -521,7 +522,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testProduceWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     try {
       sendRecords(numRecords, tp)
       fail("should have thrown exception")
@@ -533,7 +534,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testProduceWithTopicRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     try {
       sendRecords(numRecords, tp)
       fail("should have thrown exception")
@@ -545,7 +546,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testProduceWithTopicWrite() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(numRecords, tp)
   }
 
@@ -553,7 +554,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testCreatePermissionNeededForWritingToNonExistentTopic() {
     val newTopic = "newTopic"
     val topicPartition = new TopicPartition(newTopic, 0)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
     try {
       sendRecords(numRecords, topicPartition)
       Assert.fail("should have thrown exception")
@@ -561,13 +562,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
     }
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
     sendRecords(numRecords, topicPartition)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testConsumeUsingAssignWithNoAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
     this.consumers.head.assign(List(tp).asJava)
@@ -576,11 +577,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     try {
       // note this still depends on group access because we haven't set offsets explicitly, which means
       // they will first be fetched from the consumer coordinator (which requires group access)
@@ -594,11 +595,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
 
     // in this case, we do an explicit seek, so there should be no need to query the coordinator at all
     this.consumers.head.assign(List(tp).asJava)
@@ -608,11 +609,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[KafkaException])
   def testConsumeWithoutTopicDescribeAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
 
     // the consumer should raise an exception if it receives UNKNOWN_TOPIC_OR_PARTITION
@@ -622,12 +623,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testConsumeWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     try {
       this.consumers.head.assign(List(tp).asJava)
       consumeRecords(this.consumers.head)
@@ -639,12 +640,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testConsumeWithTopicWrite() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     try {
       this.consumers.head.assign(List(tp).asJava)
       consumeRecords(this.consumers.head)
@@ -657,23 +658,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testConsumeWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     consumeRecords(this.consumers.head)
   }
 
   @Test
   def testPatternSubscriptionWithNoTopicAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
     this.consumers.head.poll(50)
     assertTrue(this.consumers.head.subscription.isEmpty)
@@ -681,12 +682,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     val consumer = consumers.head
     consumer.subscribe(Pattern.compile(topicPattern))
     try {
@@ -699,25 +700,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
 
     // create an unmatched topic
     val unmatchedTopic = "unmatched"
     createTopic(unmatchedTopic)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic))
     sendRecords(1, new TopicPartition(unmatchedTopic, part))
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     val consumer = consumers.head
     consumer.subscribe(Pattern.compile(topicPattern))
     consumeRecords(consumer)
 
     // set the subscription pattern to an internal topic that the consumer has read permission to. Since
     // internal topics are not included, we should not be assigned any partitions from this topic
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)),  new Resource(Topic,
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)),  new Resource(Topic,
       GROUP_METADATA_TOPIC_NAME))
     consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
     consumer.poll(0)
@@ -727,12 +728,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionMatchingInternalTopic() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
 
     val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -745,7 +746,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       assertEquals(Set(topic).asJava, consumer.subscription)
 
       // now authorize the user for the internal topic and verify that we can subscribe
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic,
+      addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
         GROUP_METADATA_TOPIC_NAME))
       consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
       consumer.poll(0)
@@ -755,14 +756,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
 
     val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -781,12 +782,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionNotMatchingInternalTopic() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
 
     val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -803,7 +804,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val newTopic = "newTopic"
     val topicPartition = new TopicPartition(newTopic, 0)
     val newTopicResource = new Resource(Topic, newTopic)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
     addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
     addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource)
     try {
@@ -815,8 +816,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
         assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
     }
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
 
     sendRecords(numRecords, topicPartition)
     consumeRecords(this.consumers.head, topic = newTopic, part = 0)
@@ -829,34 +830,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[KafkaException])
   def testCommitWithNoTopicAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testCommitWithTopicWrite() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testCommitWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testCommitWithNoGroupAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test
   def testCommitWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
@@ -868,14 +869,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testOffsetFetchWithNoGroupAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
 
   @Test(expected = classOf[KafkaException])
   def testOffsetFetchWithNoTopicAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
@@ -883,13 +884,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testFetchAllOffsetsTopicAuthorization() {
     val offset = 15L
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(offset)).asJava)
 
     removeAllAcls()
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
 
     // send offset fetch requests directly since the consumer does not expose an API to do so
     // note there's only one broker, so no need to lookup the group coordinator
@@ -901,7 +902,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertTrue(offsetFetchResponse.responseData.isEmpty)
 
     // now add describe permission on the topic and verify that the offset can be fetched
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, anySocketServer)
     assertEquals(Errors.NONE, offsetFetchResponse.error)
     assertTrue(offsetFetchResponse.responseData.containsKey(tp))
@@ -910,16 +911,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testOffsetFetchTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
 
   @Test
   def testOffsetFetchWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
@@ -931,27 +932,27 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testListOffsetsWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     this.consumers.head.partitionsFor(topic)
   }
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testDescribeGroupApiWithNoGroupAcl() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
   }
 
   @Test
   def testDescribeGroupApiWithGroupDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
   }
 
   @Test
   def testDescribeGroupCliWithGroupDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
@@ -962,9 +963,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteGroupApiWithDeleteGroupAcl() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
     val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
@@ -973,8 +974,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteGroupApiWithNoDeleteGroupAcl() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
     val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
@@ -997,7 +998,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testUnauthorizedDeleteTopicsWithDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1007,7 +1008,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteTopicsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1025,7 +1026,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testUnauthorizedDeleteRecordsWithDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
     val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
     val version = ApiKeys.DELETE_RECORDS.latestVersion
     val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1034,7 +1035,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteRecordsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
     val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
     val version = ApiKeys.DELETE_RECORDS.latestVersion
     val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1052,7 +1053,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testCreatePartitionsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
     val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
     val version = ApiKeys.CREATE_PARTITIONS.latestVersion
     val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
@@ -1061,7 +1062,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[TransactionalIdAuthorizationException])
   def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
   }
@@ -1074,9 +1075,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSendOffsetsWithNoConsumerGroupDescribeAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1090,8 +1091,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSendOffsetsWithNoConsumerGroupWriteAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1105,7 +1106,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     val producer = buildIdempotentProducer()
     try {
       // the InitProducerId is sent asynchronously, so we expect the error either in the callback
@@ -1129,8 +1130,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
 
     val producer = buildIdempotentProducer()
 
@@ -1139,7 +1140,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     // revoke the IdempotentWrite permission
     removeAllAcls()
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
 
     try {
       // the send should now fail with a cluster auth error
@@ -1162,16 +1163,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldInitTransactionsWhenAclSet(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
   }
 
   @Test
   def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     // add describe access so that we can fetch metadata
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1191,9 +1192,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testTransactionalProducerTopicAuthorizationExceptionInCommit(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     // add describe access so that we can fetch metadata
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1209,11 +1210,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     removeAllAcls()
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     try {
       producer.beginTransaction()
       producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
@@ -1226,8 +1227,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1243,9 +1244,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1263,8 +1264,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), groupResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1280,8 +1281,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     val producer = buildIdempotentProducer()
     producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
   }
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
new file mode 100644
index 0000000..27c3f31
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import java.util.Properties
+
+import kafka.api.GroupAuthorizerIntegrationTest._
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
+
+
+object GroupAuthorizerIntegrationTest { // fixtures shared with the GroupAuthorizerIntegrationTest class below
+  val GroupPrincipalType = "Group" // custom principal type exercised by this test
+  val TestGroupPrincipal = new KafkaPrincipal(GroupPrincipalType, "testGroup") // the one principal returned by GroupPrincipalBuilder
+  class GroupPrincipalBuilder extends KafkaPrincipalBuilder { // registered with the broker via propertyOverrides in the test class
+    override def build(context: AuthenticationContext): KafkaPrincipal = { // `context` is ignored: the result is always TestGroupPrincipal
+      TestGroupPrincipal
+    }
+  }
+}
+
+class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest { // inherits the AuthorizerIntegrationTest tests, swapping in a Group-typed principal
+  override val kafkaPrincipalType = GroupPrincipalType // "Group", from the companion object
+  override def userPrincipal = TestGroupPrincipal // replaces the base class default (KafkaPrincipal.ANONYMOUS) used when building test ACLs
+
+  override def propertyOverrides(properties: Properties): Unit = { // sets the principal builder first, then applies the base class overrides
+    properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, // point the broker at GroupPrincipalBuilder
+      classOf[GroupPrincipalBuilder].getName)
+    super.propertyOverrides(properties)
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..8dea0fc
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.api.GroupEndToEndAuthorizationTest._
+import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
+
+object GroupEndToEndAuthorizationTest { // fixtures shared with the GroupEndToEndAuthorizationTest class below
+  val GroupPrincipalType = "Group" // custom principal type exercised by this test
+  val ClientGroup = "testGroup" // principal name assigned to connections authenticated as JaasTestUtils.KafkaScramUser
+  class GroupPrincipalBuilder extends KafkaPrincipalBuilder { // registered with the broker by the test class
+    override def build(context: AuthenticationContext): KafkaPrincipal = { // always yields a Group-typed principal for SASL, ANONYMOUS otherwise
+      context match {
+        case ctx: SaslAuthenticationContext => // SASL connection: name the principal from the authorization ID
+          if (ctx.server.getAuthorizationID == JaasTestUtils.KafkaScramUser) // the test client user is mapped to the shared group name
+            new KafkaPrincipal(GroupPrincipalType, ClientGroup)
+          else // any other authorization ID is used as the principal name unchanged
+            new KafkaPrincipal(GroupPrincipalType, ctx.server.getAuthorizationID)
+        case _ => // every non-SASL authentication context
+          KafkaPrincipal.ANONYMOUS
+      }
+    }
+  }
+}
+
+class GroupEndToEndAuthorizationTest extends SaslScramSslEndToEndAuthorizationTest { // SCRAM end-to-end test variant using Group-typed principals
+  override val kafkaPrincipalType = GroupPrincipalType // "Group", from the companion object
+  override val clientPrincipal = ClientGroup // group name, not the SCRAM user name; GroupPrincipalBuilder maps KafkaScramUser to it
+  this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName) // runs in the constructor body
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index d304ffc..dd1e705 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -28,7 +28,6 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
   override val clientPrincipal = JaasTestUtils.KafkaScramUser
   override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
-  private val clientPassword = JaasTestUtils.KafkaScramPassword
   private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
 
   override def configureSecurityBeforeServersStart() {
@@ -42,7 +41,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   override def setUp() {
     super.setUp()
     // Create client credentials after starting brokers so that dynamic credential creation is also tested
-    createScramCredentials(zkConnect, clientPrincipal, clientPassword)
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
   }
 }

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.