You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/06/01 23:56:21 UTC
[kafka] branch trunk updated: MINOR: Enable kraft support in quota integration tests (#12217)
This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0f9f7e6c78 MINOR: Enable kraft support in quota integration tests (#12217)
0f9f7e6c78 is described below
commit 0f9f7e6c7863b87f20d6b133535c4f3b181b55bf
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Jun 1 16:56:10 2022 -0700
MINOR: Enable kraft support in quota integration tests (#12217)
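Enable KRaft support in BaseQuotaTest and its subclasses (ClientIdQuotaTest, UserClientIdQuotaTest, and UserQuotaTest) by parameterizing each test to run against both the "zk" and "kraft" quorums.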
Reviewers: Kvicii <42...@users.noreply.github.com>, dengziming <de...@gmail.com>
---
.../integration/kafka/api/BaseQuotaTest.scala | 50 ++++++++++++----------
.../integration/kafka/api/ClientIdQuotaTest.scala | 4 +-
.../kafka/api/UserClientIdQuotaTest.scala | 2 +-
.../integration/kafka/api/UserQuotaTest.scala | 8 ++--
.../kafka/integration/KafkaServerTestHarness.scala | 5 ++-
5 files changed, 38 insertions(+), 31 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index f8eedf172e..40d4cef7f8 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -17,11 +17,10 @@ package kafka.api
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.{Collections, HashMap, Properties}
-
import com.yammer.metrics.core.{Histogram, Meter}
import kafka.api.QuotaTestClients._
-import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig, KafkaConfig, KafkaServer, QuotaType}
-import kafka.utils.TestUtils
+import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig, KafkaBroker, KafkaConfig, QuotaType}
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
@@ -35,7 +34,9 @@ import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -46,7 +47,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
protected def producerClientId = "QuotasTestProducer-1"
protected def consumerClientId = "QuotasTestConsumer-1"
- protected def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients
+ protected def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
@@ -70,8 +71,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
val defaultRequestQuota: Double = Long.MaxValue.toDouble
val topic1 = "topic-1"
- var leaderNode: KafkaServer = _
- var followerNode: KafkaServer = _
+ var leaderNode: KafkaBroker = _
+ var followerNode: KafkaBroker = _
var quotaTestClients: QuotaTestClients = _
@BeforeEach
@@ -79,14 +80,15 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
super.setUp(testInfo)
val numPartitions = 1
- val leaders = createTopic(topic1, numPartitions, brokerCount)
- leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
- followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
+ val leaders = createTopic(topic1, numPartitions, brokerCount, adminClientConfig = adminClientConfig)
+ leaderNode = if (leaders(0) == brokers.head.config.brokerId) brokers.head else brokers(1)
+ followerNode = if (leaders(0) != brokers.head.config.brokerId) brokers.head else brokers(1)
quotaTestClients = createQuotaTestClients(topic1, leaderNode)
}
- @Test
- def testThrottledProducerConsumer(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testThrottledProducerConsumer(quorum: String): Unit = {
val numRecords = 1000
val produced = quotaTestClients.produceUntilThrottled(numRecords)
quotaTestClients.verifyProduceThrottle(expectThrottle = true)
@@ -96,8 +98,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
}
- @Test
- def testProducerConsumerOverrideUnthrottled(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testProducerConsumerOverrideUnthrottled(quorum: String): Unit = {
// Give effectively unlimited quota for producer and consumer
val props = new Properties()
props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Long.MaxValue.toString)
@@ -115,8 +118,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaTestClients.verifyConsumeThrottle(expectThrottle = false)
}
- @Test
- def testProducerConsumerOverrideLowerQuota(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testProducerConsumerOverrideLowerQuota(quorum: String): Unit = {
// consumer quota is set such that consumer quota * default quota window (10 seconds) is less than
// MAX_PARTITION_FETCH_BYTES_CONFIG, so that we can test consumer ability to fetch in this case
// In this case, 250 * 10 < 4096
@@ -132,8 +136,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
}
- @Test
- def testQuotaOverrideDelete(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testQuotaOverrideDelete(quorum: String): Unit = {
// Override producer and consumer quotas to unlimited
quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)
quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)
@@ -158,8 +163,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
}
- @Test
- def testThrottledRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testThrottledRequest(quorum: String): Unit = {
quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
@@ -190,7 +196,7 @@ object QuotaTestClients {
}
abstract class QuotaTestClients(topic: String,
- leaderNode: KafkaServer,
+ leaderNode: KafkaBroker,
producerClientId: String,
consumerClientId: String,
val producer: KafkaProducer[Array[Byte], Array[Byte]],
@@ -367,7 +373,7 @@ abstract class QuotaTestClients(topic: String,
adminClient.alterClientQuotas(quotaAlterations.asJava).all().get()
}
- def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode): Unit = {
+ def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaBroker = leaderNode): Unit = {
TestUtils.retry(10000) {
val quotaManagers = server.dataPlaneRequestProcessor.quotas
val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index e4cebe2e90..d85d4b79a4 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -14,7 +14,7 @@
package kafka.api
-import kafka.server.KafkaServer
+import kafka.server.KafkaBroker
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.jupiter.api.{BeforeEach, TestInfo}
@@ -35,7 +35,7 @@ class ClientIdQuotaTest extends BaseQuotaTest {
quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
- override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+ override def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients = {
val producer = createProducer()
val consumer = createConsumer()
val adminClient = createAdminClient()
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 83c70dacf3..3620546576 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -42,7 +42,7 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
- override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+ override def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients = {
val producer = createProducer()
val consumer = createConsumer()
val adminClient = createAdminClient()
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index ffbaebb9c9..fd1639e43b 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -14,13 +14,13 @@
package kafka.api
-import java.io.File
-
-import kafka.server.KafkaServer
+import kafka.server.KafkaBroker
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import java.io.File
+
class UserQuotaTest extends BaseQuotaTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -49,7 +49,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
closeSasl()
}
- override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+ override def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients = {
val producer = createProducer()
val consumer = createConsumer()
val adminClient = createAdminClient()
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 26f4c9d4c2..29aebd55cd 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -169,10 +169,11 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
numPartitions: Int = 1,
replicationFactor: Int = 1,
topicConfig: Properties = new Properties,
- listenerName: ListenerName = listenerName
+ listenerName: ListenerName = listenerName,
+ adminClientConfig: Properties = new Properties
): scala.collection.immutable.Map[Int, Int] = {
if (isKRaftTest()) {
- resource(createAdminClient(brokers, listenerName)) { admin =>
+ resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,