You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2022/12/02 15:58:35 UTC

[kafka] branch trunk updated: KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers (#12896)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b00a07ccc4 KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers (#12896)
7b00a07ccc4 is described below

commit 7b00a07ccc4334d81299e5c562109fa2555ce934
Author: Proven Provenzano <93...@users.noreply.github.com>
AuthorDate: Fri Dec 2 10:58:10 2022 -0500

    KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers (#12896)
    
    * Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers
    
    * SCRAM and delegation tokens are not implemented for KRaft yet, so the
    corresponding tests print a message to stderr and pass without
    exercising the KRaft quorum.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../DelegationTokenEndToEndAuthorizationTest.scala |  11 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      | 181 +++++++++++++++------
 .../kafka/api/IntegrationTestHarness.scala         |   5 +
 .../api/PlaintextEndToEndAuthorizationTest.scala   |  10 +-
 .../kafka/api/SaslEndToEndAuthorizationTest.scala  |  50 +++---
 .../SaslGssapiSslEndToEndAuthorizationTest.scala   |   4 +-
 ...slOAuthBearerSslEndToEndAuthorizationTest.scala |   2 +-
 .../SaslPlainSslEndToEndAuthorizationTest.scala    |   7 +-
 .../SaslScramSslEndToEndAuthorizationTest.scala    |  13 +-
 9 files changed, 195 insertions(+), 88 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 40ad090b46e..920d0ccb254 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 import java.util.Properties
 
 import kafka.server.KafkaConfig
-import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.utils._
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, ScramCredentialInfo, UserScramCredentialAlteration, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.SaslConfigs
@@ -46,6 +46,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
 
   override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
   protected val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
+  override val unimplementedquorum = "kraft"
 
   protected val privilegedAdminClientConfig = new Properties()
 
@@ -110,9 +111,11 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
-    startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
-    super.setUp(testInfo)
-    privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+    if (!TestInfoUtils.isKRaft(testInfo)) {
+      startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
+      super.setUp(testInfo)
+      privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+    }
   }
 
   def assertTokenOwner(owner: KafkaPrincipal, token: DelegationToken): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index ad4b66e20a3..f4c51a51b88 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException
 
 import kafka.security.authorizer.AclAuthorizer
 import kafka.security.authorizer.AclEntry.WildcardHost
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.clients.admin.Admin
@@ -36,12 +37,12 @@ import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthori
 import org.apache.kafka.common.resource._
 import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth._
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{ValueSource, CsvSource}
 
 import scala.jdk.CollectionConverters._
 
@@ -65,6 +66,7 @@ import scala.jdk.CollectionConverters._
   */
 @Timeout(60)
 abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
+
   override val brokerCount = 3
 
   val numRecords = 1
@@ -138,24 +140,35 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
   this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1500")
 
+  val unimplementedquorum = ""
+
   /**
     * Starts MiniKDC and only then sets up the parent trait.
     */
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 
-    // The next two configuration parameters enable ZooKeeper secure ACLs
-    // and sets the Kafka authorizer, both necessary to enable security.
-    this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
-    this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
+    if (TestInfoUtils.isKRaft(testInfo)) {
+      this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString)
+      this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + "User:ANONYMOUS")
+      this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+      this.controllerConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+    } else {
+      // The next two configuration parameters enable ZooKeeper secure ACLs
+      // and sets the Kafka authorizer, both necessary to enable security.
+      this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+      this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
 
-    // Set the specific principal that can update ACLs.
-    this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
+      // Set the specific principal that can update ACLs.
+      this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
+    }
 
     super.setUp(testInfo)
 
     // create the test topic with all the brokers as replicas
-    createTopic(topic, 1, 3)
+    val superuserAdminClient = createSuperuserAdminClient()
+    TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers,
+      numPartitions = 1, replicationFactor = 3, topicConfig = new Properties)
   }
 
   /**
@@ -170,23 +183,28 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   /**
     * Tests the ability of producing and consuming with the appropriate ACLs set.
     */
-  @Test
-  def testProduceConsumeViaAssign(): Unit = {
-    setAclsAndProduce(tp)
-    val consumer = createConsumer()
-    consumer.assign(List(tp).asJava)
-    consumeRecords(consumer, numRecords)
-    confirmReauthenticationMetrics()
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testProduceConsumeViaAssign(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
+      setAclsAndProduce(tp)
+      val consumer = createConsumer()
+      consumer.assign(List(tp).asJava)
+      consumeRecords(consumer, numRecords)
+      confirmReauthenticationMetrics()
+    }
   }
 
   protected def confirmReauthenticationMetrics(): Unit = {
     val expiredConnectionsKilledCountTotal = getGauge("ExpiredConnectionsKilledCount").value()
-    servers.foreach { s =>
+    brokers.foreach { s =>
         val numExpiredKilled = TestUtils.totalMetricValue(s, "expired-connections-killed-count")
         assertEquals(0, numExpiredKilled, "Should have been zero expired connections killed: " + numExpiredKilled + "(total=" + expiredConnectionsKilledCountTotal + ")")
     }
     assertEquals(0, expiredConnectionsKilledCountTotal, 0.0, "Should have been zero expired connections killed total")
-    servers.foreach { s =>
+    brokers.foreach { s =>
       assertEquals(0, TestUtils.totalMetricValue(s, "failed-reauthentication-total"), "failed re-authentications not 0")
     }
   }
@@ -198,17 +216,26 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       ._2.asInstanceOf[Gauge[Double]]
   }
 
-  @Test
-  def testProduceConsumeViaSubscribe(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testProduceConsumeViaSubscribe(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     setAclsAndProduce(tp)
     val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
     consumeRecords(consumer, numRecords)
     confirmReauthenticationMetrics()
+    }
   }
 
-  @Test
-  def testProduceConsumeWithWildcardAcls(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testProduceConsumeWithWildcardAcls(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     setWildcardResourceAcls()
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -216,10 +243,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumer.subscribe(List(topic).asJava)
     consumeRecords(consumer, numRecords)
     confirmReauthenticationMetrics()
+    }
   }
 
-  @Test
-  def testProduceConsumeWithPrefixedAcls(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testProduceConsumeWithPrefixedAcls(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     setPrefixedResourceAcls()
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -227,10 +259,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumer.subscribe(List(topic).asJava)
     consumeRecords(consumer, numRecords)
     confirmReauthenticationMetrics()
+    }
   }
 
-  @Test
-  def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testProduceConsumeTopicAutoCreateTopicCreateAcl(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     // topic2 is not created on setup()
     val tp2 = new TopicPartition("topic2", 0)
     setAclsAndProduce(tp2)
@@ -238,6 +275,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumer.assign(List(tp2).asJava)
     consumeRecords(consumer, numRecords, topic = tp2.topic)
     confirmReauthenticationMetrics()
+    }
   }
 
   private def setWildcardResourceAcls(): Unit = {
@@ -245,7 +283,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     superuserAdminClient.createAcls(List(AclWildcardTopicWrite, AclWildcardTopicCreate, AclWildcardTopicDescribe, AclWildcardTopicRead).asJava).values
     superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values
 
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
     }
@@ -256,7 +294,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     superuserAdminClient.createAcls(List(AclPrefixedTopicWrite, AclPrefixedTopicCreate, AclPrefixedTopicDescribe, AclPrefixedTopicRead).asJava).values
     superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values
 
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
     }
@@ -270,7 +308,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     superuserAdminClient.createAcls(List(AclTopicRead(topicResource)).asJava).values
     superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
 
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get,
         new ResourcePattern(TOPIC, tp.topic, LITERAL))
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
@@ -286,7 +324,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   private def setConsumerGroupAcls(): Unit = {
     val superuserAdminClient = createSuperuserAdminClient()
     superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
   }
@@ -297,8 +335,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * Also verifies that subsequent publish, consume and describe to authorized topic succeeds.
     */
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
+  @CsvSource(value = Array(
+    "kraft, true",
+    "kraft, false",
+    "zk, true",
+    "zk, false"
+  ))
+  def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     // Set consumer group acls since we are testing topic authorization
     setConsumerGroupAcls()
 
@@ -361,15 +407,24 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues
     assertEquals(1, describeResults2.get(topic).get().partitions().size())
     assertEquals(1, describeResults2.get(topic2).get().partitions().size())
+    }
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testNoProduceWithDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
+  @CsvSource(value = Array(
+    "kraft, true",
+    "kraft, false",
+    "zk, true",
+    "zk, false"
+  ))
+  def testNoProduceWithDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     val superuserAdminClient = createSuperuserAdminClient()
     superuserAdminClient.createAcls(List(AclTopicDescribe()).asJava).values
 
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
 
@@ -385,24 +440,34 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     }
     confirmReauthenticationMetrics()
+    }
   }
 
    /**
     * Tests that a consumer fails to consume messages without the appropriate
     * ACL set.
     */
-  @Test
-  def testNoConsumeWithoutDescribeAclViaAssign(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testNoConsumeWithoutDescribeAclViaAssign(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     noConsumeWithoutDescribeAclSetup()
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     // the exception is expected when the consumer attempts to lookup offsets
     assertThrows(classOf[KafkaException], () => consumeRecords(consumer))
     confirmReauthenticationMetrics()
+    }
   }
 
-  @Test
-  def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testNoConsumeWithoutDescribeAclViaSubscribe(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     noConsumeWithoutDescribeAclSetup()
     val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
@@ -417,6 +482,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // Verify that records are consumed if all topics are authorized
     consumer.subscribe(List(topic).asJava)
     consumeRecordsIgnoreOneAuthorizationException(consumer)
+    }
   }
 
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
@@ -424,7 +490,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
     superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
 
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
@@ -435,13 +501,17 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     superuserAdminClient.deleteAcls(List(AclTopicDescribe().toFilter).asJava).values
     superuserAdminClient.deleteAcls(List(AclTopicWrite().toFilter).asJava).values
 
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
   }
 
-  @Test
-  def testNoConsumeWithDescribeAclViaAssign(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testNoConsumeWithDescribeAclViaAssign(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     noConsumeWithDescribeAclSetup()
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
@@ -449,10 +519,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
     assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     confirmReauthenticationMetrics()
+    }
   }
 
-  @Test
-  def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testNoConsumeWithDescribeAclViaSubscribe(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     noConsumeWithDescribeAclSetup()
     val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
@@ -460,6 +535,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
     assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     confirmReauthenticationMetrics()
+    }
   }
 
   private def noConsumeWithDescribeAclSetup(): Unit = {
@@ -467,7 +543,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
     superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
 
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
@@ -479,11 +555,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * Tests that a consumer fails to consume messages without the appropriate
     * ACL set.
     */
-  @Test
-  def testNoGroupAcl(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testNoGroupAcl(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+        Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
     val superuserAdminClient = createSuperuserAdminClient()
     superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
-    servers.foreach { s =>
+    brokers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
     val producer = createProducer()
@@ -494,6 +574,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     val e = assertThrows(classOf[GroupAuthorizationException], () => consumeRecords(consumer))
     assertEquals(group, e.groupId())
     confirmReauthenticationMetrics()
+    }
   }
 
   protected final def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 7ccce46665c..3b459875b60 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -47,6 +47,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
   val adminClientConfig = new Properties
   val superuserClientConfig = new Properties
   val serverConfig = new Properties
+  val controllerConfig = new Properties
 
   private val consumers = mutable.Buffer[KafkaConsumer[_, _]]()
   private val producers = mutable.Buffer[KafkaProducer[_, _]]()
@@ -67,6 +68,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     cfgs.map(KafkaConfig.fromProps)
   }
 
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+    Seq(controllerConfig)
+  }
+
   protected def configureListeners(props: Seq[Properties]): Unit = {
     props.foreach { config =>
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index 19fea48d6f0..18e792344fc 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -21,9 +21,12 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.clients.admin.AdminClientConfig
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.apache.kafka.common.errors.TopicAuthorizationException
+import kafka.utils.TestInfoUtils
 
 // This test case uses a separate listener for client and inter-broker communication, from
 // which we derive corresponding principals
@@ -86,8 +89,9 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
     superuserClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(interBrokerListenerName))
   }
 
-  @Test
-  def testListenerName(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testListenerName(quorum: String): Unit = {
     // To check the client listener name, establish a session on the server by sending any request eg sendRecords
     val producer = createProducer()
     assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords = 1, tp))
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index cc67d01546a..2f599c5ae32 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -19,8 +19,11 @@ package kafka.api
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import kafka.utils.TestInfoUtils
 
 import scala.collection.immutable.List
 import scala.jdk.CollectionConverters._
@@ -55,30 +58,35 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
     * the second one connects ok, but fails to consume messages due to the ACL.
     */
   @Timeout(15)
-  @Test
-  def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
-    setAclsAndProduce(tp)
-    val consumer1 = createConsumer()
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testTwoConsumersWithDifferentSaslCredentials(quorum: String): Unit = {
+    if (quorum == unimplementedquorum) {
+      Console.err.println("QuorumName : " + quorum + " is not supported.")
+    } else {
+      setAclsAndProduce(tp)
+      val consumer1 = createConsumer()
 
-    // consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path
-    consumerConfig.remove(SaslConfigs.SASL_JAAS_CONFIG)
-    consumerConfig.remove(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS)
+      // consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path
+      consumerConfig.remove(SaslConfigs.SASL_JAAS_CONFIG)
+      consumerConfig.remove(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS)
 
-    val consumer2 = createConsumer()
-    consumer1.assign(List(tp).asJava)
-    consumer2.assign(List(tp).asJava)
+      val consumer2 = createConsumer()
+      consumer1.assign(List(tp).asJava)
+      consumer2.assign(List(tp).asJava)
 
-    consumeRecords(consumer1, numRecords)
+      consumeRecords(consumer1, numRecords)
 
-    try {
-      consumeRecords(consumer2)
-      fail("Expected exception as consumer2 has no access to topic or group")
-    } catch {
-      // Either exception is possible depending on the order that the first Metadata
-      // and FindCoordinator requests are received
-      case e: TopicAuthorizationException => assertTrue(e.unauthorizedTopics.contains(topic))
-      case e: GroupAuthorizationException => assertEquals(group, e.groupId)
+      try {
+        consumeRecords(consumer2)
+        fail("Expected exception as consumer2 has no access to topic or group")
+      } catch {
+        // Either exception is possible depending on the order that the first Metadata
+        // and FindCoordinator requests are received
+        case e: TopicAuthorizationException => assertTrue(e.unauthorizedTopics.contains(topic))
+        case e: GroupAuthorizationException => assertEquals(group, e.groupId)
+      }
+      confirmReauthenticationMetrics()
     }
-    confirmReauthenticationMetrics()
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
index 5311f4f6355..83cfbabb8ba 100644
--- a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -20,7 +20,8 @@ import kafka.security.authorizer.AclAuthorizer
 import kafka.server.KafkaConfig
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.config.SslConfigs
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth._
+
 import org.junit.jupiter.api.Assertions.assertNull
 
 import scala.collection.immutable.List
@@ -39,6 +40,7 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe
   // client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client
   // authentication or SASL authentication with SSL as the transport layer (but not both).
   serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
+  controllerConfig.put(KafkaConfig.SslClientAuthProp, "required")
   assertNull(producerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
   assertNull(consumerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
   assertNull(adminClientConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
diff --git a/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
index 4baa61d4c91..904c35f31f5 100644
--- a/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
@@ -17,7 +17,7 @@
 package kafka.api
 
 import kafka.utils.JaasTestUtils
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth._
 
 class SaslOAuthBearerSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
   override protected def kafkaClientSaslMechanism = "OAUTHBEARER"
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 4d0936dcdd8..356ae2fe339 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -38,8 +38,9 @@ import org.junit.jupiter.api.Test
 
 object SaslPlainSslEndToEndAuthorizationTest {
 
-  class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
+  val controllerPrincipalName = "admin"
 
+  class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       val saslContext = context.asInstanceOf[SaslAuthenticationContext]
 
@@ -50,7 +51,7 @@ object SaslPlainSslEndToEndAuthorizationTest {
 
       saslContext.server.getAuthorizationID match {
         case KafkaPlainAdmin =>
-          new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
+          new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
         case KafkaPlainUser =>
           new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
         case _ =>
@@ -123,7 +124,7 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   override protected def kafkaServerSaslMechanisms = List("PLAIN")
 
   override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
-  override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
+  override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
 
   override def jaasSections(kafkaServerSaslMechanisms: Seq[String],
                             kafkaClientSaslMechanism: Option[String],
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index 6e334d16d08..b44704350bf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -18,7 +18,7 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.utils.JaasTestUtils
+import kafka.utils._
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
@@ -33,6 +33,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser)
   override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
   private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
+  override val unimplementedquorum = "kraft"
 
   override def configureSecurityBeforeServersStart(): Unit = {
     super.configureSecurityBeforeServersStart()
@@ -53,9 +54,11 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    // Create client credentials after starting brokers so that dynamic credential creation is also tested
-    createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
-    createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
+    if (!TestInfoUtils.isKRaft(testInfo)) {
+      super.setUp(testInfo)
+      // Create client credentials after starting brokers so that dynamic credential creation is also tested
+      createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+      createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
+    }
   }
 }