You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2023/02/21 13:56:15 UTC
[kafka] branch 3.4 updated: KAFKA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new b7a8fd7bfe6 KAFKA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)
b7a8fd7bfe6 is described below
commit b7a8fd7bfe6c1eaa6760cdb881b14475f25b80f7
Author: Purshotam Chauhan <pc...@confluent.io>
AuthorDate: Tue Feb 21 19:21:15 2023 +0530
KAFKA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)
Added the following checks -
* In StandardAuthorizerData.authorize() to fail if a `patternType` other than `LITERAL` is passed.
* In AclControlManager.addAcl() to fail if Resource Name is null or empty.
Also, `AclAuthorizerTest` includes a lot of tests covering various scenarios that are missing in `StandardAuthorizerTest`. This PR changes `AclAuthorizerTest` to run its tests for both `zk` and `kraft` modes -
* Rename AclAuthorizerTest -> AuthorizerTest
* Parameterize relevant tests to run for both modes
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
...clAuthorizerTest.scala => AuthorizerTest.scala} | 584 ++++++++++++---------
.../apache/kafka/controller/AclControlManager.java | 3 +
.../authorizer/StandardAuthorizerData.java | 3 +
.../kafka/controller/MockAclControlManager.java | 50 ++
.../kafka/metadata/authorizer/MockAclMutator.java | 62 +++
.../authorizer/StandardAuthorizerTest.java | 4 +-
6 files changed, 459 insertions(+), 247 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
similarity index 61%
rename from core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
rename to core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 9fde3cbadb2..c39b785e38a 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -19,9 +19,10 @@ package kafka.security.authorizer
import kafka.Kafka
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{KafkaConfig, QuorumTestHarness}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.zk.ZkAclStore
import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
+import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.acl._
@@ -32,24 +33,34 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
+import org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo
+import org.apache.kafka.metadata.authorizer.{MockAclMutator, StandardAuthorizer}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1}
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import java.net.InetAddress
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
+import java.util
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
-import java.util.{Collections, UUID}
+import java.util.{Collections, Properties, UUID}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
-class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
+class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
+
+ private final val PLAINTEXT = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020)
+ private final val KRAFT = "kraft"
+ private final val ZK = "zk"
+
private val allowReadAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, READ, ALLOW)
private val allowWriteAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW)
@@ -60,66 +71,84 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
private val clusterResource = new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL)
private val wildcardPrincipal = JSecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)
- private val aclAuthorizer = new AclAuthorizer
- private val aclAuthorizer2 = new AclAuthorizer
+ private var authorizer1: Authorizer = _
+ private var authorizer2: Authorizer = _
+
+ private var _testInfo: TestInfo = _
class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
override def equals(o: scala.Any): Boolean = false
}
- override def authorizer: Authorizer = aclAuthorizer
+ override def authorizer: Authorizer = authorizer1
+
+ def testInfo: TestInfo = _testInfo
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
+ _testInfo = testInfo
- // Increase maxUpdateRetries to avoid transient failures
- aclAuthorizer.maxUpdateRetries = Int.MaxValue
- aclAuthorizer2.maxUpdateRetries = Int.MaxValue
-
- val props = TestUtils.createBrokerConfig(0, zkConnect)
- props.put(AclAuthorizer.SuperUsersProp, superUsers)
-
+ val props = properties
config = KafkaConfig.fromProps(props)
- aclAuthorizer.configure(config.originals)
- aclAuthorizer2.configure(config.originals)
+ authorizer1 = createAuthorizer(config.originals)
+ authorizer2 = createAuthorizer(config.originals)
resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest")
+ if (!TestInfoUtils.isKRaft(testInfo)) {
+ zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
+ Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest")
+ // Increase maxUpdateRetries to avoid transient failures
+ authorizer1.asInstanceOf[AclAuthorizer].maxUpdateRetries = Int.MaxValue
+ authorizer2.asInstanceOf[AclAuthorizer].maxUpdateRetries = Int.MaxValue
+ }
+ }
+
+ def properties: Properties = {
+ val props = TestUtils.createBrokerConfig(0, zkConnectOrNull)
+ props.put(AclAuthorizer.SuperUsersProp, superUsers)
+ props
}
@AfterEach
override def tearDown(): Unit = {
- aclAuthorizer.close()
- aclAuthorizer2.close()
- zooKeeperClient.close()
+ authorizer1.close()
+ authorizer2.close()
+ TestUtils.clearYammerMetrics()
+ if (!TestInfoUtils.isKRaft(_testInfo)) {
+ zooKeeperClient.close()
+ }
super.tearDown()
}
- @Test
- def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => authorize(aclAuthorizer, requestContext, READ,
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAuthorizeThrowsOnNonLiteralResource(quorum: String): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => authorize(authorizer1, requestContext, READ,
new ResourcePattern(TOPIC, "something", PREFIXED)))
}
- @Test
- def testAuthorizeWithEmptyResourceName(): Unit = {
- assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
- addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL))
- assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAuthorizeWithEmptyResourceName(quorum: String): Unit = {
+ assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
+ addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL))
+ assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
}
// Authorizing the empty resource is not supported because we create a znode with the resource name.
- @Test
- def testEmptyAclThrowsException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testEmptyAclThrowsException(quorum: String): Unit = {
val e = assertThrows(classOf[ApiException],
- () => addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(GROUP, "", LITERAL)))
- assertTrue(e.getCause.isInstanceOf[IllegalArgumentException], s"Unexpected exception $e")
+ () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, "", LITERAL)))
+ if (quorum.equals(ZK))
+ assertTrue(e.getCause.isInstanceOf[IllegalArgumentException], s"Unexpected exception $e")
}
- @Test
- def testTopicAcl(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testTopicAcl(quorum: String): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
@@ -152,29 +181,30 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val host1Context = newRequestContext(user1, host1)
val host2Context = newRequestContext(user1, host2)
- assertTrue(authorize(aclAuthorizer, host2Context, READ, resource), "User1 should have READ access from host2")
- assertFalse(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
- assertTrue(authorize(aclAuthorizer, host1Context, WRITE, resource), "User1 should have WRITE access from host1")
- assertFalse(authorize(aclAuthorizer, host2Context, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
- assertTrue(authorize(aclAuthorizer, host1Context, DESCRIBE, resource), "User1 should not have DESCRIBE access from host1")
- assertTrue(authorize(aclAuthorizer, host2Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host2")
- assertFalse(authorize(aclAuthorizer, host1Context, ALTER, resource), "User1 should not have edit access from host1")
- assertFalse(authorize(aclAuthorizer, host2Context, ALTER, resource), "User1 should not have edit access from host2")
+ assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 should have READ access from host2")
+ assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
+ assertTrue(authorize(authorizer1, host1Context, WRITE, resource), "User1 should have WRITE access from host1")
+ assertFalse(authorize(authorizer1, host2Context, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
+ assertTrue(authorize(authorizer1, host1Context, DESCRIBE, resource), "User1 should not have DESCRIBE access from host1")
+ assertTrue(authorize(authorizer1, host2Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host2")
+ assertFalse(authorize(authorizer1, host1Context, ALTER, resource), "User1 should not have edit access from host1")
+ assertFalse(authorize(authorizer1, host2Context, ALTER, resource), "User1 should not have edit access from host2")
- //test if user has READ and write access they also get describe access
+ //test if user has READ or WRITE access they also get DESCRIBE access
val user2Context = newRequestContext(user2, host1)
val user3Context = newRequestContext(user3, host1)
- assertTrue(authorize(aclAuthorizer, user2Context, DESCRIBE, resource), "User2 should have DESCRIBE access from host1")
- assertTrue(authorize(aclAuthorizer, user3Context, DESCRIBE, resource), "User3 should have DESCRIBE access from host2")
- assertTrue(authorize(aclAuthorizer, user2Context, READ, resource), "User2 should have READ access from host1")
- assertTrue(authorize(aclAuthorizer, user3Context, WRITE, resource), "User3 should have WRITE access from host2")
+ assertTrue(authorize(authorizer1, user2Context, DESCRIBE, resource), "User2 should have DESCRIBE access from host1")
+ assertTrue(authorize(authorizer1, user3Context, DESCRIBE, resource), "User3 should have DESCRIBE access from host2")
+ assertTrue(authorize(authorizer1, user2Context, READ, resource), "User2 should have READ access from host1")
+ assertTrue(authorize(authorizer1, user3Context, WRITE, resource), "User3 should have WRITE access from host2")
}
/**
CustomPrincipals should be compared with their principal type and name
*/
- @Test
- def testAllowAccessWithCustomPrincipal(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAllowAccessWithCustomPrincipal(quorum: String): Unit = {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
val host1 = InetAddress.getByName("192.168.1.1")
@@ -189,12 +219,13 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val host1Context = newRequestContext(customUserPrincipal, host1)
val host2Context = newRequestContext(customUserPrincipal, host2)
- assertTrue(authorize(aclAuthorizer, host2Context, READ, resource), "User1 should have READ access from host2")
- assertFalse(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
+ assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 should have READ access from host2")
+ assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
}
- @Test
- def testDenyTakesPrecedence(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testDenyTakesPrecedence(quorum: String): Unit = {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host = InetAddress.getByName("192.168.2.1")
val session = newRequestContext(user, host)
@@ -205,21 +236,23 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
changeAclAndVerify(Set.empty, acls, Set.empty)
- assertFalse(authorize(aclAuthorizer, session, READ, resource), "deny should take precedence over allow.")
+ assertFalse(authorize(authorizer1, session, READ, resource), "deny should take precedence over allow.")
}
- @Test
- def testAllowAllAccess(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAllowAllAccess(quorum: String): Unit = {
val allowAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, ALLOW)
changeAclAndVerify(Set.empty, Set(allowAllAcl), Set.empty)
val context = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4"))
- assertTrue(authorize(aclAuthorizer, context, READ, resource), "allow all acl should allow access to all.")
+ assertTrue(authorize(authorizer1, context, READ, resource), "allow all acl should allow access to all.")
}
- @Test
- def testSuperUserHasAccess(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testSuperUserHasAccess(quorum: String): Unit = {
val denyAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, DENY)
changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
@@ -227,26 +260,28 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val session1 = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
val session2 = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4"))
- assertTrue(authorize(aclAuthorizer, session1, READ, resource), "superuser always has access, no matter what acls.")
- assertTrue(authorize(aclAuthorizer, session2, READ, resource), "superuser always has access, no matter what acls.")
+ assertTrue(authorize(authorizer1, session1, READ, resource), "superuser always has access, no matter what acls.")
+ assertTrue(authorize(authorizer1, session2, READ, resource), "superuser always has access, no matter what acls.")
}
/**
CustomPrincipals should be compared with their principal type and name
*/
- @Test
- def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testSuperUserWithCustomPrincipalHasAccess(quorum: String): Unit = {
val denyAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, DENY)
changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
val session = newRequestContext(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
- assertTrue(authorize(aclAuthorizer, session, READ, resource), "superuser with custom principal always has access, no matter what acls.")
+ assertTrue(authorize(authorizer1, session, READ, resource), "superuser with custom principal always has access, no matter what acls.")
}
- @Test
- def testWildCardAcls(): Unit = {
- assertFalse(authorize(aclAuthorizer, requestContext, READ, resource), "when acls = [], authorizer should fail close.")
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testWildCardAcls(quorum: String): Unit = {
+ assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should fail close.")
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host1 = InetAddress.getByName("192.168.3.1")
@@ -255,7 +290,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val acls = changeAclAndVerify(Set.empty, Set(readAcl), Set.empty, wildCardResource)
val host1Context = newRequestContext(user1, host1)
- assertTrue(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should have READ access from host1")
+ assertTrue(authorize(authorizer1, host1Context, READ, resource), "User1 should have READ access from host1")
//allow WRITE to specific topic.
val writeAcl = new AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, ALLOW)
@@ -265,23 +300,25 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val denyWriteOnWildCardResourceAcl = new AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, DENY)
changeAclAndVerify(acls, Set(denyWriteOnWildCardResourceAcl), Set.empty, wildCardResource)
- assertFalse(authorize(aclAuthorizer, host1Context, WRITE, resource), "User1 should not have WRITE access from host1")
+ assertFalse(authorize(authorizer1, host1Context, WRITE, resource), "User1 should not have WRITE access from host1")
}
- @Test
- def testNoAclFound(): Unit = {
- assertFalse(authorize(aclAuthorizer, requestContext, READ, resource), "when acls = [], authorizer should deny op.")
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testNoAclFound(quorum: String): Unit = {
+ assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should deny op.")
}
- @Test
- def testNoAclFoundOverride(): Unit = {
- val props = TestUtils.createBrokerConfig(1, zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testNoAclFoundOverride(quorum: String): Unit = {
+ val props = properties
props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
val cfg = KafkaConfig.fromProps(props)
- val testAuthorizer = new AclAuthorizer
+ var testAuthorizer: Authorizer = null
try {
- testAuthorizer.configure(cfg.originals)
+ testAuthorizer = createAuthorizer(cfg.originals)
assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
"when acls = null or [], authorizer should allow op with allow.everyone = true.")
} finally {
@@ -289,8 +326,9 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
}
}
- @Test
- def testAclManagementAPIs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAclManagementAPIs(quorum: String): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val host1 = "host1"
@@ -308,9 +346,9 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
acls = changeAclAndVerify(acls, Set(acl5), Set.empty)
//test get by principal name.
- TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new AclBinding(resource, acl)) == getAcls(aclAuthorizer, user1),
+ TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new AclBinding(resource, acl)) == getAcls(authorizer1, user1),
"changes not propagated in timeout period")
- TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new AclBinding(resource, acl)) == getAcls(aclAuthorizer, user2),
+ TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new AclBinding(resource, acl)) == getAcls(authorizer1, user2),
"changes not propagated in timeout period")
val resourceToAcls = Map[ResourcePattern, Set[AccessControlEntry]](
@@ -324,20 +362,24 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val expectedAcls = (resourceToAcls + (resource -> acls)).flatMap {
case (res, resAcls) => resAcls.map { acl => new AclBinding(res, acl) }
}.toSet
- TestUtils.waitUntilTrue(() => expectedAcls == getAcls(aclAuthorizer), "changes not propagated in timeout period.")
+ TestUtils.waitUntilTrue(() => expectedAcls == getAcls(authorizer1), "changes not propagated in timeout period.")
//test remove acl from existing acls.
acls = changeAclAndVerify(acls, Set.empty, Set(acl1, acl5))
//test remove all acls for resource
- removeAcls(aclAuthorizer, Set.empty, resource)
- TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer, resource)
- assertFalse(zkClient.resourceExists(resource))
+ removeAcls(authorizer1, Set.empty, resource)
+ TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource)
+ if (quorum.equals(ZK)) {
+ assertFalse(zkClient.resourceExists(resource))
+ }
//test removing last acl also deletes ZooKeeper path
acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty)
changeAclAndVerify(acls, Set.empty, acls)
- assertFalse(zkClient.resourceExists(resource))
+ if (quorum.equals(ZK)) {
+ assertFalse(zkClient.resourceExists(resource))
+ }
}
@Test
@@ -345,18 +387,18 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new AccessControlEntry(user1.toString, "host-1", READ, ALLOW)
val acls = Set(acl1)
- addAcls(aclAuthorizer, acls, resource)
+ addAcls(authorizer1, acls, resource)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val resource1 = new ResourcePattern(TOPIC, "test-2", LITERAL)
val acl2 = new AccessControlEntry(user2.toString, "host3", READ, DENY)
val acls1 = Set(acl2)
- addAcls(aclAuthorizer, acls1, resource1)
+ addAcls(authorizer1, acls1, resource1)
zkClient.deleteAclChangeNotifications()
- val authorizer = new AclAuthorizer
+ var authorizer: Authorizer = null
try {
- authorizer.configure(config.originals)
+ authorizer = createAclAuthorizer(config.originals)
assertEquals(acls, getAcls(authorizer, resource))
assertEquals(acls1, getAcls(authorizer, resource1))
@@ -387,7 +429,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
configureSemaphore.acquire()
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acls = Set(new AccessControlEntry(user1.toString, "host-1", READ, DENY))
- addAcls(aclAuthorizer, acls, resource)
+ addAcls(authorizer1, acls, resource)
listenerSemaphore.release()
future.get(10, TimeUnit.SECONDS)
@@ -409,10 +451,10 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val acl2 = new AccessControlEntry(user2.toString, WildcardHost, READ, DENY)
- addAcls(aclAuthorizer, Set(acl1), commonResource)
- addAcls(aclAuthorizer, Set(acl2), commonResource)
+ addAcls(authorizer1, Set(acl1), commonResource)
+ addAcls(authorizer1, Set(acl2), commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource)
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource)
}
@Test
@@ -426,23 +468,23 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val acl2 = new AccessControlEntry(user2.toString, WildcardHost, READ, DENY)
// Add on each instance
- addAcls(aclAuthorizer, Set(acl1), commonResource)
- addAcls(aclAuthorizer2, Set(acl2), commonResource)
+ addAcls(authorizer1, Set(acl1), commonResource)
+ addAcls(authorizer2, Set(acl2), commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer2, commonResource)
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource)
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer2, commonResource)
val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe")
val acl3 = new AccessControlEntry(user3.toString, WildcardHost, READ, DENY)
// Add on one instance and delete on another
- addAcls(aclAuthorizer, Set(acl3), commonResource)
- val deleted = removeAcls(aclAuthorizer2, Set(acl3), commonResource)
+ addAcls(authorizer1, Set(acl3), commonResource)
+ val deleted = removeAcls(authorizer2, Set(acl3), commonResource)
assertTrue(deleted, "The authorizer should see a value that needs to be deleted")
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer2, commonResource)
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource)
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer2, commonResource)
}
@Test
@@ -458,12 +500,12 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val concurrentFuctions = acls.map { case (acl, aclId) =>
() => {
if (aclId % 2 == 0) {
- addAcls(aclAuthorizer, Set(acl), commonResource)
+ addAcls(authorizer1, Set(acl), commonResource)
} else {
- addAcls(aclAuthorizer2, Set(acl), commonResource)
+ addAcls(authorizer2, Set(acl), commonResource)
}
if (aclId % 10 == 0) {
- removeAcls(aclAuthorizer2, Set(acl), commonResource)
+ removeAcls(authorizer2, Set(acl), commonResource)
}
}
}
@@ -474,15 +516,16 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
- TestUtils.waitAndVerifyAcls(expectedAcls, aclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(expectedAcls, aclAuthorizer2, commonResource)
+ TestUtils.waitAndVerifyAcls(expectedAcls, authorizer1, commonResource)
+ TestUtils.waitAndVerifyAcls(expectedAcls, authorizer2, commonResource)
}
/**
* Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
*/
- @Test
- def testAclInheritance(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAclInheritance(quorum: String): Unit = {
testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS))
testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
@@ -501,15 +544,15 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val host = InetAddress.getByName("192.168.3.1")
val hostContext = newRequestContext(user, host)
val acl = new AccessControlEntry(user.toString, WildcardHost, parentOp, ALLOW)
- addAcls(aclAuthorizer, Set(acl), clusterResource)
+ addAcls(authorizer1, Set(acl), clusterResource)
AclOperation.values.filter(validOp).foreach { op =>
- val authorized = authorize(aclAuthorizer, hostContext, op, clusterResource)
+ val authorized = authorize(authorizer1, hostContext, op, clusterResource)
if (allowedOps.contains(op) || op == parentOp)
assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
else
assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
}
- removeAcls(aclAuthorizer, Set(acl), clusterResource)
+ removeAcls(authorizer1, Set(acl), clusterResource)
}
private def testImplicationsOfDeny(parentOp: AclOperation, deniedOps: Set[AclOperation]): Unit = {
@@ -518,15 +561,15 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val host1Context = newRequestContext(user1, host1)
val acls = Set(new AccessControlEntry(user1.toString, WildcardHost, parentOp, DENY),
new AccessControlEntry(user1.toString, WildcardHost, AclOperation.ALL, ALLOW))
- addAcls(aclAuthorizer, acls, clusterResource)
+ addAcls(authorizer1, acls, clusterResource)
AclOperation.values.filter(validOp).foreach { op =>
- val authorized = authorize(aclAuthorizer, host1Context, op, clusterResource)
+ val authorized = authorize(authorizer1, host1Context, op, clusterResource)
if (deniedOps.contains(op) || op == parentOp)
assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
else
assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
}
- removeAcls(aclAuthorizer, acls, clusterResource)
+ removeAcls(authorizer1, acls, clusterResource)
}
@Test
@@ -536,151 +579,164 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
// Alternate authorizer to keep adding and removing ZooKeeper path
val concurrentFuctions = (0 to 50).map { _ =>
() => {
- addAcls(aclAuthorizer, Set(acl), resource)
- removeAcls(aclAuthorizer2, Set(acl), resource)
+ addAcls(authorizer1, Set(acl), resource)
+ removeAcls(authorizer2, Set(acl), resource)
}
}
TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
- TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer, resource)
- TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer2, resource)
+ TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource)
+ TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer2, resource)
}
- @Test
- def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAccessAllowedIfAllowAclExistsOnWildcardResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
- assertTrue(authorize(aclAuthorizer, requestContext, READ, resource))
+ assertTrue(authorize(authorizer1, requestContext, READ, resource))
}
- @Test
- def testDeleteAclOnWildcardResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), wildCardResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testDeleteAclOnWildcardResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
- removeAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource)
+ removeAcls(authorizer1, Set(allowReadAcl), wildCardResource)
- assertEquals(Set(allowWriteAcl), getAcls(aclAuthorizer, wildCardResource))
+ assertEquals(Set(allowWriteAcl), getAcls(authorizer1, wildCardResource))
}
- @Test
- def testDeleteAllAclOnWildcardResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testDeleteAllAclOnWildcardResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
- removeAcls(aclAuthorizer, Set.empty, wildCardResource)
+ removeAcls(authorizer1, Set.empty, wildCardResource)
- assertEquals(Set.empty, getAcls(aclAuthorizer))
+ assertEquals(Set.empty, getAcls(authorizer1))
}
- @Test
- def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAccessAllowedIfAllowAclExistsOnPrefixedResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
- assertTrue(authorize(aclAuthorizer, requestContext, READ, resource))
+ assertTrue(authorize(authorizer1, requestContext, READ, resource))
}
- @Test
- def testDeleteAclOnPrefixedResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testDeleteAclOnPrefixedResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
- removeAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource)
+ removeAcls(authorizer1, Set(allowReadAcl), prefixedResource)
- assertEquals(Set(allowWriteAcl), getAcls(aclAuthorizer, prefixedResource))
+ assertEquals(Set(allowWriteAcl), getAcls(authorizer1, prefixedResource))
}
- @Test
- def testDeleteAllAclOnPrefixedResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testDeleteAllAclOnPrefixedResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
- removeAcls(aclAuthorizer, Set.empty, prefixedResource)
+ removeAcls(authorizer1, Set.empty, prefixedResource)
- assertEquals(Set.empty, getAcls(aclAuthorizer))
+ assertEquals(Set.empty, getAcls(authorizer1))
}
- @Test
- def testAddAclsOnLiteralResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), resource)
- addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), resource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAddAclsOnLiteralResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), resource)
+ addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), resource)
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, resource))
- assertEquals(Set.empty, getAcls(aclAuthorizer, wildCardResource))
- assertEquals(Set.empty, getAcls(aclAuthorizer, prefixedResource))
+ assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, resource))
+ assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
+ assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
}
- @Test
- def testAddAclsOnWildcardResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), wildCardResource)
- addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), wildCardResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAddAclsOnWildcardResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
+ addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), wildCardResource)
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, wildCardResource))
- assertEquals(Set.empty, getAcls(aclAuthorizer, resource))
- assertEquals(Set.empty, getAcls(aclAuthorizer, prefixedResource))
+ assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, wildCardResource))
+ assertEquals(Set.empty, getAcls(authorizer1, resource))
+ assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
}
- @Test
- def testAddAclsOnPrefixedResource(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource)
- addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), prefixedResource)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAddAclsOnPrefixedResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
+ addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), prefixedResource)
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, prefixedResource))
- assertEquals(Set.empty, getAcls(aclAuthorizer, wildCardResource))
- assertEquals(Set.empty, getAcls(aclAuthorizer, resource))
+ assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, prefixedResource))
+ assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
+ assertEquals(Set.empty, getAcls(authorizer1, resource))
}
- @Test
- def testAuthorizeWithPrefixedResource(): Unit = {
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", LITERAL))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-" + UUID.randomUUID(), PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" + UUID.randomUUID(), PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" + UUID.randomUUID(), PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" + UUID.randomUUID(), PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-", PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED))
- addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", LITERAL))
-
- addAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource)
-
- assertTrue(authorize(aclAuthorizer, requestContext, READ, resource))
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAuthorizeWithPrefixedResource(quorum: String): Unit = {
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", LITERAL))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-" + UUID.randomUUID(), PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" + UUID.randomUUID(), PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" + UUID.randomUUID(), PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" + UUID.randomUUID(), PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-", PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED))
+ addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", LITERAL))
+
+ addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
+
+ assertTrue(authorize(authorizer1, requestContext, READ, resource))
}
- @Test
- def testSingleCharacterResourceAcls(): Unit = {
- addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", LITERAL))
- assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "f", LITERAL)))
- assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "foo", LITERAL)))
-
- addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "_", PREFIXED))
- assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "_foo", LITERAL)))
- assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "_", LITERAL)))
- assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "foo_", LITERAL)))
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testSingleCharacterResourceAcls(quorum: String): Unit = {
+ addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", LITERAL))
+ assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "f", LITERAL)))
+ assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo", LITERAL)))
+
+ addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "_", PREFIXED))
+ assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "_foo", LITERAL)))
+ assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "_", LITERAL)))
+ assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo_", LITERAL)))
}
- @Test
- def testGetAclsPrincipal(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testGetAclsPrincipal(quorum: String): Unit = {
val aclOnSpecificPrincipal = new AccessControlEntry(principal.toString, WildcardHost, WRITE, ALLOW)
- addAcls(aclAuthorizer, Set(aclOnSpecificPrincipal), resource)
+ addAcls(authorizer1, Set(aclOnSpecificPrincipal), resource)
assertEquals(0,
- getAcls(aclAuthorizer, wildcardPrincipal).size, "acl on specific should not be returned for wildcard request")
+ getAcls(authorizer1, wildcardPrincipal).size, "acl on specific should not be returned for wildcard request")
assertEquals(1,
- getAcls(aclAuthorizer, principal).size, "acl on specific should be returned for specific request")
+ getAcls(authorizer1, principal).size, "acl on specific should be returned for specific request")
assertEquals(1,
- getAcls(aclAuthorizer, new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size, "acl on specific should be returned for different principal instance")
+ getAcls(authorizer1, new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size, "acl on specific should be returned for different principal instance")
- removeAcls(aclAuthorizer, Set.empty, resource)
+ removeAcls(authorizer1, Set.empty, resource)
val aclOnWildcardPrincipal = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW)
- addAcls(aclAuthorizer, Set(aclOnWildcardPrincipal), resource)
+ addAcls(authorizer1, Set(aclOnWildcardPrincipal), resource)
- assertEquals(1, getAcls(aclAuthorizer, wildcardPrincipal).size, "acl on wildcard should be returned for wildcard request")
- assertEquals(0, getAcls(aclAuthorizer, principal).size, "acl on wildcard should not be returned for specific request")
+ assertEquals(1, getAcls(authorizer1, wildcardPrincipal).size, "acl on wildcard should be returned for wildcard request")
+ assertEquals(0, getAcls(authorizer1, principal).size, "acl on wildcard should not be returned for specific request")
}
- @Test
- def testAclsFilter(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAclsFilter(quorum: String): Unit = {
val resource1 = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
val resource2 = new ResourcePattern(TOPIC, "bar-" + UUID.randomUUID(), LITERAL)
val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED)
@@ -690,25 +746,31 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val acl3 = new AclBinding(resource2, new AccessControlEntry(principal.toString, WildcardHost, DESCRIBE, ALLOW))
val acl4 = new AclBinding(prefixedResource, new AccessControlEntry(wildcardPrincipal.toString, WildcardHost, READ, ALLOW))
- aclAuthorizer.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava)
- assertEquals(Set(acl1, acl2, acl3, acl4), aclAuthorizer.acls(AclBindingFilter.ANY).asScala.toSet)
- assertEquals(Set(acl1, acl2), aclAuthorizer.acls(new AclBindingFilter(resource1.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
- assertEquals(Set(acl4), aclAuthorizer.acls(new AclBindingFilter(prefixedResource.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
+ authorizer1.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava)
+ assertEquals(Set(acl1, acl2, acl3, acl4), authorizer1.acls(AclBindingFilter.ANY).asScala.toSet)
+ assertEquals(Set(acl1, acl2), authorizer1.acls(new AclBindingFilter(resource1.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
+ assertEquals(Set(acl4), authorizer1.acls(new AclBindingFilter(prefixedResource.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet)
val matchingFilter = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, resource2.name, MATCH), AccessControlEntryFilter.ANY)
- assertEquals(Set(acl3, acl4), aclAuthorizer.acls(matchingFilter).asScala.toSet)
+ assertEquals(Set(acl3, acl4), authorizer1.acls(matchingFilter).asScala.toSet)
val filters = List(matchingFilter,
acl1.toFilter,
new AclBindingFilter(resource2.toFilter, AccessControlEntryFilter.ANY),
new AclBindingFilter(new ResourcePatternFilter(TOPIC, "baz", PatternType.ANY), AccessControlEntryFilter.ANY))
- val deleteResults = aclAuthorizer.deleteAcls(requestContext, filters.asJava).asScala.map(_.toCompletableFuture.get)
+ val deleteResults = authorizer1.deleteAcls(requestContext, filters.asJava).asScala.map(_.toCompletableFuture.get)
assertEquals(List.empty, deleteResults.filter(_.exception.isPresent))
filters.indices.foreach { i =>
assertEquals(Set.empty, deleteResults(i).aclBindingDeleteResults.asScala.toSet.filter(_.exception.isPresent))
}
assertEquals(Set(acl3, acl4), deleteResults(0).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
assertEquals(Set(acl1), deleteResults(1).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- assertEquals(Set.empty, deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
+ if (quorum.equals(ZK)) {
+ assertEquals(Set.empty, deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
+ } else {
+ // The standard authorizer first finds the ACLs that match the filters and only then deletes them,
+ // so filters(2) still matches acl3 even though acl3 also matches filters(0) and is deleted by it.
+ assertEquals(Set(acl3), deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
+ }
assertEquals(Set.empty, deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
}
@@ -716,14 +778,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV0))
val e = assertThrows(classOf[ApiException],
- () => addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED)))
+ () => addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED)))
assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException], s"Unexpected exception $e")
}
@Test
def testCreateAclWithInvalidResourceName(): Unit = {
assertThrows(classOf[ApiException],
- () => addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "test/1", LITERAL)))
+ () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "test/1", LITERAL)))
}
@Test
@@ -733,7 +795,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val expected = new String(ZkAclStore(PREFIXED).changeStore
.createChangeNode(resource).bytes, UTF_8)
- addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+ addAcls(authorizer1, Set(denyReadAcl), resource)
val actual = getAclChangeEventAsString(PREFIXED)
@@ -747,7 +809,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val expected = new String(ZkAclStore(PREFIXED).changeStore
.createChangeNode(resource).bytes, UTF_8)
- addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+ addAcls(authorizer1, Set(denyReadAcl), resource)
val actual = getAclChangeEventAsString(PREFIXED)
@@ -761,7 +823,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val expected = new String(ZkAclStore(LITERAL).changeStore
.createChangeNode(resource).bytes, UTF_8)
- addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+ addAcls(authorizer1, Set(denyReadAcl), resource)
val actual = getAclChangeEventAsString(LITERAL)
@@ -775,7 +837,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val expected = new String(ZkAclStore(LITERAL).changeStore
.createChangeNode(resource).bytes, UTF_8)
- addAcls(aclAuthorizer, Set(denyReadAcl), resource)
+ addAcls(authorizer1, Set(denyReadAcl), resource)
val actual = getAclChangeEventAsString(LITERAL)
@@ -914,14 +976,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val ace = new AccessControlEntry(principal.toString, WildcardHost, READ, ALLOW)
val updateSemaphore = new Semaphore(1)
- def createAcl(createAuthorizer: AclAuthorizer, resource: ResourcePattern): AclBinding = {
+ def createAcl(createAuthorizer: Authorizer, resource: ResourcePattern): AclBinding = {
val acl = new AclBinding(resource, ace)
createAuthorizer.createAcls(requestContext, Collections.singletonList(acl)).asScala
.foreach(_.toCompletableFuture.get(15, TimeUnit.SECONDS))
acl
}
- def deleteAcl(deleteAuthorizer: AclAuthorizer,
+ def deleteAcl(deleteAuthorizer: Authorizer,
resource: ResourcePattern,
deletePatternType: PatternType): List[AclBinding] = {
@@ -935,16 +997,16 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
.toList
}
- def listAcls(authorizer: AclAuthorizer): List[AclBinding] = {
+ def listAcls(authorizer: Authorizer): List[AclBinding] = {
authorizer.acls(AclBindingFilter.ANY).asScala.toList
}
- def verifyCreateDeleteAcl(deleteAuthorizer: AclAuthorizer,
+ def verifyCreateDeleteAcl(deleteAuthorizer: Authorizer,
resource: ResourcePattern,
deletePatternType: PatternType): Unit = {
updateSemaphore.acquire()
assertEquals(List.empty, listAcls(deleteAuthorizer))
- val acl = createAcl(aclAuthorizer, resource)
+ val acl = createAcl(authorizer1, resource)
val deleted = deleteAcl(deleteAuthorizer, resource, deletePatternType)
if (deletePatternType != PatternType.MATCH) {
assertEquals(List(acl), deleted)
@@ -982,34 +1044,35 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
}
}
- @Test
- def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
- val props = TestUtils.createBrokerConfig(1, zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAuthorizeByResourceTypeNoAclFoundOverride(quorum: String): Unit = {
+ val props = properties
props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
val cfg = KafkaConfig.fromProps(props)
- val aclAuthorizer = new AclAuthorizer
+ var authorizer: Authorizer = null
try {
- aclAuthorizer.configure(cfg.originals)
- assertTrue(authorizeByResourceType(aclAuthorizer, requestContext, READ, resource.resourceType()),
+ authorizer = createAuthorizer(cfg.originals)
+ assertTrue(authorizeByResourceType(authorizer, requestContext, READ, resource.resourceType()),
"If allow.everyone.if.no.acl.found = true, caller should have read access to at least one topic")
- assertTrue(authorizeByResourceType(aclAuthorizer, requestContext, WRITE, resource.resourceType()),
+ assertTrue(authorizeByResourceType(authorizer, requestContext, WRITE, resource.resourceType()),
"If allow.everyone.if.no.acl.found = true, caller should have write access to at least one topic")
} finally {
- aclAuthorizer.close()
+ authorizer.close()
}
}
private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[MetadataVersion]): Unit = {
- aclAuthorizer.close()
+ authorizer1.close()
- val props = TestUtils.createBrokerConfig(0, zkConnect)
+ val props = TestUtils.createBrokerConfig(0, zkConnectOrNull)
props.put(AclAuthorizer.SuperUsersProp, superUsers)
protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
config = KafkaConfig.fromProps(props)
- aclAuthorizer.configure(config.originals)
+ authorizer1.configure(config.originals)
}
private def getAclChangeEventAsString(patternType: PatternType) = {
@@ -1031,37 +1094,37 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
var acls = originalAcls
if(addedAcls.nonEmpty) {
- addAcls(aclAuthorizer, addedAcls, resource)
+ addAcls(authorizer1, addedAcls, resource)
acls ++= addedAcls
}
if(removedAcls.nonEmpty) {
- removeAcls(aclAuthorizer, removedAcls, resource)
+ removeAcls(authorizer1, removedAcls, resource)
acls --=removedAcls
}
- TestUtils.waitAndVerifyAcls(acls, aclAuthorizer, resource)
+ TestUtils.waitAndVerifyAcls(acls, authorizer1, resource)
acls
}
- private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+ private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
val action = new Action(operation, resource, 1, true, true)
authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED
}
- private def getAcls(authorizer: AclAuthorizer, resourcePattern: ResourcePattern): Set[AccessControlEntry] = {
+ private def getAcls(authorizer: Authorizer, resourcePattern: ResourcePattern): Set[AccessControlEntry] = {
val acls = authorizer.acls(new AclBindingFilter(resourcePattern.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet
acls.map(_.entry)
}
- private def getAcls(authorizer: AclAuthorizer, principal: KafkaPrincipal): Set[AclBinding] = {
+ private def getAcls(authorizer: Authorizer, principal: KafkaPrincipal): Set[AclBinding] = {
val filter = new AclBindingFilter(ResourcePatternFilter.ANY,
new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, AclPermissionType.ANY))
authorizer.acls(filter).asScala.toSet
}
- private def getAcls(authorizer: AclAuthorizer): Set[AclBinding] = {
+ private def getAcls(authorizer: Authorizer): Set[AclBinding] = {
authorizer.acls(AclBindingFilter.ANY).asScala.toSet
}
@@ -1084,4 +1147,35 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
file.getAbsolutePath
} finally writer.close()
}
+
+ def createAuthorizer(configs: util.Map[String, AnyRef]): Authorizer = {
+ var testAuthorizer: Authorizer = null
+ if (TestInfoUtils.isKRaft(_testInfo)) {
+ testAuthorizer = createStandardAuthorizer(configs)
+ } else {
+ testAuthorizer = createAclAuthorizer(configs)
+ }
+ testAuthorizer
+ }
+
+ def createAclAuthorizer(configs: util.Map[String, AnyRef]): AclAuthorizer = {
+ val authorizer = new AclAuthorizer
+ authorizer.configure(configs)
+ authorizer
+ }
+
+ def createStandardAuthorizer(configs: util.Map[String, AnyRef]): StandardAuthorizer = {
+ val standardAuthorizer = new StandardAuthorizer
+ standardAuthorizer.configure(configs)
+ initializeStandardAuthorizer(standardAuthorizer, new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)))
+ standardAuthorizer
+ }
+
+ def initializeStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
+ serverInfo: AuthorizerServerInfo): Unit = {
+ val aclMutator = new MockAclMutator(standardAuthorizer)
+ standardAuthorizer.start(serverInfo)
+ standardAuthorizer.setAclMutator(aclMutator)
+ standardAuthorizer.completeInitialLoad()
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
index 313927fc537..7569e88ef0f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
@@ -139,6 +139,9 @@ public class AclControlManager {
throw new InvalidRequestException("Invalid permissionType " +
binding.entry().permissionType());
}
+ if (binding.pattern().name() == null || binding.pattern().name().isEmpty()) {
+ throw new InvalidRequestException("Resource name should not be empty");
+ }
}
ControllerResult<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filters) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index f456ff4d772..c6d41a52da4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -259,6 +259,9 @@ public class StandardAuthorizerData {
AuthorizableRequestContext requestContext,
Action action
) {
+ if (action.resourcePattern().patternType() != LITERAL) {
+ throw new IllegalArgumentException("Only literal resources are supported. Got: " + action.resourcePattern().patternType());
+ }
KafkaPrincipal principal = baseKafkaPrincipal(requestContext);
final MatchingRule rule;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java
new file mode 100644
index 00000000000..16d71e8786c
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import java.util.List;
+import java.util.Optional;
+
+public class MockAclControlManager extends AclControlManager {
+ public MockAclControlManager(LogContext logContext,
+ Optional<ClusterMetadataAuthorizer> authorizer) {
+ super(new SnapshotRegistry(logContext), authorizer);
+ }
+
+ public List<AclCreateResult> createAndReplayAcls(List<AclBinding> acls) {
+ ControllerResult<List<AclCreateResult>> createResults = createAcls(acls);
+ createResults.records().forEach(record -> replay((AccessControlEntryRecord) record.message(), Optional.empty()));
+ return createResults.response();
+ }
+
+ public List<AclDeleteResult> deleteAndReplayAcls(List<AclBindingFilter> filters) {
+ ControllerResult<List<AclDeleteResult>> deleteResults = deleteAcls(filters);
+ deleteResults.records().forEach(record -> replay((RemoveAccessControlEntryRecord) record.message(), Optional.empty()));
+ return deleteResults.response();
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java
new file mode 100644
index 00000000000..188a8dc69cc
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.ControllerRequestContext;
+import org.apache.kafka.controller.MockAclControlManager;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+/**
+ * AclMutator that performs ACL creation and deletion synchronously by delegating to a
+ * MockAclControlManager, so every future it returns is already completed.
+ */
+public class MockAclMutator implements AclMutator {
+    MockAclControlManager aclControlManager;
+
+    /** Builds the backing MockAclControlManager with a fresh LogContext and the given authorizer. */
+    public MockAclMutator(StandardAuthorizer authorizer) {
+        aclControlManager = new MockAclControlManager(new LogContext(), Optional.of(authorizer));
+    }
+
+    /** Creates and replays the given bindings; {@code context} is not consulted. */
+    @Override
+    public CompletableFuture<List<AclCreateResult>> createAcls(
+        ControllerRequestContext context,
+        List<AclBinding> aclBindings
+    ) {
+        List<AclCreateResult> created = aclControlManager.createAndReplayAcls(aclBindings);
+        return CompletableFuture.completedFuture(created);
+    }
+
+    /** Deletes and replays ACLs matching the given filters; {@code context} is not consulted. */
+    @Override
+    public CompletableFuture<List<AclDeleteResult>> deleteAcls(
+        ControllerRequestContext context,
+        List<AclBindingFilter> aclBindingFilters
+    ) {
+        List<AclDeleteResult> deleted = aclControlManager.deleteAndReplayAcls(aclBindingFilters);
+        return CompletableFuture.completedFuture(deleted);
+    }
+}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 3966054e744..d9fcff1b9b3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -103,10 +103,10 @@ public class StandardAuthorizerTest {
"127.0.0.1",
9020);
- static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
+ public static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
private final Collection<Endpoint> endpoints;
- AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
+ public AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
assertFalse(endpoints.isEmpty());
this.endpoints = endpoints;
}