You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2022/12/02 15:58:35 UTC
[kafka] branch trunk updated: KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers (#12896)
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7b00a07ccc4 KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers (#12896)
7b00a07ccc4 is described below
commit 7b00a07ccc4334d81299e5c562109fa2555ce934
Author: Proven Provenzano <93...@users.noreply.github.com>
AuthorDate: Fri Dec 2 10:58:10 2022 -0500
KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers (#12896)
* Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers
* SCRAM and Delegation are not implemented for KRAFT yet so they emit
a message to stderr and pass the test.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../DelegationTokenEndToEndAuthorizationTest.scala | 11 +-
.../kafka/api/EndToEndAuthorizationTest.scala | 181 +++++++++++++++------
.../kafka/api/IntegrationTestHarness.scala | 5 +
.../api/PlaintextEndToEndAuthorizationTest.scala | 10 +-
.../kafka/api/SaslEndToEndAuthorizationTest.scala | 50 +++---
.../SaslGssapiSslEndToEndAuthorizationTest.scala | 4 +-
...slOAuthBearerSslEndToEndAuthorizationTest.scala | 2 +-
.../SaslPlainSslEndToEndAuthorizationTest.scala | 7 +-
.../SaslScramSslEndToEndAuthorizationTest.scala | 13 +-
9 files changed, 195 insertions(+), 88 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 40ad090b46e..920d0ccb254 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
import java.util.Properties
import kafka.server.KafkaConfig
-import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.utils._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, ScramCredentialInfo, UserScramCredentialAlteration, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.SaslConfigs
@@ -46,6 +46,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
protected val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
+ override val unimplementedquorum = "kraft"
protected val privilegedAdminClientConfig = new Properties()
@@ -110,9 +111,11 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
- startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
- super.setUp(testInfo)
- privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+ if (!TestInfoUtils.isKRaft(testInfo)) {
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
+ super.setUp(testInfo)
+ privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+ }
}
def assertTokenOwner(owner: KafkaPrincipal, token: DelegationToken): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index ad4b66e20a3..f4c51a51b88 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import kafka.server._
import kafka.utils._
import org.apache.kafka.clients.admin.Admin
@@ -36,12 +37,12 @@ import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthori
import org.apache.kafka.common.resource._
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth._
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{ValueSource, CsvSource}
import scala.jdk.CollectionConverters._
@@ -65,6 +66,7 @@ import scala.jdk.CollectionConverters._
*/
@Timeout(60)
abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
+
override val brokerCount = 3
val numRecords = 1
@@ -138,24 +140,35 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1500")
+ val unimplementedquorum = ""
+
/**
* Starts MiniKDC and only then sets up the parent trait.
*/
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
- // The next two configuration parameters enable ZooKeeper secure ACLs
- // and sets the Kafka authorizer, both necessary to enable security.
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
- this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
+ if (TestInfoUtils.isKRaft(testInfo)) {
+ this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString)
+ this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + "User:ANONYMOUS")
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+ this.controllerConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+ } else {
+ // The next two configuration parameters enable ZooKeeper secure ACLs
+ // and sets the Kafka authorizer, both necessary to enable security.
+ this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
- // Set the specific principal that can update ACLs.
- this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
+ // Set the specific principal that can update ACLs.
+ this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
+ }
super.setUp(testInfo)
// create the test topic with all the brokers as replicas
- createTopic(topic, 1, 3)
+ val superuserAdminClient = createSuperuserAdminClient()
+ TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers,
+ numPartitions = 1, replicationFactor = 3, topicConfig = new Properties)
}
/**
@@ -170,23 +183,28 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
/**
* Tests the ability of producing and consuming with the appropriate ACLs set.
*/
- @Test
- def testProduceConsumeViaAssign(): Unit = {
- setAclsAndProduce(tp)
- val consumer = createConsumer()
- consumer.assign(List(tp).asJava)
- consumeRecords(consumer, numRecords)
- confirmReauthenticationMetrics()
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testProduceConsumeViaAssign(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
+ setAclsAndProduce(tp)
+ val consumer = createConsumer()
+ consumer.assign(List(tp).asJava)
+ consumeRecords(consumer, numRecords)
+ confirmReauthenticationMetrics()
+ }
}
protected def confirmReauthenticationMetrics(): Unit = {
val expiredConnectionsKilledCountTotal = getGauge("ExpiredConnectionsKilledCount").value()
- servers.foreach { s =>
+ brokers.foreach { s =>
val numExpiredKilled = TestUtils.totalMetricValue(s, "expired-connections-killed-count")
assertEquals(0, numExpiredKilled, "Should have been zero expired connections killed: " + numExpiredKilled + "(total=" + expiredConnectionsKilledCountTotal + ")")
}
assertEquals(0, expiredConnectionsKilledCountTotal, 0.0, "Should have been zero expired connections killed total")
- servers.foreach { s =>
+ brokers.foreach { s =>
assertEquals(0, TestUtils.totalMetricValue(s, "failed-reauthentication-total"), "failed re-authentications not 0")
}
}
@@ -198,17 +216,26 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
._2.asInstanceOf[Gauge[Double]]
}
- @Test
- def testProduceConsumeViaSubscribe(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testProduceConsumeViaSubscribe(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
setAclsAndProduce(tp)
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
consumeRecords(consumer, numRecords)
confirmReauthenticationMetrics()
+ }
}
- @Test
- def testProduceConsumeWithWildcardAcls(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testProduceConsumeWithWildcardAcls(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
setWildcardResourceAcls()
val producer = createProducer()
sendRecords(producer, numRecords, tp)
@@ -216,10 +243,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumer.subscribe(List(topic).asJava)
consumeRecords(consumer, numRecords)
confirmReauthenticationMetrics()
+ }
}
- @Test
- def testProduceConsumeWithPrefixedAcls(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testProduceConsumeWithPrefixedAcls(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
setPrefixedResourceAcls()
val producer = createProducer()
sendRecords(producer, numRecords, tp)
@@ -227,10 +259,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumer.subscribe(List(topic).asJava)
consumeRecords(consumer, numRecords)
confirmReauthenticationMetrics()
+ }
}
- @Test
- def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testProduceConsumeTopicAutoCreateTopicCreateAcl(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
// topic2 is not created on setup()
val tp2 = new TopicPartition("topic2", 0)
setAclsAndProduce(tp2)
@@ -238,6 +275,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumer.assign(List(tp2).asJava)
consumeRecords(consumer, numRecords, topic = tp2.topic)
confirmReauthenticationMetrics()
+ }
}
private def setWildcardResourceAcls(): Unit = {
@@ -245,7 +283,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclWildcardTopicWrite, AclWildcardTopicCreate, AclWildcardTopicDescribe, AclWildcardTopicRead).asJava).values
superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
}
@@ -256,7 +294,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclPrefixedTopicWrite, AclPrefixedTopicCreate, AclPrefixedTopicDescribe, AclPrefixedTopicRead).asJava).values
superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
}
@@ -270,7 +308,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclTopicRead(topicResource)).asJava).values
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get,
new ResourcePattern(TOPIC, tp.topic, LITERAL))
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
@@ -286,7 +324,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
private def setConsumerGroupAcls(): Unit = {
val superuserAdminClient = createSuperuserAdminClient()
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
}
@@ -297,8 +335,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* Also verifies that subsequent publish, consume and describe to authorized topic succeeds.
*/
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
+ @CsvSource(value = Array(
+ "kraft, true",
+ "kraft, false",
+ "zk, true",
+ "zk, false"
+ ))
+ def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
// Set consumer group acls since we are testing topic authorization
setConsumerGroupAcls()
@@ -361,15 +407,24 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues
assertEquals(1, describeResults2.get(topic).get().partitions().size())
assertEquals(1, describeResults2.get(topic2).get().partitions().size())
+ }
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testNoProduceWithDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
+ @CsvSource(value = Array(
+ "kraft, true",
+ "kraft, false",
+ "zk, true",
+ "zk, false"
+ ))
+ def testNoProduceWithDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
val superuserAdminClient = createSuperuserAdminClient()
superuserAdminClient.createAcls(List(AclTopicDescribe()).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
}
@@ -385,24 +440,34 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
assertEquals(Set(topic).asJava, e.unauthorizedTopics())
}
confirmReauthenticationMetrics()
+ }
}
/**
* Tests that a consumer fails to consume messages without the appropriate
* ACL set.
*/
- @Test
- def testNoConsumeWithoutDescribeAclViaAssign(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testNoConsumeWithoutDescribeAclViaAssign(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
noConsumeWithoutDescribeAclSetup()
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
// the exception is expected when the consumer attempts to lookup offsets
assertThrows(classOf[KafkaException], () => consumeRecords(consumer))
confirmReauthenticationMetrics()
+ }
}
- @Test
- def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testNoConsumeWithoutDescribeAclViaSubscribe(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
noConsumeWithoutDescribeAclSetup()
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
@@ -417,6 +482,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
// Verify that records are consumed if all topics are authorized
consumer.subscribe(List(topic).asJava)
consumeRecordsIgnoreOneAuthorizationException(consumer)
+ }
}
private def noConsumeWithoutDescribeAclSetup(): Unit = {
@@ -424,7 +490,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
@@ -435,13 +501,17 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.deleteAcls(List(AclTopicDescribe().toFilter).asJava).values
superuserAdminClient.deleteAcls(List(AclTopicWrite().toFilter).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
}
- @Test
- def testNoConsumeWithDescribeAclViaAssign(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testNoConsumeWithDescribeAclViaAssign(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
noConsumeWithDescribeAclSetup()
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
@@ -449,10 +519,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
assertEquals(Set(topic).asJava, e.unauthorizedTopics())
confirmReauthenticationMetrics()
+ }
}
- @Test
- def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testNoConsumeWithDescribeAclViaSubscribe(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
noConsumeWithDescribeAclSetup()
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
@@ -460,6 +535,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
assertEquals(Set(topic).asJava, e.unauthorizedTopics())
confirmReauthenticationMetrics()
+ }
}
private def noConsumeWithDescribeAclSetup(): Unit = {
@@ -467,7 +543,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
@@ -479,11 +555,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* Tests that a consumer fails to consume messages without the appropriate
* ACL set.
*/
- @Test
- def testNoGroupAcl(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testNoGroupAcl(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
val superuserAdminClient = createSuperuserAdminClient()
superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
- servers.foreach { s =>
+ brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
}
val producer = createProducer()
@@ -494,6 +574,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val e = assertThrows(classOf[GroupAuthorizationException], () => consumeRecords(consumer))
assertEquals(group, e.groupId())
confirmReauthenticationMetrics()
+ }
}
protected final def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 7ccce46665c..3b459875b60 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -47,6 +47,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val adminClientConfig = new Properties
val superuserClientConfig = new Properties
val serverConfig = new Properties
+ val controllerConfig = new Properties
private val consumers = mutable.Buffer[KafkaConsumer[_, _]]()
private val producers = mutable.Buffer[KafkaProducer[_, _]]()
@@ -67,6 +68,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
cfgs.map(KafkaConfig.fromProps)
}
+ override protected def kraftControllerConfigs(): Seq[Properties] = {
+ Seq(controllerConfig)
+ }
+
protected def configureListeners(props: Seq[Properties]): Unit = {
props.foreach { config =>
config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index 19fea48d6f0..18e792344fc 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -21,9 +21,12 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.clients.admin.AdminClientConfig
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.apache.kafka.common.errors.TopicAuthorizationException
+import kafka.utils.TestInfoUtils
// This test case uses a separate listener for client and inter-broker communication, from
// which we derive corresponding principals
@@ -86,8 +89,9 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
superuserClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(interBrokerListenerName))
}
- @Test
- def testListenerName(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testListenerName(quorum: String): Unit = {
// To check the client listener name, establish a session on the server by sending any request eg sendRecords
val producer = createProducer()
assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords = 1, tp))
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index cc67d01546a..2f599c5ae32 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -19,8 +19,11 @@ package kafka.api
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import kafka.utils.TestInfoUtils
import scala.collection.immutable.List
import scala.jdk.CollectionConverters._
@@ -55,30 +58,35 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
* the second one connects ok, but fails to consume messages due to the ACL.
*/
@Timeout(15)
- @Test
- def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
- setAclsAndProduce(tp)
- val consumer1 = createConsumer()
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testTwoConsumersWithDifferentSaslCredentials(quorum: String): Unit = {
+ if (quorum == unimplementedquorum) {
+ Console.err.println("QuorumName : " + quorum + " is not supported.")
+ } else {
+ setAclsAndProduce(tp)
+ val consumer1 = createConsumer()
- // consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path
- consumerConfig.remove(SaslConfigs.SASL_JAAS_CONFIG)
- consumerConfig.remove(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS)
+ // consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path
+ consumerConfig.remove(SaslConfigs.SASL_JAAS_CONFIG)
+ consumerConfig.remove(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS)
- val consumer2 = createConsumer()
- consumer1.assign(List(tp).asJava)
- consumer2.assign(List(tp).asJava)
+ val consumer2 = createConsumer()
+ consumer1.assign(List(tp).asJava)
+ consumer2.assign(List(tp).asJava)
- consumeRecords(consumer1, numRecords)
+ consumeRecords(consumer1, numRecords)
- try {
- consumeRecords(consumer2)
- fail("Expected exception as consumer2 has no access to topic or group")
- } catch {
- // Either exception is possible depending on the order that the first Metadata
- // and FindCoordinator requests are received
- case e: TopicAuthorizationException => assertTrue(e.unauthorizedTopics.contains(topic))
- case e: GroupAuthorizationException => assertEquals(group, e.groupId)
+ try {
+ consumeRecords(consumer2)
+ fail("Expected exception as consumer2 has no access to topic or group")
+ } catch {
+ // Either exception is possible depending on the order that the first Metadata
+ // and FindCoordinator requests are received
+ case e: TopicAuthorizationException => assertTrue(e.unauthorizedTopics.contains(topic))
+ case e: GroupAuthorizationException => assertEquals(group, e.groupId)
+ }
+ confirmReauthenticationMetrics()
}
- confirmReauthenticationMetrics()
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
index 5311f4f6355..83cfbabb8ba 100644
--- a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -20,7 +20,8 @@ import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.config.SslConfigs
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth._
+
import org.junit.jupiter.api.Assertions.assertNull
import scala.collection.immutable.List
@@ -39,6 +40,7 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe
// client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client
// authentication or SASL authentication with SSL as the transport layer (but not both).
serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
+ controllerConfig.put(KafkaConfig.SslClientAuthProp, "required")
assertNull(producerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertNull(consumerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertNull(adminClientConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
diff --git a/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
index 4baa61d4c91..904c35f31f5 100644
--- a/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
@@ -17,7 +17,7 @@
package kafka.api
import kafka.utils.JaasTestUtils
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth._
class SaslOAuthBearerSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override protected def kafkaClientSaslMechanism = "OAUTHBEARER"
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 4d0936dcdd8..356ae2fe339 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -38,8 +38,9 @@ import org.junit.jupiter.api.Test
object SaslPlainSslEndToEndAuthorizationTest {
- class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
+ val controllerPrincipalName = "admin"
+ class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
val saslContext = context.asInstanceOf[SaslAuthenticationContext]
@@ -50,7 +51,7 @@ object SaslPlainSslEndToEndAuthorizationTest {
saslContext.server.getAuthorizationID match {
case KafkaPlainAdmin =>
- new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
+ new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
case KafkaPlainUser =>
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
case _ =>
@@ -123,7 +124,7 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override protected def kafkaServerSaslMechanisms = List("PLAIN")
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
- override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
+ override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
override def jaasSections(kafkaServerSaslMechanisms: Seq[String],
kafkaClientSaslMechanism: Option[String],
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index 6e334d16d08..b44704350bf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -18,7 +18,7 @@ package kafka.api
import java.util.Properties
-import kafka.utils.JaasTestUtils
+import kafka.utils._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramMechanism
@@ -33,6 +33,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser)
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
+ override val unimplementedquorum = "kraft"
override def configureSecurityBeforeServersStart(): Unit = {
super.configureSecurityBeforeServersStart()
@@ -53,9 +54,11 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
- // Create client credentials after starting brokers so that dynamic credential creation is also tested
- createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
- createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
+ if (!TestInfoUtils.isKRaft(testInfo)) {
+ super.setUp(testInfo)
+ // Create client credentials after starting brokers so that dynamic credential creation is also tested
+ createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+ createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
+ }
}
}