You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/07/29 20:36:49 UTC

[kafka] branch trunk updated: MINOR: convert some more junit tests to support KRaft (#12456)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e26772ef61 MINOR: convert some more junit tests to support KRaft (#12456)
e26772ef61 is described below

commit e26772ef616d1095efb7e48baa44842df8aeb058
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Jul 29 13:36:40 2022 -0700

    MINOR: convert some more junit tests to support KRaft (#12456)
    
    * MINOR: convert some more junit tests to support KRaft
    
    Introduce TestUtils#waitUntilLeaderIsElectedOrChangedWithAdmin, a ZK-free alternative to
    TestUtils#waitUntilLeaderIsElectedOrChanged.
    
    Convert PlaintextProducerSendTest, SslProducerSendTest, TransactionsWithMaxInFlightOneTest,
    AddPartitionsToTxnRequestServerTest and KafkaMetricsReporterTest to support KRaft
    
    Reviewers: dengziming <de...@gmail.com>, David Arthur <mu...@gmail.com>
---
 .../kafka/api/BaseProducerSendTest.scala           | 102 ++++++++++++---------
 .../kafka/api/PlaintextProducerSendTest.scala      |  51 ++++++-----
 .../api/TransactionsWithMaxInFlightOneTest.scala   |  24 ++---
 .../kafka/server/QuorumTestHarness.scala           |  40 +++++---
 .../AddPartitionsToTxnRequestServerTest.scala      |  16 ++--
 .../kafka/server/KafkaMetricsReporterTest.scala    |  55 ++++++-----
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  60 +++++++++++-
 7 files changed, 226 insertions(+), 122 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 61870b073d..ce3cd32afd 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -19,22 +19,24 @@ package kafka.api
 
 import java.time.Duration
 import java.nio.charset.StandardCharsets
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.TimeUnit
-
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, NewPartitions}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.Buffer
@@ -42,16 +44,17 @@ import scala.concurrent.ExecutionException
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
-  def generateConfigs = {
+  def generateConfigs: scala.collection.Seq[KafkaConfig] = {
     val overridingProps = new Properties()
     val numServers = 2
     overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
-    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+    TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, false, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
   }
 
   private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
   private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  protected var admin: Admin = null
 
   protected val topic = "topic"
   private val numRecords = 100
@@ -59,6 +62,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
+
+    admin = TestUtils.createAdminClient(brokers, listenerName,
+        TestUtils.securityConfigs(Mode.CLIENT,
+          securityProtocol,
+          trustStoreFile,
+          "adminClient",
+          TestUtils.SslCertificateCn,
+          clientSaslProperties))
+
     consumer = TestUtils.createConsumer(
       bootstrapServers(listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
       securityProtocol = SecurityProtocol.PLAINTEXT
@@ -70,6 +82,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     consumer.close()
     // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused
     producers.foreach(_.close())
+    admin.close()
 
     super.tearDown()
   }
@@ -105,8 +118,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
    * 2. Last message of the non-blocking send should return the correct offset metadata
    */
-  @Test
-  def testSendOffset(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendOffset(quorum: String): Unit = {
     val producer = createProducer()
     val partition = 0
 
@@ -134,7 +148,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      createTopic(topic, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
       // send a normal record
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -166,8 +180,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testSendCompressedMessageWithCreateTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendCompressedMessageWithCreateTime(quorum: String): Unit = {
     val producer = createProducer(
       compressionType = "gzip",
       lingerMs = Int.MaxValue,
@@ -175,8 +190,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
-  @Test
-  def testSendNonCompressedMessageWithCreateTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendNonCompressedMessageWithCreateTime(quorum: String): Unit = {
     val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
@@ -186,7 +202,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
                               timeoutMs: Long = 20000L): Unit = {
     val partition = 0
     try {
-      createTopic(topic, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
       val futures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -241,7 +257,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
       else
         topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
-      createTopic(topic, 1, 2, topicProps)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
 
       val recordAndFutures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -267,13 +283,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    *
    * After close() returns, all messages should be sent with correct returned offset metadata
    */
-  @Test
-  def testClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClose(quorum: String): Unit = {
     val producer = createProducer()
 
     try {
       // create topic
-      createTopic(topic, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
@@ -300,12 +317,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    *
    * The specified partition-id should be respected
    */
-  @Test
-  def testSendToPartition(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendToPartition(quorum: String): Unit = {
     val producer = createProducer()
 
     try {
-      createTopic(topic, 2, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
       val partition = 1
 
       val now = System.currentTimeMillis()
@@ -345,14 +363,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     * Producer will attempt to send messages to the partition specified in each record, and should
     * succeed as long as the partition is included in the metadata.
     */
-  @Test
-  def testSendBeforeAndAfterPartitionExpansion(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendBeforeAndAfterPartitionExpansion(quorum: String): Unit = {
     val producer = createProducer(maxBlockMs = 5 * 1000L)
 
     // create topic
-    createTopic(topic, 1, 2)
-    val partition0 = 0
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
+    val partition0 = 0
     var futures0 = (1 to numRecords).map { i =>
       producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
     }.map(_.get(30, TimeUnit.SECONDS))
@@ -369,13 +388,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes(StandardCharsets.UTF_8))).get())
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
 
-    val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(Set(topic)).map {
-      case (topicPartition, assignment) => topicPartition.partition -> assignment
-    }
-    adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
+    admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(2))).all().get()
+
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic, 0)
-    TestUtils.waitForPartitionMetadata(servers, topic, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic, 0)
+    TestUtils.waitForPartitionMetadata(brokers, topic, 1)
 
     // send records to the newly added partition after confirming that metadata have been updated.
     val futures1 = (1 to numRecords).map { i =>
@@ -404,11 +421,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   /**
    * Test that flush immediately sends all accumulated requests.
    */
-  @Test
-  def testFlush(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFlush(quorum: String): Unit = {
     val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     try {
-      createTopic(topic, 2, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
         "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 0 until 50) {
@@ -425,9 +443,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   /**
    * Test close with zero timeout from caller thread
    */
-  @Test
-  def testCloseWithZeroTimeoutFromCallerThread(): Unit = {
-    createTopic(topic, 2, 2)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = {
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
@@ -450,9 +469,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   /**
    * Test close with zero and non-zero timeout from sender thread
    */
-  @Test
-  def testCloseWithZeroTimeoutFromSenderThread(): Unit = {
-    createTopic(topic, 1, 2)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = {
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 06ff201e0b..c25eb184b3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -19,22 +19,23 @@ package kafka.api
 
 import java.util.Properties
 import java.util.concurrent.{ExecutionException, Future, TimeUnit}
-
 import kafka.log.LogConfig
 import kafka.server.Defaults
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 
 class PlaintextProducerSendTest extends BaseProducerSendTest {
 
-  @Test
-  def testWrongSerializer(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWrongSerializer(quorum: String): Unit = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
@@ -44,8 +45,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     assertThrows(classOf[SerializationException], () => producer.send(record))
   }
 
-  @Test
-  def testBatchSizeZero(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testBatchSizeZero(quorum: String): Unit = {
     val producer = createProducer(
       lingerMs = Int.MaxValue,
       deliveryTimeoutMs = Int.MaxValue,
@@ -53,8 +55,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     sendAndVerify(producer)
   }
 
-  @Test
-  def testSendCompressedMessageWithLogAppendTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendCompressedMessageWithLogAppendTime(quorum: String): Unit = {
     val producer = createProducer(
       compressionType = "gzip",
       lingerMs = Int.MaxValue,
@@ -62,8 +65,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
-  @Test
-  def testSendNonCompressedMessageWithLogAppendTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendNonCompressedMessageWithLogAppendTime(quorum: String): Unit = {
     val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
@@ -73,8 +77,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
    *
    * The topic should be created upon sending the first message
    */
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAutoCreateTopic(quorum: String): Unit = {
     val producer = createProducer()
     try {
       // Send a message to auto-create the topic
@@ -82,18 +87,18 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
 
       // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-
+      TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
     } finally {
       producer.close()
     }
   }
 
-  @Test
-  def testSendWithInvalidCreateTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithInvalidCreateTime(quorum: String): Unit = {
     val topicProps = new Properties()
     topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
-    createTopic(topic, 1, 2, topicProps)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
 
     val producer = createProducer()
     try {
@@ -118,8 +123,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   // Test that producer with max.block.ms=0 can be used to send in non-blocking mode
   // where requests are failed immediately without blocking if metadata is not available
   // or buffer is full.
-  @Test
-  def testNonBlockingProducer(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testNonBlockingProducer(quorum: String): Unit = {
 
     def send(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = {
       producer.send(new ProducerRecord(topic, 0, "key".getBytes, new Array[Byte](1000)))
@@ -173,8 +179,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     verifySendSuccess(future2)               // previous batch should be completed and sent now
   }
 
-  @Test
-  def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendRecordBatchWithMaxRequestSizeAndHigher(quorum: String): Unit = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index eacc58e76c..5dd82b6b22 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -18,15 +18,16 @@
 package kafka.api
 
 import java.util.Properties
-
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.Seq
 import scala.collection.mutable.Buffer
@@ -37,7 +38,7 @@ import scala.jdk.CollectionConverters._
  * A single broker is used to verify edge cases where different requests are queued on the same connection.
  */
 class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
-  val numServers = 1
+  val numBrokers = 1
 
   val topic1 = "topic1"
   val topic2 = "topic2"
@@ -47,7 +48,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
   val transactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps()))
+    TestUtils.createBrokerConfigs(numBrokers, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
   }
 
   @BeforeEach
@@ -55,8 +56,8 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
     super.setUp(testInfo)
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 1.toString)
-    createTopic(topic1, numPartitions, numServers, topicConfig)
-    createTopic(topic2, numPartitions, numServers, topicConfig)
+    createTopic(topic1, numPartitions, numBrokers, topicConfig)
+    createTopic(topic2, numPartitions, numBrokers, topicConfig)
 
     createTransactionalProducer("transactional-producer")
     createReadCommittedConsumer("transactional-group")
@@ -69,10 +70,11 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
     super.tearDown()
   }
 
-  @Test
-  def testTransactionalProducerSingleBrokerMaxInFlightOne(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionalProducerSingleBrokerMaxInFlightOne(quorum: String): Unit = {
     // We want to test with one broker to verify multiple requests queued on a connection
-    assertEquals(1, servers.size)
+    assertEquals(1, brokers.size)
 
     val producer = transactionalProducers.head
     val consumer = transactionalConsumers.head
@@ -124,7 +126,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
   }
 
   private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = {
-    val producer = TestUtils.createTransactionalProducer(transactionalId, servers, maxInFlight = 1)
+    val producer = TestUtils.createTransactionalProducer(transactionalId, brokers, maxInFlight = 1)
     transactionalProducers += producer
     producer
   }
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index b82f86a8cb..a2393cdccb 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -47,9 +47,12 @@ import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 trait QuorumImplementation {
-  def createBroker(config: KafkaConfig,
-                   time: Time,
-                   startup: Boolean): KafkaBroker
+  def createBroker(
+    config: KafkaConfig,
+    time: Time = Time.SYSTEM,
+    startup: Boolean = true,
+    threadNamePrefix: Option[String] = None,
+  ): KafkaBroker
 
   def shutdown(): Unit
 }
@@ -61,10 +64,13 @@ class ZooKeeperQuorumImplementation(
   val adminZkClient: AdminZkClient,
   val log: Logging
 ) extends QuorumImplementation {
-  override def createBroker(config: KafkaConfig,
-                            time: Time,
-                            startup: Boolean): KafkaBroker = {
-    val server = new KafkaServer(config, time, None, false)
+  override def createBroker(
+    config: KafkaConfig,
+    time: Time,
+    startup: Boolean,
+    threadNamePrefix: Option[String],
+  ): KafkaBroker = {
+    val server = new KafkaServer(config, time, threadNamePrefix, false)
     if (startup) server.startup()
     server
   }
@@ -81,9 +87,12 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV
                                 val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
                                 val clusterId: String,
                                 val log: Logging) extends QuorumImplementation {
-  override def createBroker(config: KafkaConfig,
-                            time: Time,
-                            startup: Boolean): KafkaBroker = {
+  override def createBroker(
+    config: KafkaConfig,
+    time: Time,
+    startup: Boolean,
+    threadNamePrefix: Option[String],
+  ): KafkaBroker = {
     val broker = new BrokerServer(config = config,
       metaProps = new MetaProperties(clusterId, config.nodeId),
       raftManager = raftManager,
@@ -219,10 +228,13 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
-  def createBroker(config: KafkaConfig,
-                   time: Time = Time.SYSTEM,
-                   startup: Boolean = true): KafkaBroker = {
-    implementation.createBroker(config, time, startup)
+  def createBroker(
+    config: KafkaConfig,
+    time: Time = Time.SYSTEM,
+    startup: Boolean = true,
+    threadNamePrefix: Option[String] = None
+  ): KafkaBroker = {
+    implementation.createBroker(config, time, startup, threadNamePrefix)
   }
 
   def shutdownZooKeeper(): Unit = asZk().shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
index 0a98d2626c..74320e62b4 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
@@ -17,13 +17,16 @@
 
 package kafka.server
 
-import java.util.Properties
+import kafka.utils.TestInfoUtils
 
+import java.util.Properties
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -37,11 +40,12 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topic1, numPartitions, servers.size, new Properties())
+    createTopic(topic1, numPartitions, brokers.size, new Properties())
   }
 
-  @Test
-  def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(quorum: String): Unit = {
     // The basic idea is that we have one unknown topic and one created topic. We should get the 'UNKNOWN_TOPIC_OR_PARTITION'
     // error for the unknown topic and the 'OPERATION_NOT_ATTEMPTED' error for the known and authorized topic.
     val nonExistentTopic = new TopicPartition("unknownTopic", 0)
@@ -58,7 +62,7 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
       List(createdTopicPartition, nonExistentTopic).asJava)
       .build()
 
-    val leaderId = servers.head.config.brokerId
+    val leaderId = brokers.head.config.brokerId
     val response = connectAndReceive[AddPartitionsToTxnResponse](request, brokerSocketServer(leaderId))
 
     assertEquals(2, response.errors.size)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 7e5d791db2..1adf544819 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -17,15 +17,13 @@
 package kafka.server
 
 import java.util
-
 import java.util.concurrent.atomic.AtomicReference
-
-import kafka.utils.{CoreUtils, TestUtils}
-import kafka.server.QuorumTestHarness
+import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
 import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
-import org.junit.jupiter.api.Assertions.{assertEquals}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 
 object KafkaMetricsReporterTest {
@@ -43,52 +41,63 @@ object KafkaMetricsReporterTest {
     override def contextChange(metricsContext: MetricsContext): Unit = {
       //read jmxPrefix
 
-      MockMetricsReporter.JMXPREFIX.set(metricsContext.contextLabels().get("_namespace").toString)
-      MockMetricsReporter.CLUSTERID.set(metricsContext.contextLabels().get("kafka.cluster.id").toString)
-      MockMetricsReporter.BROKERID.set(metricsContext.contextLabels().get("kafka.broker.id").toString)
+      MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext))
+      MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext))
+      MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext))
+      MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext))
     }
 
-    override def configure(configs: util.Map[String, _]): Unit = {}
+    private def contextLabelOrNull(name: String, metricsContext: MetricsContext): String = {
+      Option(metricsContext.contextLabels().get(name)).flatMap(v => Option(v.toString())).getOrElse(null)
+    }
 
+    override def configure(configs: util.Map[String, _]): Unit = {}
   }
 
   object MockMetricsReporter {
     val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
     val BROKERID : AtomicReference[String] = new AtomicReference[String]
+    val NODEID : AtomicReference[String] = new AtomicReference[String]
     val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
   }
 }
 
 class KafkaMetricsReporterTest extends QuorumTestHarness {
-  var server: KafkaServer = null
+  var broker: KafkaBroker = null
   var config: KafkaConfig = null
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
     props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
     props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
-    props.setProperty(KafkaConfig.BrokerIdProp, "-1")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
     config = KafkaConfig.fromProps(props)
-    server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
-    server.startup()
+    broker = createBroker(config, threadNamePrefix = Option(this.getClass.getName))
+    broker.startup()
   }
 
-  @Test
-  def testMetricsContextNamespacePresent(): Unit = {
-    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID)
-    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID)
-    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX)
-    assertEquals("kafka.server", KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMetricsContextNamespacePresent(quorum: String): Unit = {
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
+    if (isKRaftTest()) {
+      assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+      assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
+    } else {
+      assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+      assertNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
+    }
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
 
-    server.shutdown()
+    broker.shutdown()
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
-    server.shutdown()
+    broker.shutdown()
     CoreUtils.delete(config.logDirs)
     super.tearDown()
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c49a7bdde0..a1063466c4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.{KafkaFuture, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
@@ -67,7 +67,6 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaFuture, TopicPartition, Uuid}
 import org.apache.kafka.controller.QuorumController
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
 import org.apache.kafka.server.common.MetadataVersion
@@ -871,8 +870,59 @@ object TestUtils extends Logging {
    *         LeaderDuringDelete).
    * @throws AssertionError if the expected condition is not true within the timeout.
    */
-  def waitUntilLeaderIsElectedOrChanged(zkClient: KafkaZkClient, topic: String, partition: Int, timeoutMs: Long = 30000L,
-                                        oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Int = {
+  def waitUntilLeaderIsElectedOrChanged(
+    zkClient: KafkaZkClient,
+    topic: String,
+    partition: Int,
+    timeoutMs: Long = 30000L,
+    oldLeaderOpt: Option[Int] = None,
+    newLeaderOpt: Option[Int] = None
+  ): Int = {
+    def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
+      zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
+    }
+    doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
+  }
+
+  /**
+   *  If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
+   *  If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
+   *  If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
+   *
+   * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
+   *         LeaderDuringDelete).
+   * @throws AssertionError if the expected condition is not true within the timeout.
+   */
+  def waitUntilLeaderIsElectedOrChangedWithAdmin(
+    admin: Admin,
+    topic: String,
+    partition: Int,
+    timeoutMs: Long = 30000L,
+    oldLeaderOpt: Option[Int] = None,
+    newLeaderOpt: Option[Int] = None
+  ): Int = {
+    def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
+      admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic).partitions().asScala.
+        find(_.partition() == partition).
+        flatMap { p =>
+          if (p.leader().id() == Node.noNode().id()) {
+            None
+          } else {
+            Some(p.leader().id())
+          }
+        }
+    }
+    doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
+  }
+
+  private def doWaitUntilLeaderIsElectedOrChanged(
+    getPartitionLeader: (String, Int) => Option[Int],
+    topic: String,
+    partition: Int,
+    timeoutMs: Long,
+    oldLeaderOpt: Option[Int],
+    newLeaderOpt: Option[Int]
+  ): Int = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
     val startTime = System.currentTimeMillis()
     val topicPartition = new TopicPartition(topic, partition)
@@ -884,7 +934,7 @@ object TestUtils extends Logging {
     var electedOrChangedLeader: Option[Int] = None
     while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {
       // check if leader is elected
-      leader = zkClient.getLeaderForPartition(topicPartition)
+      leader = getPartitionLeader(topic, partition)
       leader match {
         case Some(l) => (newLeaderOpt, oldLeaderOpt) match {
           case (Some(newLeader), _) if newLeader == l =>