You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/06/01 23:56:21 UTC

[kafka] branch trunk updated: MINOR: Enable kraft support in quota integration tests (#12217)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f9f7e6c78 MINOR: Enable kraft support in quota integration tests (#12217)
0f9f7e6c78 is described below

commit 0f9f7e6c7863b87f20d6b133535c4f3b181b55bf
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Jun 1 16:56:10 2022 -0700

    MINOR: Enable kraft support in quota integration tests (#12217)
    
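    Enable KRaft support in BaseQuotaTest and its subclasses (ClientIdQuotaTest,
    UserClientIdQuotaTest and UserQuotaTest) by parameterizing each test to run
    against both the "zk" and "kraft" quorums.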
    
    Reviewers: Kvicii <42...@users.noreply.github.com>, dengziming <de...@gmail.com>
---
 .../integration/kafka/api/BaseQuotaTest.scala      | 50 ++++++++++++----------
 .../integration/kafka/api/ClientIdQuotaTest.scala  |  4 +-
 .../kafka/api/UserClientIdQuotaTest.scala          |  2 +-
 .../integration/kafka/api/UserQuotaTest.scala      |  8 ++--
 .../kafka/integration/KafkaServerTestHarness.scala |  5 ++-
 5 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index f8eedf172e..40d4cef7f8 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -17,11 +17,10 @@ package kafka.api
 import java.time.Duration
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, HashMap, Properties}
-
 import com.yammer.metrics.core.{Histogram, Meter}
 import kafka.api.QuotaTestClients._
-import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig, KafkaConfig, KafkaServer, QuotaType}
-import kafka.utils.TestUtils
+import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig, KafkaBroker, KafkaConfig, QuotaType}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
@@ -35,7 +34,9 @@ import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
@@ -46,7 +47,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   protected def producerClientId = "QuotasTestProducer-1"
   protected def consumerClientId = "QuotasTestConsumer-1"
-  protected def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients
+  protected def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients
 
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
@@ -70,8 +71,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   val defaultRequestQuota: Double = Long.MaxValue.toDouble
 
   val topic1 = "topic-1"
-  var leaderNode: KafkaServer = _
-  var followerNode: KafkaServer = _
+  var leaderNode: KafkaBroker = _
+  var followerNode: KafkaBroker = _
   var quotaTestClients: QuotaTestClients = _
 
   @BeforeEach
@@ -79,14 +80,15 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     super.setUp(testInfo)
 
     val numPartitions = 1
-    val leaders = createTopic(topic1, numPartitions, brokerCount)
-    leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
-    followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
+    val leaders = createTopic(topic1, numPartitions, brokerCount, adminClientConfig = adminClientConfig)
+    leaderNode = if (leaders(0) == brokers.head.config.brokerId) brokers.head else brokers(1)
+    followerNode = if (leaders(0) != brokers.head.config.brokerId) brokers.head else brokers(1)
     quotaTestClients = createQuotaTestClients(topic1, leaderNode)
   }
 
-  @Test
-  def testThrottledProducerConsumer(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThrottledProducerConsumer(quorum: String): Unit = {
     val numRecords = 1000
     val produced = quotaTestClients.produceUntilThrottled(numRecords)
     quotaTestClients.verifyProduceThrottle(expectThrottle = true)
@@ -96,8 +98,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
-  @Test
-  def testProducerConsumerOverrideUnthrottled(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProducerConsumerOverrideUnthrottled(quorum: String): Unit = {
     // Give effectively unlimited quota for producer and consumer
     val props = new Properties()
     props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Long.MaxValue.toString)
@@ -115,8 +118,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     quotaTestClients.verifyConsumeThrottle(expectThrottle = false)
   }
 
-  @Test
-  def testProducerConsumerOverrideLowerQuota(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProducerConsumerOverrideLowerQuota(quorum: String): Unit = {
     // consumer quota is set such that consumer quota * default quota window (10 seconds) is less than
     // MAX_PARTITION_FETCH_BYTES_CONFIG, so that we can test consumer ability to fetch in this case
     // In this case, 250 * 10 < 4096
@@ -132,8 +136,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
-  @Test
-  def testQuotaOverrideDelete(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testQuotaOverrideDelete(quorum: String): Unit = {
     // Override producer and consumer quotas to unlimited
     quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)
     quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)
@@ -158,8 +163,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
-  @Test
-  def testThrottledRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThrottledRequest(quorum: String): Unit = {
     quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
     quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
 
@@ -190,7 +196,7 @@ object QuotaTestClients {
 }
 
 abstract class QuotaTestClients(topic: String,
-                                leaderNode: KafkaServer,
+                                leaderNode: KafkaBroker,
                                 producerClientId: String,
                                 consumerClientId: String,
                                 val producer: KafkaProducer[Array[Byte], Array[Byte]],
@@ -367,7 +373,7 @@ abstract class QuotaTestClients(topic: String,
     adminClient.alterClientQuotas(quotaAlterations.asJava).all().get()
   }
 
-  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode): Unit = {
+  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaBroker = leaderNode): Unit = {
     TestUtils.retry(10000) {
       val quotaManagers = server.dataPlaneRequestProcessor.quotas
       val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index e4cebe2e90..d85d4b79a4 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -14,7 +14,7 @@
 
 package kafka.api
 
-import kafka.server.KafkaServer
+import kafka.server.KafkaBroker
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
@@ -35,7 +35,7 @@ class ClientIdQuotaTest extends BaseQuotaTest {
     quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
-  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients = {
     val producer = createProducer()
     val consumer = createConsumer()
     val adminClient = createAdminClient()
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 83c70dacf3..3620546576 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -42,7 +42,7 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
     quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
-  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients = {
     val producer = createProducer()
     val consumer = createConsumer()
     val adminClient = createAdminClient()
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index ffbaebb9c9..fd1639e43b 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -14,13 +14,13 @@
 
 package kafka.api
 
-import java.io.File
-
-import kafka.server.KafkaServer
+import kafka.server.KafkaBroker
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
+import java.io.File
+
 class UserQuotaTest extends BaseQuotaTest with SaslSetup {
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -49,7 +49,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
     closeSasl()
   }
 
-  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients = {
     val producer = createProducer()
     val consumer = createConsumer()
     val adminClient = createAdminClient()
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 26f4c9d4c2..29aebd55cd 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -169,10 +169,11 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     numPartitions: Int = 1,
     replicationFactor: Int = 1,
     topicConfig: Properties = new Properties,
-    listenerName: ListenerName = listenerName
+    listenerName: ListenerName = listenerName,
+    adminClientConfig: Properties = new Properties
   ): scala.collection.immutable.Map[Int, Int] = {
     if (isKRaftTest()) {
-      resource(createAdminClient(brokers, listenerName)) { admin =>
+      resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
         TestUtils.createTopicWithAdmin(
           admin = admin,
           topic = topic,