You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2023/02/21 13:51:23 UTC

[kafka] branch trunk updated: KAFKA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c39123d83d9 KAFKA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)
c39123d83d9 is described below

commit c39123d83d996edfdc23cdd50be8e51853b6cf1d
Author: Purshotam Chauhan <pc...@confluent.io>
AuthorDate: Tue Feb 21 19:21:15 2023 +0530

    KAFKA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)
    
    Added the following checks:
    * `StandardAuthorizerData.authorize()` now fails if a `patternType` other than `LITERAL` is passed.
    * `AclControlManager.addAcl()` now fails if the resource name is null or empty.
    
    Also, `AclAuthorizerTest` includes a lot of tests covering various scenarios that are missing in `StandardAuthorizerTest`. This PR changes `AclAuthorizerTest` to run its tests in both `zk` and `kraft` modes:
    * Rename AclAuthorizerTest -> AuthorizerTest
    * Parameterize relevant tests to run for both modes
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 ...clAuthorizerTest.scala => AuthorizerTest.scala} | 584 ++++++++++++---------
 .../apache/kafka/controller/AclControlManager.java |   3 +
 .../authorizer/StandardAuthorizerData.java         |   3 +
 .../kafka/controller/MockAclControlManager.java    |  50 ++
 .../kafka/metadata/authorizer/MockAclMutator.java  |  62 +++
 .../authorizer/StandardAuthorizerTest.java         |   4 +-
 6 files changed, 459 insertions(+), 247 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
similarity index 61%
rename from core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
rename to core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 9fde3cbadb2..c39b785e38a 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -19,9 +19,10 @@ package kafka.security.authorizer
 import kafka.Kafka
 import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
 import kafka.server.{KafkaConfig, QuorumTestHarness}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.zk.ZkAclStore
 import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
+import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
 import org.apache.kafka.common.acl._
@@ -32,24 +33,34 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
 import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
+import org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo
+import org.apache.kafka.metadata.authorizer.{MockAclMutator, StandardAuthorizer}
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1}
 import org.apache.zookeeper.client.ZKClientConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
+import java.util
 import java.util.concurrent.{Executors, Semaphore, TimeUnit}
-import java.util.{Collections, UUID}
+import java.util.{Collections, Properties, UUID}
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
-class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
+class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
+
+  private final val PLAINTEXT = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020)
+  private final val KRAFT = "kraft"
+  private final val ZK = "zk"
+
 
   private val allowReadAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, READ, ALLOW)
   private val allowWriteAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW)
@@ -60,66 +71,84 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
   private val clusterResource = new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL)
   private val wildcardPrincipal = JSecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)
 
-  private val aclAuthorizer = new AclAuthorizer
-  private val aclAuthorizer2 = new AclAuthorizer
+  private var authorizer1: Authorizer = _
+  private var authorizer2: Authorizer = _
+
+  private var _testInfo: TestInfo = _
 
   class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
     override def equals(o: scala.Any): Boolean = false
   }
 
-  override def authorizer: Authorizer = aclAuthorizer
+  override def authorizer: Authorizer = authorizer1
+
+  def testInfo: TestInfo = _testInfo
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
+    _testInfo = testInfo
 
-    // Increase maxUpdateRetries to avoid transient failures
-    aclAuthorizer.maxUpdateRetries = Int.MaxValue
-    aclAuthorizer2.maxUpdateRetries = Int.MaxValue
-
-    val props = TestUtils.createBrokerConfig(0, zkConnect)
-    props.put(AclAuthorizer.SuperUsersProp, superUsers)
-
+    val props = properties
     config = KafkaConfig.fromProps(props)
-    aclAuthorizer.configure(config.originals)
-    aclAuthorizer2.configure(config.originals)
+    authorizer1 = createAuthorizer(config.originals)
+    authorizer2 = createAuthorizer(config.originals)
     resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
 
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
-      Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest")
+    if (!TestInfoUtils.isKRaft(testInfo)) {
+      zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
+        Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest")
+      // Increase maxUpdateRetries to avoid transient failures
+      authorizer1.asInstanceOf[AclAuthorizer].maxUpdateRetries = Int.MaxValue
+      authorizer2.asInstanceOf[AclAuthorizer].maxUpdateRetries = Int.MaxValue
+    }
+  }
+
+  def properties: Properties = {
+    val props = TestUtils.createBrokerConfig(0, zkConnectOrNull)
+    props.put(AclAuthorizer.SuperUsersProp, superUsers)
+    props
   }
 
   @AfterEach
   override def tearDown(): Unit = {
-    aclAuthorizer.close()
-    aclAuthorizer2.close()
-    zooKeeperClient.close()
+    authorizer1.close()
+    authorizer2.close()
+    TestUtils.clearYammerMetrics()
+    if (!TestInfoUtils.isKRaft(_testInfo)) {
+      zooKeeperClient.close()
+    }
     super.tearDown()
   }
 
-  @Test
-  def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
-    assertThrows(classOf[IllegalArgumentException], () => authorize(aclAuthorizer, requestContext, READ,
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAuthorizeThrowsOnNonLiteralResource(quorum: String): Unit = {
+    assertThrows(classOf[IllegalArgumentException], () => authorize(authorizer1, requestContext, READ,
       new ResourcePattern(TOPIC, "something", PREFIXED)))
   }
 
-  @Test
-  def testAuthorizeWithEmptyResourceName(): Unit = {
-    assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
-    addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL))
-    assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAuthorizeWithEmptyResourceName(quorum: String): Unit = {
+    assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
+    addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL))
+    assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
   }
 
   // Authorizing the empty resource is not supported because we create a znode with the resource name.
-  @Test
-  def testEmptyAclThrowsException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testEmptyAclThrowsException(quorum: String): Unit = {
     val e = assertThrows(classOf[ApiException],
-      () => addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(GROUP, "", LITERAL)))
-    assertTrue(e.getCause.isInstanceOf[IllegalArgumentException], s"Unexpected exception $e")
+      () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, "", LITERAL)))
+    if (quorum.equals(ZK))
+      assertTrue(e.getCause.isInstanceOf[IllegalArgumentException], s"Unexpected exception $e")
   }
 
-  @Test
-  def testTopicAcl(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testTopicAcl(quorum: String): Unit = {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
     val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
@@ -152,29 +181,30 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val host1Context = newRequestContext(user1, host1)
     val host2Context = newRequestContext(user1, host2)
 
-    assertTrue(authorize(aclAuthorizer, host2Context, READ, resource), "User1 should have READ access from host2")
-    assertFalse(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
-    assertTrue(authorize(aclAuthorizer, host1Context, WRITE, resource), "User1 should have WRITE access from host1")
-    assertFalse(authorize(aclAuthorizer, host2Context, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
-    assertTrue(authorize(aclAuthorizer, host1Context, DESCRIBE, resource), "User1 should not have DESCRIBE access from host1")
-    assertTrue(authorize(aclAuthorizer, host2Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host2")
-    assertFalse(authorize(aclAuthorizer, host1Context, ALTER, resource), "User1 should not have edit access from host1")
-    assertFalse(authorize(aclAuthorizer, host2Context, ALTER, resource), "User1 should not have edit access from host2")
+    assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 should have READ access from host2")
+    assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
+    assertTrue(authorize(authorizer1, host1Context, WRITE, resource), "User1 should have WRITE access from host1")
+    assertFalse(authorize(authorizer1, host2Context, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
+    assertTrue(authorize(authorizer1, host1Context, DESCRIBE, resource), "User1 should not have DESCRIBE access from host1")
+    assertTrue(authorize(authorizer1, host2Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host2")
+    assertFalse(authorize(authorizer1, host1Context, ALTER, resource), "User1 should not have edit access from host1")
+    assertFalse(authorize(authorizer1, host2Context, ALTER, resource), "User1 should not have edit access from host2")
 
-    //test if user has READ and write access they also get describe access
+    //test if user has READ or WRITE access they also get DESCRIBE access
     val user2Context = newRequestContext(user2, host1)
     val user3Context = newRequestContext(user3, host1)
-    assertTrue(authorize(aclAuthorizer, user2Context, DESCRIBE, resource), "User2 should have DESCRIBE access from host1")
-    assertTrue(authorize(aclAuthorizer, user3Context, DESCRIBE, resource), "User3 should have DESCRIBE access from host2")
-    assertTrue(authorize(aclAuthorizer, user2Context, READ, resource), "User2 should have READ access from host1")
-    assertTrue(authorize(aclAuthorizer, user3Context, WRITE, resource), "User3 should have WRITE access from host2")
+    assertTrue(authorize(authorizer1, user2Context, DESCRIBE, resource), "User2 should have DESCRIBE access from host1")
+    assertTrue(authorize(authorizer1, user3Context, DESCRIBE, resource), "User3 should have DESCRIBE access from host2")
+    assertTrue(authorize(authorizer1, user2Context, READ, resource), "User2 should have READ access from host1")
+    assertTrue(authorize(authorizer1, user3Context, WRITE, resource), "User3 should have WRITE access from host2")
   }
 
   /**
     CustomPrincipals should be compared with their principal type and name
    */
-  @Test
-  def testAllowAccessWithCustomPrincipal(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAllowAccessWithCustomPrincipal(quorum: String): Unit = {
     val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
     val host1 = InetAddress.getByName("192.168.1.1")
@@ -189,12 +219,13 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val host1Context = newRequestContext(customUserPrincipal, host1)
     val host2Context = newRequestContext(customUserPrincipal, host2)
 
-    assertTrue(authorize(aclAuthorizer, host2Context, READ, resource), "User1 should have READ access from host2")
-    assertFalse(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
+    assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 should have READ access from host2")
+    assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
   }
 
-  @Test
-  def testDenyTakesPrecedence(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testDenyTakesPrecedence(quorum: String): Unit = {
     val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val host = InetAddress.getByName("192.168.2.1")
     val session = newRequestContext(user, host)
@@ -205,21 +236,23 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
 
     changeAclAndVerify(Set.empty, acls, Set.empty)
 
-    assertFalse(authorize(aclAuthorizer, session, READ, resource), "deny should take precedence over allow.")
+    assertFalse(authorize(authorizer1, session, READ, resource), "deny should take precedence over allow.")
   }
 
-  @Test
-  def testAllowAllAccess(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAllowAllAccess(quorum: String): Unit = {
     val allowAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, ALLOW)
 
     changeAclAndVerify(Set.empty, Set(allowAllAcl), Set.empty)
 
     val context = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4"))
-    assertTrue(authorize(aclAuthorizer, context, READ, resource), "allow all acl should allow access to all.")
+    assertTrue(authorize(authorizer1, context, READ, resource), "allow all acl should allow access to all.")
   }
 
-  @Test
-  def testSuperUserHasAccess(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testSuperUserHasAccess(quorum: String): Unit = {
     val denyAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, DENY)
 
     changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
@@ -227,26 +260,28 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val session1 = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
     val session2 = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4"))
 
-    assertTrue(authorize(aclAuthorizer, session1, READ, resource), "superuser always has access, no matter what acls.")
-    assertTrue(authorize(aclAuthorizer, session2, READ, resource), "superuser always has access, no matter what acls.")
+    assertTrue(authorize(authorizer1, session1, READ, resource), "superuser always has access, no matter what acls.")
+    assertTrue(authorize(authorizer1, session2, READ, resource), "superuser always has access, no matter what acls.")
   }
 
   /**
     CustomPrincipals should be compared with their principal type and name
    */
-  @Test
-  def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testSuperUserWithCustomPrincipalHasAccess(quorum: String): Unit = {
     val denyAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, DENY)
     changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
 
     val session = newRequestContext(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
 
-    assertTrue(authorize(aclAuthorizer, session, READ, resource), "superuser with custom principal always has access, no matter what acls.")
+    assertTrue(authorize(authorizer1, session, READ, resource), "superuser with custom principal always has access, no matter what acls.")
   }
 
-  @Test
-  def testWildCardAcls(): Unit = {
-    assertFalse(authorize(aclAuthorizer, requestContext, READ, resource), "when acls = [], authorizer should fail close.")
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testWildCardAcls(quorum: String): Unit = {
+    assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should fail close.")
 
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val host1 = InetAddress.getByName("192.168.3.1")
@@ -255,7 +290,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val acls = changeAclAndVerify(Set.empty, Set(readAcl), Set.empty, wildCardResource)
 
     val host1Context = newRequestContext(user1, host1)
-    assertTrue(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should have READ access from host1")
+    assertTrue(authorize(authorizer1, host1Context, READ, resource), "User1 should have READ access from host1")
 
     //allow WRITE to specific topic.
     val writeAcl = new AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, ALLOW)
@@ -265,23 +300,25 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val denyWriteOnWildCardResourceAcl = new AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, DENY)
     changeAclAndVerify(acls, Set(denyWriteOnWildCardResourceAcl), Set.empty, wildCardResource)
 
-    assertFalse(authorize(aclAuthorizer, host1Context, WRITE, resource), "User1 should not have WRITE access from host1")
+    assertFalse(authorize(authorizer1, host1Context, WRITE, resource), "User1 should not have WRITE access from host1")
   }
 
-  @Test
-  def testNoAclFound(): Unit = {
-    assertFalse(authorize(aclAuthorizer, requestContext, READ, resource), "when acls = [], authorizer should deny op.")
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testNoAclFound(quorum: String): Unit = {
+    assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should deny op.")
   }
 
-  @Test
-  def testNoAclFoundOverride(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testNoAclFoundOverride(quorum: String): Unit = {
+    val props = properties
     props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
 
     val cfg = KafkaConfig.fromProps(props)
-    val testAuthorizer = new AclAuthorizer
+    var testAuthorizer: Authorizer = null
     try {
-      testAuthorizer.configure(cfg.originals)
+      testAuthorizer = createAuthorizer(cfg.originals)
       assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
         "when acls = null or [],  authorizer should allow op with allow.everyone = true.")
     } finally {
@@ -289,8 +326,9 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     }
   }
 
-  @Test
-  def testAclManagementAPIs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAclManagementAPIs(quorum: String): Unit = {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
     val host1 = "host1"
@@ -308,9 +346,9 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     acls = changeAclAndVerify(acls, Set(acl5), Set.empty)
 
     //test get by principal name.
-    TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new AclBinding(resource, acl)) == getAcls(aclAuthorizer, user1),
+    TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new AclBinding(resource, acl)) == getAcls(authorizer1, user1),
       "changes not propagated in timeout period")
-    TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new AclBinding(resource, acl)) == getAcls(aclAuthorizer, user2),
+    TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new AclBinding(resource, acl)) == getAcls(authorizer1, user2),
       "changes not propagated in timeout period")
 
     val resourceToAcls = Map[ResourcePattern, Set[AccessControlEntry]](
@@ -324,20 +362,24 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val expectedAcls = (resourceToAcls + (resource -> acls)).flatMap {
       case (res, resAcls) => resAcls.map { acl => new AclBinding(res, acl) }
     }.toSet
-    TestUtils.waitUntilTrue(() => expectedAcls == getAcls(aclAuthorizer), "changes not propagated in timeout period.")
+    TestUtils.waitUntilTrue(() => expectedAcls == getAcls(authorizer1), "changes not propagated in timeout period.")
 
     //test remove acl from existing acls.
     acls = changeAclAndVerify(acls, Set.empty, Set(acl1, acl5))
 
     //test remove all acls for resource
-    removeAcls(aclAuthorizer, Set.empty, resource)
-    TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer, resource)
-    assertFalse(zkClient.resourceExists(resource))
+    removeAcls(authorizer1, Set.empty, resource)
+    TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource)
+    if (quorum.equals(ZK)) {
+      assertFalse(zkClient.resourceExists(resource))
+    }
 
     //test removing last acl also deletes ZooKeeper path
     acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty)
     changeAclAndVerify(acls, Set.empty, acls)
-    assertFalse(zkClient.resourceExists(resource))
+    if (quorum.equals(ZK)) {
+      assertFalse(zkClient.resourceExists(resource))
+    }
   }
 
   @Test
@@ -345,18 +387,18 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val acl1 = new AccessControlEntry(user1.toString, "host-1", READ, ALLOW)
     val acls = Set(acl1)
-    addAcls(aclAuthorizer, acls, resource)
+    addAcls(authorizer1, acls, resource)
 
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
     val resource1 = new ResourcePattern(TOPIC, "test-2", LITERAL)
     val acl2 = new AccessControlEntry(user2.toString, "host3", READ, DENY)
     val acls1 = Set(acl2)
-    addAcls(aclAuthorizer, acls1, resource1)
+    addAcls(authorizer1, acls1, resource1)
 
     zkClient.deleteAclChangeNotifications()
-    val authorizer = new AclAuthorizer
+    var authorizer: Authorizer = null
     try {
-      authorizer.configure(config.originals)
+      authorizer = createAclAuthorizer(config.originals)
 
       assertEquals(acls, getAcls(authorizer, resource))
       assertEquals(acls1, getAcls(authorizer, resource1))
@@ -387,7 +429,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
       configureSemaphore.acquire()
       val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
       val acls = Set(new AccessControlEntry(user1.toString, "host-1", READ, DENY))
-      addAcls(aclAuthorizer, acls, resource)
+      addAcls(authorizer1, acls, resource)
 
       listenerSemaphore.release()
       future.get(10, TimeUnit.SECONDS)
@@ -409,10 +451,10 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
     val acl2 = new AccessControlEntry(user2.toString, WildcardHost, READ, DENY)
 
-    addAcls(aclAuthorizer, Set(acl1), commonResource)
-    addAcls(aclAuthorizer, Set(acl2), commonResource)
+    addAcls(authorizer1, Set(acl1), commonResource)
+    addAcls(authorizer1, Set(acl2), commonResource)
 
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource)
+    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource)
   }
 
   @Test
@@ -426,23 +468,23 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val acl2 = new AccessControlEntry(user2.toString, WildcardHost, READ, DENY)
 
     // Add on each instance
-    addAcls(aclAuthorizer, Set(acl1), commonResource)
-    addAcls(aclAuthorizer2, Set(acl2), commonResource)
+    addAcls(authorizer1, Set(acl1), commonResource)
+    addAcls(authorizer2, Set(acl2), commonResource)
 
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource)
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer2, commonResource)
+    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource)
+    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer2, commonResource)
 
     val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe")
     val acl3 = new AccessControlEntry(user3.toString, WildcardHost, READ, DENY)
 
     // Add on one instance and delete on another
-    addAcls(aclAuthorizer, Set(acl3), commonResource)
-    val deleted = removeAcls(aclAuthorizer2, Set(acl3), commonResource)
+    addAcls(authorizer1, Set(acl3), commonResource)
+    val deleted = removeAcls(authorizer2, Set(acl3), commonResource)
 
     assertTrue(deleted, "The authorizer should see a value that needs to be deleted")
 
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource)
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer2, commonResource)
+    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource)
+    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer2, commonResource)
   }
 
   @Test
@@ -458,12 +500,12 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val concurrentFuctions = acls.map { case (acl, aclId) =>
       () => {
         if (aclId % 2 == 0) {
-          addAcls(aclAuthorizer, Set(acl), commonResource)
+          addAcls(authorizer1, Set(acl), commonResource)
         } else {
-          addAcls(aclAuthorizer2, Set(acl), commonResource)
+          addAcls(authorizer2, Set(acl), commonResource)
         }
         if (aclId % 10 == 0) {
-          removeAcls(aclAuthorizer2, Set(acl), commonResource)
+          removeAcls(authorizer2, Set(acl), commonResource)
         }
       }
     }
@@ -474,15 +516,16 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
 
     TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
 
-    TestUtils.waitAndVerifyAcls(expectedAcls, aclAuthorizer, commonResource)
-    TestUtils.waitAndVerifyAcls(expectedAcls, aclAuthorizer2, commonResource)
+    TestUtils.waitAndVerifyAcls(expectedAcls, authorizer1, commonResource)
+    TestUtils.waitAndVerifyAcls(expectedAcls, authorizer2, commonResource)
   }
 
   /**
     * Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
     */
-  @Test
-  def testAclInheritance(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAclInheritance(quorum: String): Unit = {
     testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
       CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS))
     testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
@@ -501,15 +544,15 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val host = InetAddress.getByName("192.168.3.1")
     val hostContext = newRequestContext(user, host)
     val acl = new AccessControlEntry(user.toString, WildcardHost, parentOp, ALLOW)
-    addAcls(aclAuthorizer, Set(acl), clusterResource)
+    addAcls(authorizer1, Set(acl), clusterResource)
     AclOperation.values.filter(validOp).foreach { op =>
-      val authorized = authorize(aclAuthorizer, hostContext, op, clusterResource)
+      val authorized = authorize(authorizer1, hostContext, op, clusterResource)
       if (allowedOps.contains(op) || op == parentOp)
         assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
       else
         assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
     }
-    removeAcls(aclAuthorizer, Set(acl), clusterResource)
+    removeAcls(authorizer1, Set(acl), clusterResource)
   }
 
   private def testImplicationsOfDeny(parentOp: AclOperation, deniedOps: Set[AclOperation]): Unit = {
@@ -518,15 +561,15 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val host1Context = newRequestContext(user1, host1)
     val acls = Set(new AccessControlEntry(user1.toString, WildcardHost, parentOp, DENY),
       new AccessControlEntry(user1.toString, WildcardHost, AclOperation.ALL, ALLOW))
-    addAcls(aclAuthorizer, acls, clusterResource)
+    addAcls(authorizer1, acls, clusterResource)
     AclOperation.values.filter(validOp).foreach { op =>
-      val authorized = authorize(aclAuthorizer, host1Context, op, clusterResource)
+      val authorized = authorize(authorizer1, host1Context, op, clusterResource)
       if (deniedOps.contains(op) || op == parentOp)
         assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
       else
         assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
     }
-    removeAcls(aclAuthorizer, acls, clusterResource)
+    removeAcls(authorizer1, acls, clusterResource)
   }
 
   @Test
@@ -536,151 +579,164 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     // Alternate authorizer to keep adding and removing ZooKeeper path
     val concurrentFuctions = (0 to 50).map { _ =>
       () => {
-        addAcls(aclAuthorizer, Set(acl), resource)
-        removeAcls(aclAuthorizer2, Set(acl), resource)
+        addAcls(authorizer1, Set(acl), resource)
+        removeAcls(authorizer2, Set(acl), resource)
       }
     }
 
     TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
 
-    TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer, resource)
-    TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer2, resource)
+    TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource)
+    TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer2, resource)
   }
 
-  @Test
-  def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAccessAllowedIfAllowAclExistsOnWildcardResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
 
-    assertTrue(authorize(aclAuthorizer, requestContext, READ, resource))
+    assertTrue(authorize(authorizer1, requestContext, READ, resource))
   }
 
-  @Test
-  def testDeleteAclOnWildcardResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), wildCardResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testDeleteAclOnWildcardResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
 
-    removeAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource)
+    removeAcls(authorizer1, Set(allowReadAcl), wildCardResource)
 
-    assertEquals(Set(allowWriteAcl), getAcls(aclAuthorizer, wildCardResource))
+    assertEquals(Set(allowWriteAcl), getAcls(authorizer1, wildCardResource))
   }
 
-  @Test
-  def testDeleteAllAclOnWildcardResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testDeleteAllAclOnWildcardResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
 
-    removeAcls(aclAuthorizer, Set.empty, wildCardResource)
+    removeAcls(authorizer1, Set.empty, wildCardResource)
 
-    assertEquals(Set.empty, getAcls(aclAuthorizer))
+    assertEquals(Set.empty, getAcls(authorizer1))
   }
 
-  @Test
-  def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAccessAllowedIfAllowAclExistsOnPrefixedResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
 
-    assertTrue(authorize(aclAuthorizer, requestContext, READ, resource))
+    assertTrue(authorize(authorizer1, requestContext, READ, resource))
   }
 
-  @Test
-  def testDeleteAclOnPrefixedResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testDeleteAclOnPrefixedResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
 
-    removeAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource)
+    removeAcls(authorizer1, Set(allowReadAcl), prefixedResource)
 
-    assertEquals(Set(allowWriteAcl), getAcls(aclAuthorizer, prefixedResource))
+    assertEquals(Set(allowWriteAcl), getAcls(authorizer1, prefixedResource))
   }
 
-  @Test
-  def testDeleteAllAclOnPrefixedResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testDeleteAllAclOnPrefixedResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
 
-    removeAcls(aclAuthorizer, Set.empty, prefixedResource)
+    removeAcls(authorizer1, Set.empty, prefixedResource)
 
-    assertEquals(Set.empty, getAcls(aclAuthorizer))
+    assertEquals(Set.empty, getAcls(authorizer1))
   }
 
-  @Test
-  def testAddAclsOnLiteralResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), resource)
-    addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), resource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAddAclsOnLiteralResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), resource)
+    addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), resource)
 
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, resource))
-    assertEquals(Set.empty, getAcls(aclAuthorizer, wildCardResource))
-    assertEquals(Set.empty, getAcls(aclAuthorizer, prefixedResource))
+    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, resource))
+    assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
+    assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
   }
 
-  @Test
-  def testAddAclsOnWildcardResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), wildCardResource)
-    addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), wildCardResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAddAclsOnWildcardResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
+    addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), wildCardResource)
 
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, wildCardResource))
-    assertEquals(Set.empty, getAcls(aclAuthorizer, resource))
-    assertEquals(Set.empty, getAcls(aclAuthorizer, prefixedResource))
+    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, wildCardResource))
+    assertEquals(Set.empty, getAcls(authorizer1, resource))
+    assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
   }
 
-  @Test
-  def testAddAclsOnPrefixedResource(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource)
-    addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), prefixedResource)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAddAclsOnPrefixedResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
+    addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), prefixedResource)
 
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, prefixedResource))
-    assertEquals(Set.empty, getAcls(aclAuthorizer, wildCardResource))
-    assertEquals(Set.empty, getAcls(aclAuthorizer, resource))
+    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, prefixedResource))
+    assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
+    assertEquals(Set.empty, getAcls(authorizer1, resource))
   }
 
-  @Test
-  def testAuthorizeWithPrefixedResource(): Unit = {
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", LITERAL))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-" + UUID.randomUUID(), PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" + UUID.randomUUID(), PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" + UUID.randomUUID(), PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" + UUID.randomUUID(), PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-", PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED))
-    addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", LITERAL))
-
-    addAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource)
-
-    assertTrue(authorize(aclAuthorizer, requestContext, READ, resource))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAuthorizeWithPrefixedResource(quorum: String): Unit = {
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", LITERAL))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-" + UUID.randomUUID(), PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" + UUID.randomUUID(), PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" + UUID.randomUUID(), PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" + UUID.randomUUID(), PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-", PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED))
+    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", LITERAL))
+
+    addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
+
+    assertTrue(authorize(authorizer1, requestContext, READ, resource))
   }
 
-  @Test
-  def testSingleCharacterResourceAcls(): Unit = {
-    addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", LITERAL))
-    assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "f", LITERAL)))
-    assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "foo", LITERAL)))
-
-    addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "_", PREFIXED))
-    assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "_foo", LITERAL)))
-    assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "_", LITERAL)))
-    assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "foo_", LITERAL)))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testSingleCharacterResourceAcls(quorum: String): Unit = {
+    addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", LITERAL))
+    assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "f", LITERAL)))
+    assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo", LITERAL)))
+
+    addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "_", PREFIXED))
+    assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "_foo", LITERAL)))
+    assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "_", LITERAL)))
+    assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo_", LITERAL)))
   }
 
-  @Test
-  def testGetAclsPrincipal(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testGetAclsPrincipal(quorum: String): Unit = {
     val aclOnSpecificPrincipal = new AccessControlEntry(principal.toString, WildcardHost, WRITE, ALLOW)
-    addAcls(aclAuthorizer, Set(aclOnSpecificPrincipal), resource)
+    addAcls(authorizer1, Set(aclOnSpecificPrincipal), resource)
 
     assertEquals(0,
-      getAcls(aclAuthorizer, wildcardPrincipal).size, "acl on specific should not be returned for wildcard request")
+      getAcls(authorizer1, wildcardPrincipal).size, "acl on specific should not be returned for wildcard request")
     assertEquals(1,
-      getAcls(aclAuthorizer, principal).size, "acl on specific should be returned for specific request")
+      getAcls(authorizer1, principal).size, "acl on specific should be returned for specific request")
     assertEquals(1,
-      getAcls(aclAuthorizer, new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size, "acl on specific should be returned for different principal instance")
+      getAcls(authorizer1, new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size, "acl on specific should be returned for different principal instance")
 
-    removeAcls(aclAuthorizer, Set.empty, resource)
+    removeAcls(authorizer1, Set.empty, resource)
     val aclOnWildcardPrincipal = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW)
-    addAcls(aclAuthorizer, Set(aclOnWildcardPrincipal), resource)
+    addAcls(authorizer1, Set(aclOnWildcardPrincipal), resource)
 
-    assertEquals(1, getAcls(aclAuthorizer, wildcardPrincipal).size, "acl on wildcard should be returned for wildcard request")
-    assertEquals(0, getAcls(aclAuthorizer, principal).size, "acl on wildcard should not be returned for specific request")
+    assertEquals(1, getAcls(authorizer1, wildcardPrincipal).size, "acl on wildcard should be returned for wildcard request")
+    assertEquals(0, getAcls(authorizer1, principal).size, "acl on wildcard should not be returned for specific request")
   }
 
-  @Test
-  def testAclsFilter(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAclsFilter(quorum: String): Unit = {
     val resource1 = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
     val resource2 = new ResourcePattern(TOPIC, "bar-" + UUID.randomUUID(), LITERAL)
     val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED)
@@ -690,25 +746,31 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val acl3 = new AclBinding(resource2, new AccessControlEntry(principal.toString, WildcardHost, DESCRIBE, ALLOW))
     val acl4 = new AclBinding(prefixedResource, new AccessControlEntry(wildcardPrincipal.toString, WildcardHost, READ, ALLOW))
 
-    aclAuthorizer.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava)
-    assertEquals(Set(acl1, acl2, acl3, acl4), aclAuthorizer.acls(AclBindingFilter.ANY).asScala.toSet)
-    assertEquals(Set(acl1, acl2), aclAuthorizer.acls(new AclBindingFilter(resource1.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
-    assertEquals(Set(acl4), aclAuthorizer.acls(new AclBindingFilter(prefixedResource.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
+    authorizer1.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava)
+    assertEquals(Set(acl1, acl2, acl3, acl4), authorizer1.acls(AclBindingFilter.ANY).asScala.toSet)
+    assertEquals(Set(acl1, acl2), authorizer1.acls(new AclBindingFilter(resource1.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
+    assertEquals(Set(acl4), authorizer1.acls(new AclBindingFilter(prefixedResource.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
     val matchingFilter = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, resource2.name, MATCH), AccessControlEntryFilter.ANY)
-    assertEquals(Set(acl3, acl4), aclAuthorizer.acls(matchingFilter).asScala.toSet)
+    assertEquals(Set(acl3, acl4), authorizer1.acls(matchingFilter).asScala.toSet)
 
     val filters = List(matchingFilter,
       acl1.toFilter,
       new AclBindingFilter(resource2.toFilter, AccessControlEntryFilter.ANY),
       new AclBindingFilter(new ResourcePatternFilter(TOPIC, "baz", PatternType.ANY), AccessControlEntryFilter.ANY))
-    val deleteResults = aclAuthorizer.deleteAcls(requestContext, filters.asJava).asScala.map(_.toCompletableFuture.get)
+    val deleteResults = authorizer1.deleteAcls(requestContext, filters.asJava).asScala.map(_.toCompletableFuture.get)
     assertEquals(List.empty, deleteResults.filter(_.exception.isPresent))
     filters.indices.foreach { i =>
       assertEquals(Set.empty, deleteResults(i).aclBindingDeleteResults.asScala.toSet.filter(_.exception.isPresent))
     }
     assertEquals(Set(acl3, acl4), deleteResults(0).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
     assertEquals(Set(acl1), deleteResults(1).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-    assertEquals(Set.empty, deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
+    if (quorum.equals(ZK)) {
+      assertEquals(Set.empty, deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
+    } else {
+      // The standard authorizer first collects the ACLs matching each filter and only then deletes them,
+      // so filters[2] still reports acl3 even though acl3 also matches filters[0] and is deleted by it.
+      assertEquals(Set(acl3), deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
+    }
     assertEquals(Set.empty, deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
   }
 
@@ -716,14 +778,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
   def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
     givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV0))
     val e = assertThrows(classOf[ApiException],
-      () => addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED)))
+      () => addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED)))
     assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException], s"Unexpected exception $e")
   }
 
   @Test
   def testCreateAclWithInvalidResourceName(): Unit = {
     assertThrows(classOf[ApiException],
-      () => addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "test/1", LITERAL)))
+      () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "test/1", LITERAL)))
   }
 
   @Test
@@ -733,7 +795,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val expected = new String(ZkAclStore(PREFIXED).changeStore
       .createChangeNode(resource).bytes, UTF_8)
 
-    addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+    addAcls(authorizer1, Set(denyReadAcl), resource)
 
     val actual = getAclChangeEventAsString(PREFIXED)
 
@@ -747,7 +809,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val expected = new String(ZkAclStore(PREFIXED).changeStore
       .createChangeNode(resource).bytes, UTF_8)
 
-    addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+    addAcls(authorizer1, Set(denyReadAcl), resource)
 
     val actual = getAclChangeEventAsString(PREFIXED)
 
@@ -761,7 +823,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val expected = new String(ZkAclStore(LITERAL).changeStore
       .createChangeNode(resource).bytes, UTF_8)
 
-    addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+    addAcls(authorizer1, Set(denyReadAcl), resource)
 
     val actual = getAclChangeEventAsString(LITERAL)
 
@@ -775,7 +837,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val expected = new String(ZkAclStore(LITERAL).changeStore
       .createChangeNode(resource).bytes, UTF_8)
 
-    addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+    addAcls(authorizer1, Set(denyReadAcl), resource)
 
     val actual = getAclChangeEventAsString(LITERAL)
 
@@ -914,14 +976,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val ace = new AccessControlEntry(principal.toString, WildcardHost, READ, ALLOW)
     val updateSemaphore = new Semaphore(1)
 
-    def createAcl(createAuthorizer: AclAuthorizer, resource: ResourcePattern): AclBinding = {
+    def createAcl(createAuthorizer: Authorizer, resource: ResourcePattern): AclBinding = {
       val acl = new AclBinding(resource, ace)
       createAuthorizer.createAcls(requestContext, Collections.singletonList(acl)).asScala
         .foreach(_.toCompletableFuture.get(15, TimeUnit.SECONDS))
       acl
     }
 
-    def deleteAcl(deleteAuthorizer: AclAuthorizer,
+    def deleteAcl(deleteAuthorizer: Authorizer,
                   resource: ResourcePattern,
                   deletePatternType: PatternType): List[AclBinding] = {
 
@@ -935,16 +997,16 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
         .toList
     }
 
-    def listAcls(authorizer: AclAuthorizer): List[AclBinding] = {
+    def listAcls(authorizer: Authorizer): List[AclBinding] = {
       authorizer.acls(AclBindingFilter.ANY).asScala.toList
     }
 
-    def verifyCreateDeleteAcl(deleteAuthorizer: AclAuthorizer,
+    def verifyCreateDeleteAcl(deleteAuthorizer: Authorizer,
                               resource: ResourcePattern,
                               deletePatternType: PatternType): Unit = {
       updateSemaphore.acquire()
       assertEquals(List.empty, listAcls(deleteAuthorizer))
-      val acl = createAcl(aclAuthorizer, resource)
+      val acl = createAcl(authorizer1, resource)
       val deleted = deleteAcl(deleteAuthorizer, resource, deletePatternType)
       if (deletePatternType != PatternType.MATCH) {
         assertEquals(List(acl), deleted)
@@ -982,34 +1044,35 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     }
   }
 
-  @Test
-  def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAuthorizeByResourceTypeNoAclFoundOverride(quorum: String): Unit = {
+    val props = properties
     props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
 
     val cfg = KafkaConfig.fromProps(props)
-    val aclAuthorizer = new AclAuthorizer
+    var authorizer: Authorizer = null
     try {
-      aclAuthorizer.configure(cfg.originals)
-      assertTrue(authorizeByResourceType(aclAuthorizer, requestContext, READ, resource.resourceType()),
+      authorizer = createAuthorizer(cfg.originals)
+      assertTrue(authorizeByResourceType(authorizer, requestContext, READ, resource.resourceType()),
         "If allow.everyone.if.no.acl.found = true, caller should have read access to at least one topic")
-      assertTrue(authorizeByResourceType(aclAuthorizer, requestContext, WRITE, resource.resourceType()),
+      assertTrue(authorizeByResourceType(authorizer, requestContext, WRITE, resource.resourceType()),
         "If allow.everyone.if.no.acl.found = true, caller should have write access to at least one topic")
     } finally {
-      aclAuthorizer.close()
+      authorizer.close()
     }
   }
 
   private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[MetadataVersion]): Unit = {
-    aclAuthorizer.close()
+    authorizer1.close()
 
-    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    val props = TestUtils.createBrokerConfig(0, zkConnectOrNull)
     props.put(AclAuthorizer.SuperUsersProp, superUsers)
     protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
 
     config = KafkaConfig.fromProps(props)
 
-    aclAuthorizer.configure(config.originals)
+    authorizer1.configure(config.originals)
   }
 
   private def getAclChangeEventAsString(patternType: PatternType) = {
@@ -1031,37 +1094,37 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     var acls = originalAcls
 
     if(addedAcls.nonEmpty) {
-      addAcls(aclAuthorizer, addedAcls, resource)
+      addAcls(authorizer1, addedAcls, resource)
       acls ++= addedAcls
     }
 
     if(removedAcls.nonEmpty) {
-      removeAcls(aclAuthorizer, removedAcls, resource)
+      removeAcls(authorizer1, removedAcls, resource)
       acls --=removedAcls
     }
 
-    TestUtils.waitAndVerifyAcls(acls, aclAuthorizer, resource)
+    TestUtils.waitAndVerifyAcls(acls, authorizer1, resource)
 
     acls
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
     val action = new Action(operation, resource, 1, true, true)
     authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED
   }
 
-  private def getAcls(authorizer: AclAuthorizer, resourcePattern: ResourcePattern): Set[AccessControlEntry] = {
+  private def getAcls(authorizer: Authorizer, resourcePattern: ResourcePattern): Set[AccessControlEntry] = {
     val acls = authorizer.acls(new AclBindingFilter(resourcePattern.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet
     acls.map(_.entry)
   }
 
-  private def getAcls(authorizer: AclAuthorizer, principal: KafkaPrincipal): Set[AclBinding] = {
+  private def getAcls(authorizer: Authorizer, principal: KafkaPrincipal): Set[AclBinding] = {
     val filter = new AclBindingFilter(ResourcePatternFilter.ANY,
       new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, AclPermissionType.ANY))
     authorizer.acls(filter).asScala.toSet
   }
 
-  private def getAcls(authorizer: AclAuthorizer): Set[AclBinding] = {
+  private def getAcls(authorizer: Authorizer): Set[AclBinding] = {
     authorizer.acls(AclBindingFilter.ANY).asScala.toSet
   }
 
@@ -1084,4 +1147,35 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
       file.getAbsolutePath
     } finally writer.close()
   }
+
+  def createAuthorizer(configs: util.Map[String, AnyRef]): Authorizer = {
+    var testAuthorizer: Authorizer = null
+    if (TestInfoUtils.isKRaft(_testInfo)) {
+      testAuthorizer = createStandardAuthorizer(configs)
+    } else {
+      testAuthorizer = createAclAuthorizer(configs)
+    }
+    testAuthorizer
+  }
+
+  def createAclAuthorizer(configs: util.Map[String, AnyRef]): AclAuthorizer = {
+    val authorizer = new AclAuthorizer
+    authorizer.configure(configs)
+    authorizer
+  }
+
+  def createStandardAuthorizer(configs: util.Map[String, AnyRef]): StandardAuthorizer = {
+    val standardAuthorizer = new StandardAuthorizer
+    standardAuthorizer.configure(configs)
+    initializeStandardAuthorizer(standardAuthorizer, new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)))
+    standardAuthorizer
+  }
+
+  def initializeStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
+                                   serverInfo: AuthorizerServerInfo): Unit = {
+    val aclMutator = new MockAclMutator(standardAuthorizer)
+    standardAuthorizer.start(serverInfo)
+    standardAuthorizer.setAclMutator(aclMutator)
+    standardAuthorizer.completeInitialLoad()
+  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
index 313927fc537..7569e88ef0f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
@@ -139,6 +139,9 @@ public class AclControlManager {
                 throw new InvalidRequestException("Invalid permissionType " +
                     binding.entry().permissionType());
         }
+        if (binding.pattern().name() == null || binding.pattern().name().isEmpty()) {
+            throw new InvalidRequestException("Resource name should not be empty");
+        }
     }
 
     ControllerResult<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filters) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index f456ff4d772..c6d41a52da4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -259,6 +259,9 @@ public class StandardAuthorizerData {
         AuthorizableRequestContext requestContext,
         Action action
     ) {
+        if (action.resourcePattern().patternType() != LITERAL) {
+            throw new IllegalArgumentException("Only literal resources are supported. Got: " + action.resourcePattern().patternType());
+        }
         KafkaPrincipal principal = baseKafkaPrincipal(requestContext);
         final MatchingRule rule;
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java
new file mode 100644
index 00000000000..16d71e8786c
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import java.util.List;
+import java.util.Optional;
+
+/** Test {@link AclControlManager} whose helpers create or delete ACLs and immediately replay the resulting records. */
+public class MockAclControlManager extends AclControlManager {
+    public MockAclControlManager(LogContext logContext, Optional<ClusterMetadataAuthorizer> authorizer) {
+        super(new SnapshotRegistry(logContext), authorizer);
+    }
+
+    public List<AclCreateResult> createAndReplayAcls(List<AclBinding> acls) {
+        ControllerResult<List<AclCreateResult>> outcome = createAcls(acls);
+        outcome.records().forEach(r -> replay((AccessControlEntryRecord) r.message(), Optional.empty()));
+        return outcome.response();
+    }
+
+    public List<AclDeleteResult> deleteAndReplayAcls(List<AclBindingFilter> filters) {
+        ControllerResult<List<AclDeleteResult>> outcome = deleteAcls(filters);
+        outcome.records().forEach(r -> replay((RemoveAccessControlEntryRecord) r.message(), Optional.empty()));
+        return outcome.response();
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java
new file mode 100644
index 00000000000..188a8dc69cc
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.ControllerRequestContext;
+import org.apache.kafka.controller.MockAclControlManager;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+/** Test-only {@link AclMutator} that applies every change synchronously through a {@link MockAclControlManager}. */
+public class MockAclMutator implements AclMutator {
+    MockAclControlManager aclControlManager;
+
+    /** Builds the backing {@link MockAclControlManager} around {@code authorizer} using a fresh {@link LogContext}. */
+    public MockAclMutator(StandardAuthorizer authorizer) {
+        LogContext logContext = new LogContext();
+        aclControlManager = new MockAclControlManager(logContext, Optional.of(authorizer));
+    }
+
+    /**
+     * Creates and replays {@code aclBindings} before returning an already-completed future; {@code context} is unused.
+     */
+    @Override
+    public CompletableFuture<List<AclCreateResult>> createAcls(
+        ControllerRequestContext context,
+        List<AclBinding> aclBindings
+    ) {
+        return CompletableFuture.completedFuture(aclControlManager.createAndReplayAcls(aclBindings));
+    }
+
+    /**
+     * Deletes by {@code aclBindingFilters} and replays before returning a completed future; {@code context} is unused.
+     */
+    @Override
+    public CompletableFuture<List<AclDeleteResult>> deleteAcls(
+        ControllerRequestContext context,
+        List<AclBindingFilter> aclBindingFilters
+    ) {
+        return CompletableFuture.completedFuture(aclControlManager.deleteAndReplayAcls(aclBindingFilters));
+    }
+}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index b11b528fb9f..8ed33e2e361 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -104,10 +104,10 @@ public class StandardAuthorizerTest {
         "127.0.0.1",
         9020);
 
-    static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
+    public static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
         private final Collection<Endpoint> endpoints;
 
-        AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
+        public AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
             assertFalse(endpoints.isEmpty());
             this.endpoints = endpoints;
         }