You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/06/27 23:01:32 UTC

[kafka] branch trunk updated: MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d654bc1b15 MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)
d654bc1b15 is described below

commit d654bc1b15740acf8f1647a0f4533f4cd7f71271
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 27 16:01:15 2022 -0700

    MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)
    
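    Support KRaft in `GroupAuthorizerIntegrationTest` by parameterizing its tests
    over the `zk` and `kraft` quorums. Supporting changes in this commit:
    - authorizer settings are applied to both brokers and KRaft controllers through
      a shared `addNodeProperties` helper;
    - `addAndVerifyAcls`/`removeAndVerifyAcls` move from `AuthorizerIntegrationTest`
      to `KafkaServerTestHarness`;
    - `QuorumTestHarness.isKRaftTest()` is derived from the `TestInfo` captured in
      `setUp`.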
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../kafka/api/AuthorizerIntegrationTest.scala      | 39 ++++++--------
 .../kafka/api/GroupAuthorizerIntegrationTest.scala | 60 +++++++++++++++-------
 .../kafka/server/QuorumTestHarness.scala           | 19 ++++---
 .../kafka/integration/KafkaServerTestHarness.scala | 10 ++++
 4 files changed, 78 insertions(+), 50 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f9999dc18c..a109ae8ce4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -85,9 +85,8 @@ object AuthorizerIntegrationTest {
   class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       context.listenerName match {
-        case BrokerListenerName => BrokerPrincipal
+        case BrokerListenerName | ControllerListenerName => BrokerPrincipal
         case ClientListenerName => ClientPrincipal
-        case ControllerListenerName => BrokerPrincipal
         case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
       }
     }
@@ -152,32 +151,32 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
 
   override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+    addNodeProperties(properties)
+  }
+
+  override def kraftControllerConfigs(): collection.Seq[Properties] = {
+    val controllerConfigs = super.kraftControllerConfigs()
+    controllerConfigs.foreach(addNodeProperties)
+    controllerConfigs
+  }
+
+  private def addNodeProperties(properties: Properties): Unit = {
     if (isKRaftTest()) {
       properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
-      properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString())
+      properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
     } else {
       properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
     }
-    properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
     properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
     properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-    properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
-      classOf[PrincipalBuilder].getName)
+    properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName)
   }
 
-  override def kraftControllerConfigs(): Seq[Properties] = {
-    val controllerConfigs = Seq(new Properties())
-    controllerConfigs.foreach { properties =>
-      properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName())
-      properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString())
-      properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
-        classOf[PrincipalBuilder].getName)
-    }
-    controllerConfigs
-  }
 
   val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
@@ -2574,14 +2573,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }
 
-  private def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
-    TestUtils.addAndVerifyAcls(brokers, acls, resource, controllerServers)
-  }
-
-  private def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
-    TestUtils.removeAndVerifyAcls(brokers, acls, resource, controllerServers)
-  }
-
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
                              numRecords: Int = 1,
                              startingOffset: Int = 0,
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 2b380bfd2a..82e637ae00 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -14,12 +14,11 @@ package kafka.api
 
 import java.util.Properties
 import java.util.concurrent.ExecutionException
-
 import kafka.api.GroupAuthorizerIntegrationTest._
 import kafka.security.authorizer.AclAuthorizer
 import kafka.security.authorizer.AclEntry.WildcardHost
 import kafka.server.{BaseRequestTest, KafkaConfig}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
@@ -30,8 +29,11 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -41,11 +43,12 @@ object GroupAuthorizerIntegrationTest {
 
   val BrokerListenerName = "BROKER"
   val ClientListenerName = "CLIENT"
+  val ControllerListenerName = "CONTROLLER"
 
   class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       context.listenerName match {
-        case BrokerListenerName => BrokerPrincipal
+        case BrokerListenerName | ControllerListenerName => BrokerPrincipal
         case ClientListenerName => ClientPrincipal
         case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
       }
@@ -64,9 +67,25 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
   def brokerPrincipal: KafkaPrincipal = BrokerPrincipal
   def clientPrincipal: KafkaPrincipal = ClientPrincipal
 
+  override def kraftControllerConfigs(): collection.Seq[Properties] = {
+    val controllerConfigs = super.kraftControllerConfigs()
+    controllerConfigs.foreach(addNodeProperties)
+    controllerConfigs
+  }
+
   override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+    addNodeProperties(properties)
+  }
+
+  private def addNodeProperties(properties: Properties): Unit = {
+    if (isKRaftTest()) {
+      properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+      properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
+    } else {
+      properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
+    }
+
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
@@ -80,11 +99,12 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
     doSetup(testInfo, createOffsetsTopic = false)
 
     // Allow inter-broker communication
-    TestUtils.addAndVerifyAcls(brokers,
+    addAndVerifyAcls(
       Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, principal = BrokerPrincipal)),
-      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL))
+      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+    )
 
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    createOffsetsTopic(interBrokerListenerName)
   }
 
   private def createAcl(aclOperation: AclOperation,
@@ -93,12 +113,13 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
     new AccessControlEntry(principal.toString, WildcardHost, aclOperation, aclPermissionType)
   }
 
-  @Test
-  def testUnauthorizedProduceAndConsume(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnauthorizedProduceAndConsume(quorum: String): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition("topic", 0)
 
-    createTopic(topic)
+    createTopic(topic, listenerName = interBrokerListenerName)
 
     val producer = createProducer()
     val produceException = assertThrows(classOf[ExecutionException],
@@ -113,22 +134,25 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
   }
 
-  @Test
-  def testAuthorizedProduceAndConsume(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAuthorizedProduceAndConsume(quorum: String): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition("topic", 0)
 
-    createTopic(topic)
+    createTopic(topic, listenerName = interBrokerListenerName)
 
-    TestUtils.addAndVerifyAcls(brokers,
+    addAndVerifyAcls(
       Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
+      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+    )
     val producer = createProducer()
     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
 
-    TestUtils.addAndVerifyAcls(brokers,
+    addAndVerifyAcls(
       Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
+      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+    )
     val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
     consumer.assign(List(topicPartition).asJava)
     TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 920b24441e..b82f86a8cb 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -43,6 +43,7 @@ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, T
 
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable}
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 trait QuorumImplementation {
@@ -123,9 +124,12 @@ abstract class QuorumTestHarness extends Logging {
 
   val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
 
+  private var testInfo: TestInfo = null
   private var implementation: QuorumImplementation = null
 
-  def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorumImplementation]
+  def isKRaftTest(): Boolean = {
+    TestInfoUtils.isKRaft(testInfo)
+  }
 
   def checkIsZKTest(): Unit = {
     if (isKRaftTest()) {
@@ -182,6 +186,7 @@ abstract class QuorumTestHarness extends Logging {
   // That way you control the initialization order.
   @BeforeEach
   def setUp(testInfo: TestInfo): Unit = {
+    this.testInfo = testInfo
     Exit.setExitProcedure((code, message) => {
       try {
         throw new RuntimeException(s"exit(${code}, ${message}) called!")
@@ -202,16 +207,14 @@ abstract class QuorumTestHarness extends Logging {
         tearDown()
       }
     })
-    val name = if (testInfo.getTestMethod().isPresent()) {
-      testInfo.getTestMethod().get().toString()
-    } else {
-      "[unspecified]"
-    }
+    val name = testInfo.getTestMethod.asScala
+      .map(_.toString)
+      .getOrElse("[unspecified]")
     if (TestInfoUtils.isKRaft(testInfo)) {
-      info(s"Running KRAFT test ${name}")
+      info(s"Running KRAFT test $name")
       implementation = newKRaftQuorum(testInfo)
     } else {
-      info(s"Running ZK test ${name}")
+      info(s"Running ZK test $name")
       implementation = newZooKeeperQuorum()
     }
   }
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index af1762e319..4322650780 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -30,8 +30,10 @@ import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 import java.util.Properties
 import kafka.utils.TestUtils.{createAdminClient, resource}
+import org.apache.kafka.common.acl.AccessControlEntry
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
@@ -237,6 +239,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     }
   }
 
+  def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
+    TestUtils.addAndVerifyAcls(brokers, acls, resource, controllerServers)
+  }
+
+  def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
+    TestUtils.removeAndVerifyAcls(brokers, acls, resource, controllerServers)
+  }
+
   /**
    * Pick a broker at random and kill it if it isn't already dead
    * Return the id of the broker killed