You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/06/27 23:01:32 UTC
[kafka] branch trunk updated: MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d654bc1b15 MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)
d654bc1b15 is described below
commit d654bc1b15740acf8f1647a0f4533f4cd7f71271
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 27 16:01:15 2022 -0700
MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)
Support KRaft in `GroupAuthorizerIntegrationTest`.
Reviewers: David Arthur <mu...@gmail.com>
---
.../kafka/api/AuthorizerIntegrationTest.scala | 39 ++++++--------
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 60 +++++++++++++++-------
.../kafka/server/QuorumTestHarness.scala | 19 ++++---
.../kafka/integration/KafkaServerTestHarness.scala | 10 ++++
4 files changed, 78 insertions(+), 50 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f9999dc18c..a109ae8ce4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -85,9 +85,8 @@ object AuthorizerIntegrationTest {
class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
- case BrokerListenerName => BrokerPrincipal
+ case BrokerListenerName | ControllerListenerName => BrokerPrincipal
case ClientListenerName => ClientPrincipal
- case ControllerListenerName => BrokerPrincipal
case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
}
}
@@ -152,32 +151,32 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
override def brokerPropertyOverrides(properties: Properties): Unit = {
+ properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+ addNodeProperties(properties)
+ }
+
+ override def kraftControllerConfigs(): collection.Seq[Properties] = {
+ val controllerConfigs = super.kraftControllerConfigs()
+ controllerConfigs.foreach(addNodeProperties)
+ controllerConfigs
+ }
+
+ private def addNodeProperties(properties: Properties): Unit = {
if (isKRaftTest()) {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
- properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString())
+ properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
} else {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
}
- properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
- properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
- classOf[PrincipalBuilder].getName)
+ properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName)
}
- override def kraftControllerConfigs(): Seq[Properties] = {
- val controllerConfigs = Seq(new Properties())
- controllerConfigs.foreach { properties =>
- properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName())
- properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString())
- properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
- classOf[PrincipalBuilder].getName)
- }
- controllerConfigs
- }
val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors](
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
@@ -2574,14 +2573,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}
- private def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
- TestUtils.addAndVerifyAcls(brokers, acls, resource, controllerServers)
- }
-
- private def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
- TestUtils.removeAndVerifyAcls(brokers, acls, resource, controllerServers)
- }
-
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,
startingOffset: Int = 0,
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 2b380bfd2a..82e637ae00 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -14,12 +14,11 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
-
import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -30,8 +29,11 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@@ -41,11 +43,12 @@ object GroupAuthorizerIntegrationTest {
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
+ val ControllerListenerName = "CONTROLLER"
class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
- case BrokerListenerName => BrokerPrincipal
+ case BrokerListenerName | ControllerListenerName => BrokerPrincipal
case ClientListenerName => ClientPrincipal
case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
}
@@ -64,9 +67,25 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
def brokerPrincipal: KafkaPrincipal = BrokerPrincipal
def clientPrincipal: KafkaPrincipal = ClientPrincipal
+ override def kraftControllerConfigs(): collection.Seq[Properties] = {
+ val controllerConfigs = super.kraftControllerConfigs()
+ controllerConfigs.foreach(addNodeProperties)
+ controllerConfigs
+ }
+
override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+ addNodeProperties(properties)
+ }
+
+ private def addNodeProperties(properties: Properties): Unit = {
+ if (isKRaftTest()) {
+ properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+ properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
+ } else {
+ properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
+ }
+
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
@@ -80,11 +99,12 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
// Allow inter-broker communication
- TestUtils.addAndVerifyAcls(brokers,
+ addAndVerifyAcls(
Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, principal = BrokerPrincipal)),
- new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL))
+ new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+ )
- TestUtils.createOffsetsTopic(zkClient, servers)
+ createOffsetsTopic(interBrokerListenerName)
}
private def createAcl(aclOperation: AclOperation,
@@ -93,12 +113,13 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
new AccessControlEntry(principal.toString, WildcardHost, aclOperation, aclPermissionType)
}
- @Test
- def testUnauthorizedProduceAndConsume(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testUnauthorizedProduceAndConsume(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
- createTopic(topic)
+ createTopic(topic, listenerName = interBrokerListenerName)
val producer = createProducer()
val produceException = assertThrows(classOf[ExecutionException],
@@ -113,22 +134,25 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}
- @Test
- def testAuthorizedProduceAndConsume(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAuthorizedProduceAndConsume(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
- createTopic(topic)
+ createTopic(topic, listenerName = interBrokerListenerName)
- TestUtils.addAndVerifyAcls(brokers,
+ addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
+ new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+ )
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
- TestUtils.addAndVerifyAcls(brokers,
+ addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
+ new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+ )
val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(List(topicPartition).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 920b24441e..b82f86a8cb 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -43,6 +43,7 @@ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, T
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait QuorumImplementation {
@@ -123,9 +124,12 @@ abstract class QuorumTestHarness extends Logging {
val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
+ private var testInfo: TestInfo = null
private var implementation: QuorumImplementation = null
- def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorumImplementation]
+ def isKRaftTest(): Boolean = {
+ TestInfoUtils.isKRaft(testInfo)
+ }
def checkIsZKTest(): Unit = {
if (isKRaftTest()) {
@@ -182,6 +186,7 @@ abstract class QuorumTestHarness extends Logging {
// That way you control the initialization order.
@BeforeEach
def setUp(testInfo: TestInfo): Unit = {
+ this.testInfo = testInfo
Exit.setExitProcedure((code, message) => {
try {
throw new RuntimeException(s"exit(${code}, ${message}) called!")
@@ -202,16 +207,14 @@ abstract class QuorumTestHarness extends Logging {
tearDown()
}
})
- val name = if (testInfo.getTestMethod().isPresent()) {
- testInfo.getTestMethod().get().toString()
- } else {
- "[unspecified]"
- }
+ val name = testInfo.getTestMethod.asScala
+ .map(_.toString)
+ .getOrElse("[unspecified]")
if (TestInfoUtils.isKRaft(testInfo)) {
- info(s"Running KRAFT test ${name}")
+ info(s"Running KRAFT test $name")
implementation = newKRaftQuorum(testInfo)
} else {
- info(s"Running ZK test ${name}")
+ info(s"Running ZK test $name")
implementation = newZooKeeperQuorum()
}
}
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index af1762e319..4322650780 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -30,8 +30,10 @@ import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import java.util.Properties
import kafka.utils.TestUtils.{createAdminClient, resource}
+import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.utils.Time
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
@@ -237,6 +239,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
}
+ def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
+ TestUtils.addAndVerifyAcls(brokers, acls, resource, controllerServers)
+ }
+
+ def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
+ TestUtils.removeAndVerifyAcls(brokers, acls, resource, controllerServers)
+ }
+
/**
* Pick a broker at random and kill it if it isn't already dead
* Return the id of the broker killed