You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/07/29 20:36:49 UTC
[kafka] branch trunk updated: MINOR: convert some more junit tests to support KRaft (#12456)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e26772ef61 MINOR: convert some more junit tests to support KRaft (#12456)
e26772ef61 is described below
commit e26772ef616d1095efb7e48baa44842df8aeb058
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Jul 29 13:36:40 2022 -0700
MINOR: convert some more junit tests to support KRaft (#12456)
* MINOR: convert some more junit tests to support KRaft
Introduce TestUtils#waitUntilLeaderIsElectedOrChangedWithAdmin, a ZK-free alternative to
TestUtils#waitUntilLeaderIsElectedOrChanged.
Convert PlaintextProducerSendTest, SslProducerSendTest, TransactionsWithMaxInFlightOneTest,
AddPartitionsToTxnRequestServerTest and KafkaMetricsReporterTest to support KRaft.
Reviewers: dengziming <de...@gmail.com>, David Arthur <mu...@gmail.com>
---
.../kafka/api/BaseProducerSendTest.scala | 102 ++++++++++++---------
.../kafka/api/PlaintextProducerSendTest.scala | 51 ++++++-----
.../api/TransactionsWithMaxInFlightOneTest.scala | 24 ++---
.../kafka/server/QuorumTestHarness.scala | 40 +++++---
.../AddPartitionsToTxnRequestServerTest.scala | 16 ++--
.../kafka/server/KafkaMetricsReporterTest.scala | 55 ++++++-----
.../test/scala/unit/kafka/utils/TestUtils.scala | 60 +++++++++++-
7 files changed, 226 insertions(+), 122 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 61870b073d..ce3cd32afd 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -19,22 +19,24 @@ package kafka.api
import java.time.Duration
import java.nio.charset.StandardCharsets
-import java.util.Properties
+import java.util.{Collections, Properties}
import java.util.concurrent.TimeUnit
-
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
@@ -42,16 +44,17 @@ import scala.concurrent.ExecutionException
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
- def generateConfigs = {
+ def generateConfigs: scala.collection.Seq[KafkaConfig] = {
val overridingProps = new Properties()
val numServers = 2
overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
- TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+ TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, false, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
}
private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+ protected var admin: Admin = null
protected val topic = "topic"
private val numRecords = 100
@@ -59,6 +62,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
+
+ admin = TestUtils.createAdminClient(brokers, listenerName,
+ TestUtils.securityConfigs(Mode.CLIENT,
+ securityProtocol,
+ trustStoreFile,
+ "adminClient",
+ TestUtils.SslCertificateCn,
+ clientSaslProperties))
+
consumer = TestUtils.createConsumer(
bootstrapServers(listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
securityProtocol = SecurityProtocol.PLAINTEXT
@@ -70,6 +82,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
consumer.close()
// Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused
producers.foreach(_.close())
+ admin.close()
super.tearDown()
}
@@ -105,8 +118,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
* 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
* 2. Last message of the non-blocking send should return the correct offset metadata
*/
- @Test
- def testSendOffset(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendOffset(quorum: String): Unit = {
val producer = createProducer()
val partition = 0
@@ -134,7 +148,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
// create topic
- createTopic(topic, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
// send a normal record
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -166,8 +180,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
- @Test
- def testSendCompressedMessageWithCreateTime(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendCompressedMessageWithCreateTime(quorum: String): Unit = {
val producer = createProducer(
compressionType = "gzip",
lingerMs = Int.MaxValue,
@@ -175,8 +190,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
- @Test
- def testSendNonCompressedMessageWithCreateTime(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendNonCompressedMessageWithCreateTime(quorum: String): Unit = {
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
@@ -186,7 +202,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
timeoutMs: Long = 20000L): Unit = {
val partition = 0
try {
- createTopic(topic, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -241,7 +257,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
else
topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
- createTopic(topic, 1, 2, topicProps)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
val recordAndFutures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -267,13 +283,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*
* After close() returns, all messages should be sent with correct returned offset metadata
*/
- @Test
- def testClose(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testClose(quorum: String): Unit = {
val producer = createProducer()
try {
// create topic
- createTopic(topic, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
// non-blocking send a list of records
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
@@ -300,12 +317,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*
* The specified partition-id should be respected
*/
- @Test
- def testSendToPartition(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendToPartition(quorum: String): Unit = {
val producer = createProducer()
try {
- createTopic(topic, 2, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
val partition = 1
val now = System.currentTimeMillis()
@@ -345,14 +363,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
* Producer will attempt to send messages to the partition specified in each record, and should
* succeed as long as the partition is included in the metadata.
*/
- @Test
- def testSendBeforeAndAfterPartitionExpansion(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendBeforeAndAfterPartitionExpansion(quorum: String): Unit = {
val producer = createProducer(maxBlockMs = 5 * 1000L)
// create topic
- createTopic(topic, 1, 2)
- val partition0 = 0
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+ val partition0 = 0
var futures0 = (1 to numRecords).map { i =>
producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
}.map(_.get(30, TimeUnit.SECONDS))
@@ -369,13 +388,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes(StandardCharsets.UTF_8))).get())
assertEquals(classOf[TimeoutException], e.getCause.getClass)
- val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(Set(topic)).map {
- case (topicPartition, assignment) => topicPartition.partition -> assignment
- }
- adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
+ admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(2))).all().get()
+
// read metadata from a broker and verify the new topic partitions exist
- TestUtils.waitForPartitionMetadata(servers, topic, 0)
- TestUtils.waitForPartitionMetadata(servers, topic, 1)
+ TestUtils.waitForPartitionMetadata(brokers, topic, 0)
+ TestUtils.waitForPartitionMetadata(brokers, topic, 1)
// send records to the newly added partition after confirming that metadata have been updated.
val futures1 = (1 to numRecords).map { i =>
@@ -404,11 +421,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
/**
* Test that flush immediately sends all accumulated requests.
*/
- @Test
- def testFlush(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFlush(quorum: String): Unit = {
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
try {
- createTopic(topic, 2, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
"value".getBytes(StandardCharsets.UTF_8))
for (_ <- 0 until 50) {
@@ -425,9 +443,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
/**
* Test close with zero timeout from caller thread
*/
- @Test
- def testCloseWithZeroTimeoutFromCallerThread(): Unit = {
- createTopic(topic, 2, 2)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = {
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
@@ -450,9 +469,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
/**
* Test close with zero and non-zero timeout from sender thread
*/
- @Test
- def testCloseWithZeroTimeoutFromSenderThread(): Unit = {
- createTopic(topic, 1, 2)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = {
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 06ff201e0b..c25eb184b3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -19,22 +19,23 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.{ExecutionException, Future, TimeUnit}
-
import kafka.log.LogConfig
import kafka.server.Defaults
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class PlaintextProducerSendTest extends BaseProducerSendTest {
- @Test
- def testWrongSerializer(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testWrongSerializer(quorum: String): Unit = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
@@ -44,8 +45,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
assertThrows(classOf[SerializationException], () => producer.send(record))
}
- @Test
- def testBatchSizeZero(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testBatchSizeZero(quorum: String): Unit = {
val producer = createProducer(
lingerMs = Int.MaxValue,
deliveryTimeoutMs = Int.MaxValue,
@@ -53,8 +55,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
sendAndVerify(producer)
}
- @Test
- def testSendCompressedMessageWithLogAppendTime(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendCompressedMessageWithLogAppendTime(quorum: String): Unit = {
val producer = createProducer(
compressionType = "gzip",
lingerMs = Int.MaxValue,
@@ -62,8 +65,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}
- @Test
- def testSendNonCompressedMessageWithLogAppendTime(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendNonCompressedMessageWithLogAppendTime(quorum: String): Unit = {
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}
@@ -73,8 +77,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
*
* The topic should be created upon sending the first message
*/
- @Test
- def testAutoCreateTopic(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAutoCreateTopic(quorum: String): Unit = {
val producer = createProducer()
try {
// Send a message to auto-create the topic
@@ -82,18 +87,18 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
// double check that the topic is created with leader elected
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-
+ TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
} finally {
producer.close()
}
}
- @Test
- def testSendWithInvalidCreateTime(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendWithInvalidCreateTime(quorum: String): Unit = {
val topicProps = new Properties()
topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
- createTopic(topic, 1, 2, topicProps)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
val producer = createProducer()
try {
@@ -118,8 +123,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
// Test that producer with max.block.ms=0 can be used to send in non-blocking mode
// where requests are failed immediately without blocking if metadata is not available
// or buffer is full.
- @Test
- def testNonBlockingProducer(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testNonBlockingProducer(quorum: String): Unit = {
def send(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = {
producer.send(new ProducerRecord(topic, 0, "key".getBytes, new Array[Byte](1000)))
@@ -173,8 +179,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
verifySendSuccess(future2) // previous batch should be completed and sent now
}
- @Test
- def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendRecordBatchWithMaxRequestSizeAndHigher(quorum: String): Unit = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index eacc58e76c..5dd82b6b22 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -18,15 +18,16 @@
package kafka.api
import java.util.Properties
-
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.collection.Seq
import scala.collection.mutable.Buffer
@@ -37,7 +38,7 @@ import scala.jdk.CollectionConverters._
* A single broker is used to verify edge cases where different requests are queued on the same connection.
*/
class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
- val numServers = 1
+ val numBrokers = 1
val topic1 = "topic1"
val topic2 = "topic2"
@@ -47,7 +48,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
val transactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(numBrokers, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach
@@ -55,8 +56,8 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
super.setUp(testInfo)
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 1.toString)
- createTopic(topic1, numPartitions, numServers, topicConfig)
- createTopic(topic2, numPartitions, numServers, topicConfig)
+ createTopic(topic1, numPartitions, numBrokers, topicConfig)
+ createTopic(topic2, numPartitions, numBrokers, topicConfig)
createTransactionalProducer("transactional-producer")
createReadCommittedConsumer("transactional-group")
@@ -69,10 +70,11 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
super.tearDown()
}
- @Test
- def testTransactionalProducerSingleBrokerMaxInFlightOne(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTransactionalProducerSingleBrokerMaxInFlightOne(quorum: String): Unit = {
// We want to test with one broker to verify multiple requests queued on a connection
- assertEquals(1, servers.size)
+ assertEquals(1, brokers.size)
val producer = transactionalProducers.head
val consumer = transactionalConsumers.head
@@ -124,7 +126,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
}
private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = {
- val producer = TestUtils.createTransactionalProducer(transactionalId, servers, maxInFlight = 1)
+ val producer = TestUtils.createTransactionalProducer(transactionalId, brokers, maxInFlight = 1)
transactionalProducers += producer
producer
}
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index b82f86a8cb..a2393cdccb 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -47,9 +47,12 @@ import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait QuorumImplementation {
- def createBroker(config: KafkaConfig,
- time: Time,
- startup: Boolean): KafkaBroker
+ def createBroker(
+ config: KafkaConfig,
+ time: Time = Time.SYSTEM,
+ startup: Boolean = true,
+ threadNamePrefix: Option[String] = None,
+ ): KafkaBroker
def shutdown(): Unit
}
@@ -61,10 +64,13 @@ class ZooKeeperQuorumImplementation(
val adminZkClient: AdminZkClient,
val log: Logging
) extends QuorumImplementation {
- override def createBroker(config: KafkaConfig,
- time: Time,
- startup: Boolean): KafkaBroker = {
- val server = new KafkaServer(config, time, None, false)
+ override def createBroker(
+ config: KafkaConfig,
+ time: Time,
+ startup: Boolean,
+ threadNamePrefix: Option[String],
+ ): KafkaBroker = {
+ val server = new KafkaServer(config, time, threadNamePrefix, false)
if (startup) server.startup()
server
}
@@ -81,9 +87,12 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val clusterId: String,
val log: Logging) extends QuorumImplementation {
- override def createBroker(config: KafkaConfig,
- time: Time,
- startup: Boolean): KafkaBroker = {
+ override def createBroker(
+ config: KafkaConfig,
+ time: Time,
+ startup: Boolean,
+ threadNamePrefix: Option[String],
+ ): KafkaBroker = {
val broker = new BrokerServer(config = config,
metaProps = new MetaProperties(clusterId, config.nodeId),
raftManager = raftManager,
@@ -219,10 +228,13 @@ abstract class QuorumTestHarness extends Logging {
}
}
- def createBroker(config: KafkaConfig,
- time: Time = Time.SYSTEM,
- startup: Boolean = true): KafkaBroker = {
- implementation.createBroker(config, time, startup)
+ def createBroker(
+ config: KafkaConfig,
+ time: Time = Time.SYSTEM,
+ startup: Boolean = true,
+ threadNamePrefix: Option[String] = None
+ ): KafkaBroker = {
+ implementation.createBroker(config, time, startup, threadNamePrefix)
}
def shutdownZooKeeper(): Unit = asZk().shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
index 0a98d2626c..74320e62b4 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
@@ -17,13 +17,16 @@
package kafka.server
-import java.util.Properties
+import kafka.utils.TestInfoUtils
+import java.util.Properties
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@@ -37,11 +40,12 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- createTopic(topic1, numPartitions, servers.size, new Properties())
+ createTopic(topic1, numPartitions, brokers.size, new Properties())
}
- @Test
- def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(quorum: String): Unit = {
// The basic idea is that we have one unknown topic and one created topic. We should get the 'UNKNOWN_TOPIC_OR_PARTITION'
// error for the unknown topic and the 'OPERATION_NOT_ATTEMPTED' error for the known and authorized topic.
val nonExistentTopic = new TopicPartition("unknownTopic", 0)
@@ -58,7 +62,7 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
List(createdTopicPartition, nonExistentTopic).asJava)
.build()
- val leaderId = servers.head.config.brokerId
+ val leaderId = brokers.head.config.brokerId
val response = connectAndReceive[AddPartitionsToTxnResponse](request, brokerSocketServer(leaderId))
assertEquals(2, response.errors.size)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 7e5d791db2..1adf544819 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -17,15 +17,13 @@
package kafka.server
import java.util
-
import java.util.concurrent.atomic.AtomicReference
-
-import kafka.utils.{CoreUtils, TestUtils}
-import kafka.server.QuorumTestHarness
+import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
-import org.junit.jupiter.api.Assertions.{assertEquals}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
object KafkaMetricsReporterTest {
@@ -43,52 +41,63 @@ object KafkaMetricsReporterTest {
override def contextChange(metricsContext: MetricsContext): Unit = {
//read jmxPrefix
- MockMetricsReporter.JMXPREFIX.set(metricsContext.contextLabels().get("_namespace").toString)
- MockMetricsReporter.CLUSTERID.set(metricsContext.contextLabels().get("kafka.cluster.id").toString)
- MockMetricsReporter.BROKERID.set(metricsContext.contextLabels().get("kafka.broker.id").toString)
+ MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext))
+ MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext))
+ MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext))
+ MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext))
}
- override def configure(configs: util.Map[String, _]): Unit = {}
+ private def contextLabelOrNull(name: String, metricsContext: MetricsContext): String = {
+ Option(metricsContext.contextLabels().get(name)).flatMap(v => Option(v.toString())).getOrElse(null)
+ }
+ override def configure(configs: util.Map[String, _]): Unit = {}
}
object MockMetricsReporter {
val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
val BROKERID : AtomicReference[String] = new AtomicReference[String]
+ val NODEID : AtomicReference[String] = new AtomicReference[String]
val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
}
}
class KafkaMetricsReporterTest extends QuorumTestHarness {
- var server: KafkaServer = null
+ var broker: KafkaBroker = null
var config: KafkaConfig = null
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- val props = TestUtils.createBrokerConfig(1, zkConnect)
+ val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
- props.setProperty(KafkaConfig.BrokerIdProp, "-1")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
config = KafkaConfig.fromProps(props)
- server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
- server.startup()
+ broker = createBroker(config, threadNamePrefix = Option(this.getClass.getName))
+ broker.startup()
}
- @Test
- def testMetricsContextNamespacePresent(): Unit = {
- assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID)
- assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID)
- assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX)
- assertEquals("kafka.server", KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testMetricsContextNamespacePresent(quorum: String): Unit = {
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
+ if (isKRaftTest()) {
+ assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
+ } else {
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+ assertNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
+ }
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
- server.shutdown()
+ broker.shutdown()
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@AfterEach
override def tearDown(): Unit = {
- server.shutdown()
+ broker.shutdown()
CoreUtils.delete(config.logDirs)
super.tearDown()
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c49a7bdde0..a1063466c4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.{KafkaFuture, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
@@ -67,7 +67,6 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaFuture, TopicPartition, Uuid}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.MetadataVersion
@@ -871,8 +870,59 @@ object TestUtils extends Logging {
* LeaderDuringDelete).
* @throws AssertionError if the expected condition is not true within the timeout.
*/
- def waitUntilLeaderIsElectedOrChanged(zkClient: KafkaZkClient, topic: String, partition: Int, timeoutMs: Long = 30000L,
- oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Int = {
+ def waitUntilLeaderIsElectedOrChanged(
+ zkClient: KafkaZkClient,
+ topic: String,
+ partition: Int,
+ timeoutMs: Long = 30000L,
+ oldLeaderOpt: Option[Int] = None,
+ newLeaderOpt: Option[Int] = None
+ ): Int = {
+ // Resolve the current leader from ZooKeeper; the polling loop itself lives in the shared helper.
+ val leaderFromZk: (String, Int) => Option[Int] =
+ (t, p) => zkClient.getLeaderForPartition(new TopicPartition(t, p))
+ doWaitUntilLeaderIsElectedOrChanged(leaderFromZk, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
+ }
+
+ /**
+ * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
+ * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
+ * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
+ *
+ * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
+ * LeaderDuringDelete).
+ * @throws AssertionError if the expected condition is not true within the timeout.
+ */
+ def waitUntilLeaderIsElectedOrChangedWithAdmin(
+ admin: Admin,
+ topic: String,
+ partition: Int,
+ timeoutMs: Long = 30000L,
+ oldLeaderOpt: Option[Int] = None,
+ newLeaderOpt: Option[Int] = None
+ ): Int = {
+ def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
+ admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic).partitions().asScala.
+ find(_.partition() == partition).
+ flatMap { p =>
+ if (p.leader().id() == Node.noNode().id()) {
+ None
+ } else {
+ Some(p.leader().id())
+ }
+ }
+ }
+ doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
+ }
+
+ private def doWaitUntilLeaderIsElectedOrChanged(
+ getPartitionLeader: (String, Int) => Option[Int],
+ topic: String,
+ partition: Int,
+ timeoutMs: Long,
+ oldLeaderOpt: Option[Int],
+ newLeaderOpt: Option[Int]
+ ): Int = {
require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
val startTime = System.currentTimeMillis()
val topicPartition = new TopicPartition(topic, partition)
@@ -884,7 +934,7 @@ object TestUtils extends Logging {
var electedOrChangedLeader: Option[Int] = None
while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {
// check if leader is elected
- leader = zkClient.getLeaderForPartition(topicPartition)
+ leader = getPartitionLeader(topic, partition)
leader match {
case Some(l) => (newLeaderOpt, oldLeaderOpt) match {
case (Some(newLeader), _) if newLeader == l =>