You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/30 15:10:55 UTC

[kafka] branch trunk updated: KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness (#11417)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af8100b  KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness (#11417)
af8100b is described below

commit af8100b94fda4a27511797233e9845078ae8a69f
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Sat Oct 30 08:00:34 2021 -0700

    KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness (#11417)
    
    Change ZooKeeperTestHarness to QuorumTestHarness so that integration tests which inherit from
    this class can test Kafka in both ZK and KRaft mode. Test cases which do this can specify the
    modes they support by including a ParameterizedTest annotation before each test case, like the
    following:
    
    @ParameterizedTest
    @valuesource(strings = Array("zk", "kraft"))
    def testValidCreateTopicsRequests(quorum: String): Unit = { ... }
    
    For each value that is specified here (zk, kraft), the test case will be run once in the
    appropriate mode. So the test shown above is run twice. This allows integration tests to be
    incrementally converted over to support KRaft mode, rather than rewritten to support it. For
    now, test cases which do not specify a quorum argument will continue to run only in ZK mode.
    
    JUnit5 makes the quorum annotation visible in the TestInfo object which each @BeforEeach
    function in a test can optionally take. Therefore, this PR converts over the setUp function of
    the quorum base class, plus every derived class, to take a TestInfo argument. The TestInfo
    object gets "passed up the chain" to the base class, where it determines which quorum type we
    create (ZK or KRaft). In a few cases, I discovered test cases inheriting from the test harness
    that had more than one @BeforeEach function. Because the JUnit5 framework does not define the
    order in which @BeforeEach hooks are run, I changed these to overload setUp() instead, to avoid
    undefined behavior.
    
    The general approach taken here is to make as much as possible work with KRaft, but to leave some
    things as ZK-only when appropriate. For example, a test that explicitly requests an AdminZkClient
    object will get an exception if it is running in KRaft mode. Similarly, tests which explicitly
    request KafkaServer rather than KafkaBroker will get an exception when running in KRaft mode.
    As a proof of concept, this PR converts over kafka.api.MetricsTest to support KRaft.
    
    This PR also renames the quorum controller event handler thread to include the text
    "QuorumControllerEventHandler". This allows QuorumTestHarness to check for hanging quorum
    controller threads, as it does for hanging ZK-based controller threads.
    
    Finally, ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
    caused many failing test runs. Therefore, I disabled it here and filed KAFKA-13421 to fix the
    test logic to be more reliable.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Igor Soarez <so...@apple.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  10 +-
 core/src/main/scala/kafka/server/KafkaBroker.scala |   5 +
 core/src/main/scala/kafka/server/KafkaServer.scala |   4 +-
 .../test/junit/ZkClusterInvocationContext.java     |   3 +-
 .../kafka/admin/ListOffsetsIntegrationTest.scala   |   6 +-
 .../admin/ReassignPartitionsIntegrationTest.scala  |   4 +-
 .../kafka/api/AbstractConsumerTest.scala           |   6 +-
 .../AdminClientWithPoliciesIntegrationTest.scala   |   6 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   9 +-
 .../kafka/api/BaseAdminIntegrationTest.scala       |   6 +-
 .../kafka/api/BaseProducerSendTest.scala           |   6 +-
 .../integration/kafka/api/BaseQuotaTest.scala      |   6 +-
 .../integration/kafka/api/ClientIdQuotaTest.scala  |   6 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |   1 +
 .../kafka/api/ConsumerTopicCreationTest.scala      |   6 +-
 .../kafka/api/CustomQuotaCallbackTest.scala        |   6 +-
 .../DelegationTokenEndToEndAuthorizationTest.scala |   7 +-
 .../api/DescribeAuthorizedOperationsTest.scala     |   6 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |  12 +-
 .../kafka/api/EndToEndClusterIdTest.scala          |   6 +-
 .../kafka/api/GroupAuthorizerIntegrationTest.scala |   7 +-
 .../kafka/api/IntegrationTestHarness.scala         |  45 ++-
 .../integration/kafka/api/LogAppendTimeTest.scala  |   6 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |   6 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   7 +-
 .../api/PlaintextEndToEndAuthorizationTest.scala   |   6 +-
 .../kafka/api/ProducerCompressionTest.scala        |  10 +-
 .../kafka/api/ProducerFailureHandlingTest.scala    |   6 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   6 +-
 .../kafka/api/SaslEndToEndAuthorizationTest.scala  |   6 +-
 .../kafka/api/SaslMultiMechanismConsumerTest.scala |   6 +-
 .../kafka/api/SaslPlainPlaintextConsumerTest.scala |   8 +-
 .../kafka/api/SaslPlaintextConsumerTest.scala      |   6 +-
 .../SaslScramSslEndToEndAuthorizationTest.scala    |   6 +-
 .../kafka/api/SaslSslAdminIntegrationTest.scala    |   9 +-
 .../kafka/api/SaslSslConsumerTest.scala            |   6 +-
 .../kafka/api/SslEndToEndAuthorizationTest.scala   |   6 +-
 .../kafka/api/TransactionsExpirationTest.scala     |   6 +-
 .../integration/kafka/api/TransactionsTest.scala   |   6 +-
 .../api/TransactionsWithMaxInFlightOneTest.scala   |   6 +-
 .../kafka/api/UserClientIdQuotaTest.scala          |   6 +-
 .../integration/kafka/api/UserQuotaTest.scala      |   6 +-
 .../kafka/network/DynamicConnectionQuotaTest.scala |   6 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |  11 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |   6 +-
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  10 +-
 .../kafka/server/QuorumTestHarness.scala           | 339 +++++++++++++++++++++
 .../kafka/tools/MirrorMakerIntegrationTest.scala   |   8 +-
 .../scala/kafka/tools/GetOffsetShellTest.scala     |   5 +-
 .../src/test/scala/kafka/utils/TestInfoUtils.scala |  46 +++
 .../other/kafka/ReplicationQuotasTestRig.scala     |  10 +-
 .../scala/unit/kafka/admin/AclCommandTest.scala    |  10 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |   6 +-
 .../scala/unit/kafka/admin/ConfigCommandTest.scala |   6 +-
 .../kafka/admin/ConsumerGroupCommandTest.scala     |   6 +-
 .../kafka/admin/DelegationTokenCommandTest.scala   |   6 +-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |   6 +-
 .../kafka/admin/TopicCommandIntegrationTest.scala  |   4 +-
 .../ZkNodeChangeNotificationListenerTest.scala     |  11 +-
 .../controller/ControllerIntegrationTest.scala     |  10 +-
 .../kafka/integration/KafkaServerTestHarness.scala | 113 +++++--
 .../MetricsDuringTopicCreationDeletionTest.scala   |   6 +-
 .../integration/UncleanLeaderElectionTest.scala    |  10 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala     | 118 ++++---
 .../kafka/security/auth/ZkAuthorizationTest.scala  |   9 +-
 .../security/authorizer/AclAuthorizerTest.scala    |  12 +-
 .../authorizer/AclAuthorizerWithZkSaslTest.scala   |  12 +-
 .../AuthorizerInterfaceDefaultTest.scala           |  10 +-
 .../delegation/DelegationTokenManagerTest.scala    |  12 +-
 .../AddPartitionsToTxnRequestServerTest.scala      |   6 +-
 .../unit/kafka/server/AdvertiseBrokerTest.scala    |   4 +-
 .../kafka/server/BrokerEpochIntegrationTest.scala  |  10 +-
 .../kafka/server/ControllerMutationQuotaTest.scala |   7 +-
 .../DelegationTokenRequestsOnPlainTextTest.scala   |   6 +-
 .../kafka/server/DelegationTokenRequestsTest.scala |   6 +-
 ...nTokenRequestsWithDisableTokenFeatureTest.scala |   6 +-
 .../kafka/server/DescribeClusterRequestTest.scala  |   7 +-
 .../unit/kafka/server/DynamicConfigTest.scala      |   4 +-
 .../FetchRequestDownConversionConfigTest.scala     |   6 +-
 .../kafka/server/FetchRequestMaxBytesTest.scala    |   6 +-
 .../FinalizedFeatureChangeListenerTest.scala       |   5 +-
 .../server/KafkaMetricReporterClusterIdTest.scala  |  10 +-
 .../KafkaMetricReporterExceptionHandlingTest.scala |   6 +-
 .../kafka/server/KafkaMetricsReporterTest.scala    |  10 +-
 .../scala/unit/kafka/server/KafkaServerTest.scala  |   4 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |  10 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |   6 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |  10 +-
 .../unit/kafka/server/MetadataRequestTest.scala    |   6 +-
 .../server/MetadataRequestWithForwardingTest.scala |   6 +-
 .../unit/kafka/server/OffsetFetchRequestTest.scala |   8 +-
 .../scala/unit/kafka/server/ReplicaFetchTest.scala |  10 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |   4 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   6 +-
 .../kafka/server/ServerGenerateBrokerIdTest.scala  |  10 +-
 .../kafka/server/ServerGenerateClusterIdTest.scala |  10 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |  10 +-
 .../unit/kafka/server/ServerStartupTest.scala      |   4 +-
 .../TopicIdWithOldInterBrokerProtocolTest.scala    |   6 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  10 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   4 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala    |   9 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   4 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   6 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  10 +-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 101 ------
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala |  14 +-
 .../apache/kafka/controller/QuorumController.java  |   4 +-
 .../TopicBasedRemoteLogMetadataManagerHarness.java |   3 +-
 109 files changed, 946 insertions(+), 539 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b3b8e83..ad549ff 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -139,7 +139,7 @@ class BrokerServer(
 
   var clientQuotaMetadataManager: ClientQuotaMetadataManager = null
 
-  private var _brokerTopicStats: BrokerTopicStats = null
+  @volatile var brokerTopicStats: BrokerTopicStats = null
 
   val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
 
@@ -155,8 +155,6 @@ class BrokerServer(
 
   def kafkaYammerMetrics: kafka.metrics.KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
 
-  private[kafka] def brokerTopicStats = _brokerTopicStats
-
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
     lock.lock()
     try {
@@ -178,7 +176,7 @@ class BrokerServer(
 
   def replicaManager: ReplicaManager = _replicaManager
 
-  def startup(): Unit = {
+  override def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
     try {
       info("Starting broker")
@@ -190,7 +188,7 @@ class BrokerServer(
       kafkaScheduler.startup()
 
       /* register broker metrics */
-      _brokerTopicStats = new BrokerTopicStats
+      brokerTopicStats = new BrokerTopicStats
 
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
 
@@ -543,7 +541,7 @@ class BrokerServer(
     }
   }
 
-  def awaitShutdown(): Unit = {
+  override def awaitShutdown(): Unit = {
     lock.lock()
     try {
       while (true) {
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index d1a2800..f4c6abc 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -22,6 +22,7 @@ import kafka.coordinator.group.GroupCoordinator
 import kafka.log.LogManager
 import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
 import kafka.network.SocketServer
+import kafka.security.CredentialProvider
 import kafka.utils.KafkaScheduler
 import org.apache.kafka.common.ClusterResource
 import org.apache.kafka.common.internals.ClusterResourceListeners
@@ -82,7 +83,11 @@ trait KafkaBroker extends KafkaMetricsGroup {
   def metadataCache: MetadataCache
   def groupCoordinator: GroupCoordinator
   def boundPort(listenerName: ListenerName): Int
+  def startup(): Unit
+  def awaitShutdown(): Unit
   def shutdown(): Unit
+  def brokerTopicStats: BrokerTopicStats
+  def credentialProvider: CredentialProvider
 
   // For backwards compatibility, we need to keep older metrics tied
   // to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 19ede1b..88ccb6d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -155,7 +155,7 @@ class KafkaServer(
   }.toMap
 
   private var _clusterId: String = null
-  private var _brokerTopicStats: BrokerTopicStats = null
+  @volatile var _brokerTopicStats: BrokerTopicStats = null
 
   private var _featureChangeListener: FinalizedFeatureChangeListener = null
 
@@ -169,7 +169,7 @@ class KafkaServer(
   // Visible for testing
   private[kafka] def zkClient = _zkClient
 
-  private[kafka] def brokerTopicStats = _brokerTopicStats
+  override def brokerTopicStats = _brokerTopicStats
 
   private[kafka] def featureChangeListener = _featureChangeListener
 
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 4d4323f..4d9c6b7 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -23,6 +23,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.test.ClusterConfig;
 import kafka.test.ClusterInstance;
+import kafka.utils.EmptyTestInfo;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.network.ListenerName;
@@ -252,7 +253,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
         @Override
         public void start() {
             if (started.compareAndSet(false, true)) {
-                clusterReference.get().setUp();
+                clusterReference.get().setUp(new EmptyTestInfo());
             }
         }
 
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index c937030..6e20188 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -36,8 +36,8 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   var adminClient: Admin = null
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     createTopic(topicName, 1, 1.toShort)
     produceMessages()
     adminClient = Admin.create(Map[String, Object](
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 55123f3..2969a95 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -24,7 +24,7 @@ import kafka.api.KAFKA_2_7_IV1
 import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkIsrManager}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.config.ConfigResource
@@ -39,7 +39,7 @@ import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 
 @Timeout(300)
-class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
+class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
 
   var cluster: ReassignPartitionsTestCluster = null
 
diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index a21f3ac..56bc47c 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
 import kafka.utils.{ShutdownableThread, TestUtils}
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.{ArrayBuffer, Buffer}
@@ -74,8 +74,8 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     // create the test topic with all the brokers as replicas
     createTopic(topic, 2, brokerCount)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 1ba56ea..61018bb 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationE
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -43,8 +43,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
   val brokerCount = 3
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e5f095d..fbb6f79 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,6 +18,7 @@ import java.util
 import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{Collections, Optional, Properties}
+
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
 import kafka.log.LogConfig
 import kafka.security.authorizer.{AclAuthorizer, AclEntry}
@@ -60,11 +61,11 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid, requests}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
-
 import java.util.Collections.singletonList
+
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
@@ -314,8 +315,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   )
 
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = false)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = false)
 
     // Allow inter-broker communication
     addAndVerifyAcls(Set(new AccessControlEntry(brokerPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW)), clusterResource)
diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 7525cd2..9c8e32f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{TopicExistsException, UnknownTopicOrParti
 import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -51,8 +51,8 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
   var client: Admin = _
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     waitUntilBrokerMetadataIsPropagated(servers)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 1f7e9d1..8c2b6da 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.Buffer
@@ -56,8 +56,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   private val numRecords = 100
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 34ea07e..9f73236 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration
 import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
@@ -74,8 +74,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   var quotaTestClients: QuotaTestClients = _
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     val numPartitions = 1
     val leaders = createTopic(topic1, numPartitions, brokerCount)
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 28b0b15..e4cebe2 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,7 +16,7 @@ package kafka.api
 
 import kafka.server.KafkaServer
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 class ClientIdQuotaTest extends BaseQuotaTest {
 
@@ -24,8 +24,8 @@ class ClientIdQuotaTest extends BaseQuotaTest {
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     quotaTestClients.alterClientQuotas(
       quotaTestClients.clientQuotaAlteration(
         quotaTestClients.clientQuotaEntity(None, Some(QuotaTestClients.DefaultEntity)),
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 90748e7..9fc8727f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -299,6 +299,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     * Then, 1 consumer should be left out of the group.
     */
   @Test
+  @Disabled // To be re-enabled once we fix KAFKA-13421
   def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
     val group = "group-max-size-test"
     val topic = "group-max-size-test"
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
index 9335eac..e0853e4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
@@ -24,7 +24,7 @@ import java.util.Collections
 
 import kafka.api
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{EmptyTestInfo, TestUtils}
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
@@ -41,7 +41,7 @@ class ConsumerTopicCreationTest {
   @MethodSource(Array("parameters"))
   def testAutoTopicCreation(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean): Unit = {
     val testCase = new ConsumerTopicCreationTest.TestCase(brokerAutoTopicCreationEnable, consumerAllowAutoCreateTopics)
-    testCase.setUp()
+    testCase.setUp(new EmptyTestInfo())
     try testCase.test() finally testCase.tearDown()
   }
 
@@ -49,7 +49,7 @@ class ConsumerTopicCreationTest {
   @MethodSource(Array("parameters"))
   def testAutoTopicCreationWithForwarding(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean): Unit = {
     val testCase = new api.ConsumerTopicCreationTest.TestCaseWithForwarding(brokerAutoTopicCreationEnable, consumerAllowAutoCreateTopics)
-    testCase.setUp()
+    testCase.setUp(new EmptyTestInfo())
     try testCase.test() finally testCase.tearDown()
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index bfc967a..201fc4b 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.server.quota._
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
@@ -61,13 +61,13 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
   val defaultConsumeQuota = 1000 * 1000 * 1000
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
     this.serverConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp, classOf[GroupedUserQuotaCallback].getName)
     this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}",
       classOf[GroupedUserPrincipalBuilder].getName)
     this.serverConfig.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
-    super.setUp()
+    super.setUp(testInfo)
     brokerList = TestUtils.bootstrapServers(servers, listenerName)
 
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index a2db015..7336055 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -17,6 +17,7 @@
 package kafka.api
 
 import java.util.Properties
+
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.zk.ConfigEntityChangeNotificationZNode
@@ -26,7 +27,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -98,9 +99,9 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
-    super.setUp()
+    super.setUp(testInfo)
     privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 22fee47..0a46972 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.Authorizer
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -107,9 +107,9 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
     TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
     client = Admin.create(createConfig())
   }
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index df60010..cbb536b 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -54,14 +54,14 @@ import scala.jdk.CollectionConverters._
   * extends IntegrationTestHarness. IntegrationTestHarness creates producers and
   * consumers, and it extends KafkaServerTestHarness. KafkaServerTestHarness starts
   * brokers, but first it initializes a ZooKeeper server and client, which happens
-  * in ZooKeeperTestHarness.
+  * in QuorumTestHarness.
   *
   * To start brokers we need to set a cluster ACL, which happens optionally in KafkaServerTestHarness.
   * The remaining ACLs to enable access to producers and consumers are set here. To set ACLs, we use AclCommand directly.
   *
   * Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
-  * SaslTestHarness here directly because it extends ZooKeeperTestHarness, and we
-  * would end up with ZooKeeperTestHarness twice.
+  * SaslTestHarness here directly because it extends QuorumTestHarness, and we
+  * would end up with QuorumTestHarness twice.
   */
 abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   override val brokerCount = 3
@@ -198,8 +198,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * Starts MiniKDC and only then sets up the parent trait.
     */
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(ClusterActionAndClusterAlterAcls, s.dataPlaneRequestProcessor.authorizer.get, clusterResource)
       TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(TOPIC, "*", LITERAL))
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index e41efce..7b05113 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
 import org.apache.kafka.test.{TestUtils => _, _}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import org.apache.kafka.test.TestUtils.isValidClusterId
@@ -106,8 +106,8 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     MockDeserializer.resetStaticVariables
     // create the consumer offset topic
     createTopic(topic, 2, serverCount)
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 4f76735..cedfc17 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.util.Properties
 import java.util.concurrent.ExecutionException
+
 import kafka.api.GroupAuthorizerIntegrationTest._
 import kafka.security.authorizer.AclAuthorizer
 import kafka.security.authorizer.AclEntry.WildcardHost
@@ -30,7 +31,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -75,8 +76,8 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = false)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = false)
 
     // Allow inter-broker communication
     TestUtils.addAndVerifyAcls(servers.head,
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 83c1cac..8989044 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -30,7 +30,7 @@ import kafka.integration.KafkaServerTestHarness
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 import scala.collection.mutable
 import scala.collection.Seq
@@ -58,10 +58,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
   }
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
     configureListeners(cfgs)
     modifyConfigs(cfgs)
+    insertControllerListenersIfNeeded(cfgs)
     cfgs.map(KafkaConfig.fromProps)
   }
 
@@ -75,22 +76,45 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       val listenerSecurityMap = listenerNames.map(listenerName => s"${listenerName.value}:${securityProtocol.name}").mkString(",")
 
       config.setProperty(KafkaConfig.ListenersProp, listeners)
+      config.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityMap)
     }
   }
 
+  private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit = {
+    if (isKRaftTest()) {
+      props.foreach { config =>
+        // Add the CONTROLLER listener to "listeners" if it is not already there.
+        // But do not add it to advertised.listeners.
+        val listeners = config.getProperty(KafkaConfig.ListenersProp, "").split(",")
+        val listenerNames = listeners.map(_.replaceFirst(":.*", ""))
+        if (!listenerNames.contains("CONTROLLER")) {
+          config.setProperty(KafkaConfig.ListenersProp,
+            (listeners ++ Seq("CONTROLLER://localhost:0")).mkString(","))
+        }
+        // Add a security protocol for the CONTROLLER endpoint, if one is not already set.
+        val securityPairs = config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
+        if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
+          config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
+            (securityPairs ++ Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
+        }
+      }
+    }
+  }
+
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = true)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = true)
   }
 
-  def doSetup(createOffsetsTopic: Boolean): Unit = {
+  def doSetup(testInfo: TestInfo,
+              createOffsetsTopic: Boolean): Unit = {
     // Generate client security properties before starting the brokers in case certs are needed
     producerConfig ++= clientSecurityProps("producer")
     consumerConfig ++= clientSecurityProps("consumer")
     adminClientConfig ++= clientSecurityProps("adminClient")
 
-    super.setUp()
+    super.setUp(testInfo)
 
     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1")
@@ -105,8 +129,13 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
     adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
 
-    if (createOffsetsTopic)
-      TestUtils.createOffsetsTopic(zkClient, servers)
+    if (createOffsetsTopic) {
+      if (isKRaftTest()) {
+        TestUtils.createOffsetsTopicWithAdmin(brokers, adminClientConfig)
+      } else {
+        TestUtils.createOffsetsTopic(zkClient, servers)
+      }
+    }
   }
 
   def clientSecurityProps(certAlias: String): Properties = {
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index d0007fb..6f397d8 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -22,7 +22,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.record.TimestampType
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
 
 /**
@@ -41,8 +41,8 @@ class LogAppendTimeTest extends IntegrationTestHarness {
   private val topic = "topic"
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     createTopic(topic)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index c5c17c2..850ac89 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.authenticator.TestJaasConfig
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
 
 import scala.annotation.nowarn
@@ -54,10 +54,10 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     verifyNoRequestMetrics("Request metrics not removed in a previous test")
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index ef8174d0..94fc04c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.{Collections, Optional, Properties}
 import java.{time, util}
+
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
 import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
@@ -43,7 +44,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 import org.slf4j.LoggerFactory
 
 import scala.annotation.nowarn
@@ -70,8 +71,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   private val changedBrokerLoggers = scala.collection.mutable.Set[String]()
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     brokerLoggerConfigResource = new ConfigResource(
       ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString)
   }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index c44028c..8e69a2d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -20,7 +20,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
 import org.apache.kafka.common.errors.TopicAuthorizationException
 
@@ -71,9 +71,9 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(List.empty, None, ZkSasl))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 9ea0bb5..62b2689 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,19 +19,19 @@ package kafka.api.test
 
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
 import java.util.{Collections, Properties}
 import scala.jdk.CollectionConverters._
 
-class ProducerCompressionTest extends ZooKeeperTestHarness {
+class ProducerCompressionTest extends QuorumTestHarness {
 
   private val brokerId = 0
   private val topic = "topic"
@@ -40,8 +40,8 @@ class ProducerCompressionTest extends ZooKeeperTestHarness {
   private var server: KafkaServer = null
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
     server = TestUtils.createServer(KafkaConfig.fromProps(props))
   }
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 1adb26c..4d45da9 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -60,8 +60,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val topic2 = "topic-2"
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     producer1 = TestUtils.createProducer(brokerList, acks = 0, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 2a7d5c9..a9f2c6c 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.SaslAuthenticationException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
 import kafka.server.KafkaConfig
@@ -63,10 +63,10 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
       JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
     createTopic(topic, numPartitions, brokerCount)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index e425aba..e406450 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException}
-import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
 
 import scala.collection.immutable.List
@@ -34,7 +34,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   protected def kafkaServerSaslMechanisms: List[String]
   
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     // create static config including client login context with credentials for JaasTestUtils 'client2'
     startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
     // set dynamic properties with credentials for JaasTestUtils 'client1' so that dynamic JAAS configuration is also
@@ -43,7 +43,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
     consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
     adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index eb2eab4..37b36e1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import java.io.File
 
 import kafka.server.KafkaConfig
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
@@ -31,10 +31,10 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
       JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index d3ca141..042aa3d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -19,7 +19,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   override protected def listenerName = new ListenerName("CLIENT")
@@ -28,7 +28,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   private val kafkaServerJaasEntryName =
     s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
-  // disable secure acls of zkClient in ZooKeeperTestHarness
+  // disable secure acls of zkClient in QuorumTestHarness
   override protected def zkAclsEnabled = Some(false)
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
@@ -36,9 +36,9 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
index b7eb39b..0933818 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
@@ -14,15 +14,15 @@ package kafka.api
 
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index 022b2f4..6e334d1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.test.TestSslUtils
 
 import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
   override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -52,8 +52,8 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   override def createPrivilegedAdminClient() = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword)
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     // Create client credentials after starting brokers so that dynamic credential creation is also tested
     createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
     createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 53a267f..3cb424d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.io.File
 import java.util
+
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclAuthorizer
 import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
@@ -33,9 +34,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import java.util.Collections
+
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
@@ -62,9 +63,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     setUpSasl()
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   def setUpSasl(): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 2b1db11..563481d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -17,7 +17,7 @@ import java.io.File
 import kafka.server.KafkaConfig
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
@@ -25,9 +25,9 @@ class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index 65bef41..850d99f 100644
--- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.Java
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 object SslEndToEndAuthorizationTest {
   class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
@@ -70,9 +70,9 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, s"O=A client,CN=$clientCn")
   override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(List.empty, None, ZkSasl))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   override def clientSecurityProps(certAlias: String): Properties = {
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index ecdf435..1a656ef 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.errors.InvalidPidMappingException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -46,8 +46,8 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     producer = TestUtils.createTransactionalProducer("transactionalProducer", servers)
     consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 5afad85..1fbba9e 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -59,8 +59,8 @@ class TransactionsTest extends KafkaServerTestHarness {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
     createTopic(topic1, numPartitions, numServers, topicConfig)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index 20094b6..267792f 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.collection.Seq
 import scala.collection.mutable.Buffer
@@ -51,8 +51,8 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 1.toString)
     createTopic(topic1, numPartitions, numServers, topicConfig)
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index e442dda..83c70da 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -19,7 +19,7 @@ import java.io.File
 import kafka.server._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 class UserClientIdQuotaTest extends BaseQuotaTest {
 
@@ -30,9 +30,9 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     this.serverConfig.setProperty(KafkaConfig.SslClientAuthProp, "required")
-    super.setUp()
+    super.setUp(testInfo)
     quotaTestClients.alterClientQuotas(
       quotaTestClients.clientQuotaAlteration(
         quotaTestClients.clientQuotaEntity(Some(QuotaTestClients.DefaultEntity), Some(QuotaTestClients.DefaultEntity)),
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index 7a00f40..ffbaebb 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -19,7 +19,7 @@ import java.io.File
 import kafka.server.KafkaServer
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 class UserQuotaTest extends BaseQuotaTest with SaslSetup {
 
@@ -31,9 +31,9 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
     quotaTestClients.alterClientQuotas(
       quotaTestClients.clientQuotaAlteration(
         quotaTestClients.clientQuotaEntity(Some(QuotaTestClients.DefaultEntity), None),
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 98bc641..14f3e8f 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{KafkaException, requests}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -56,8 +56,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     TestUtils.createTopic(zkClient, topic, brokerCount, brokerCount, servers)
   }
 
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 84e8099..0c0950a 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -35,9 +35,10 @@ import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
 import kafka.metrics.KafkaYammerMetrics
 import kafka.network.{Processor, RequestChannel}
+import kafka.server.QuorumTestHarness
 import kafka.utils._
 import kafka.utils.Implicits._
-import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import kafka.zk.{ConfigEntityChangeNotificationZNode}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
@@ -61,7 +62,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 
 import scala.annotation.nowarn
 import scala.collection._
@@ -74,7 +75,7 @@ object DynamicBrokerReconfigurationTest {
   val SecureExternal = "EXTERNAL"
 }
 
-class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
+class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup {
 
   import DynamicBrokerReconfigurationTest._
 
@@ -101,9 +102,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
-    super.setUp()
+    super.setUp(testInfo)
 
     clearLeftOverProcessorMetrics() // clear metrics left over from other tests so that new ones can be tested
 
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 5c75507..2bda3ac 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
 import org.apache.kafka.common.security.kerberos.KerberosLogin
 import org.apache.kafka.common.utils.{LogContext, MockTime}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -60,12 +60,12 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
   private val failedAuthenticationDelayMs = 2000
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     TestableKerberosLogin.reset()
     startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
     serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
     serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp, failedAuthenticationDelayMs.toString)
-    super.setUp()
+    super.setUp(testInfo)
     serverAddr = new InetSocketAddress("localhost",
       servers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))
 
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 9709148..1d865f9 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -27,14 +27,14 @@ import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.utils.Implicits._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -49,7 +49,7 @@ object MultipleListenersWithSameSecurityProtocolBaseTest {
   val Plain = "PLAIN"
 }
 
-abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeeperTestHarness with SaslSetup {
+abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumTestHarness with SaslSetup {
 
   import MultipleListenersWithSameSecurityProtocolBaseTest._
 
@@ -67,9 +67,9 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
   protected def dynamicJaasSections: Properties
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(staticJaasSections)
-    super.setUp()
+    super.setUp(testInfo)
     // 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest
     val numServers = 2
 
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
new file mode 100755
index 0000000..69be4f5
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.net.InetSocketAddress
+import java.util
+import java.util.{Collections, Properties}
+import java.util.concurrent.CompletableFuture
+
+import javax.security.auth.login.Configuration
+import kafka.raft.KafkaRaftManager
+import kafka.tools.StorageTool
+import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
+import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.zookeeper.client.ZKClientConfig
+import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
+
+import scala.collection.{Seq, immutable}
+
+trait QuorumImplementation {
+  def createAndStartBroker(config: KafkaConfig,
+                           time: Time): KafkaBroker
+
+  def shutdown(): Unit
+}
+
+class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
+                                    val zkClient: KafkaZkClient,
+                                    val adminZkClient: AdminZkClient,
+                                    val log: Logging) extends QuorumImplementation {
+  override def createAndStartBroker(config: KafkaConfig,
+                                    time: Time): KafkaBroker = {
+    val server = new KafkaServer(config, time, None, false)
+    server.startup()
+    server
+  }
+
+  override def shutdown(): Unit = {
+    CoreUtils.swallow(zkClient.close(), log)
+    CoreUtils.swallow(zookeeper.shutdown(), log)
+  }
+}
+
+class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndVersion],
+                                val controllerServer: ControllerServer,
+                                val metadataDir: File,
+                                val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+                                val clusterId: String,
+                                val log: Logging) extends QuorumImplementation {
+  override def createAndStartBroker(config: KafkaConfig,
+                                    time: Time): KafkaBroker = {
+    val broker = new BrokerServer(config = config,
+      metaProps = new MetaProperties(clusterId, config.nodeId),
+      raftManager = raftManager,
+      time = time,
+      metrics = new Metrics(),
+      threadNamePrefix = Some("Broker%02d_".format(config.nodeId)),
+      initialOfflineDirs = Seq(),
+      controllerQuorumVotersFuture = controllerQuorumVotersFuture,
+      supportedFeatures = Collections.emptyMap())
+    broker.startup()
+    broker
+  }
+
+  override def shutdown(): Unit = {
+    CoreUtils.swallow(raftManager.shutdown(), log)
+    CoreUtils.swallow(controllerServer.shutdown(), log)
+  }
+}
+
+@Tag("integration")
+abstract class QuorumTestHarness extends Logging {
+  val zkConnectionTimeout = 10000
+  val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
+  val zkMaxInFlightRequests = Int.MaxValue
+
+  protected def zkAclsEnabled: Option[Boolean] = None
+
+  /**
+   * When in KRaft mode, the security protocol to use for the controller listener.
+   * Can be overridden by subclasses.
+   */
+  protected def controllerListenerSecurityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+
+  protected def kraftControllerConfigs(): Seq[Properties] = {
+    Seq(new Properties())
+  }
+
+  private var implementation: QuorumImplementation = null
+
+  def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorumImplementation]
+
+  def checkIsZKTest(): Unit = {
+    if (isKRaftTest()) {
+      throw new RuntimeException("This function can't be accessed when running the test " +
+        "in KRaft mode. ZooKeeper mode is required.")
+    }
+  }
+
+  def checkIsKRaftTest(): Unit = {
+    if (!isKRaftTest()) {
+      throw new RuntimeException("This function can't be accessed when running the test " +
+        "in ZooKeeper mode. KRaft mode is required.")
+    }
+  }
+
+  private def asZk(): ZooKeeperQuorumImplementation = {
+    checkIsZKTest()
+    implementation.asInstanceOf[ZooKeeperQuorumImplementation]
+  }
+
+  private def asKRaft(): KRaftQuorumImplementation = {
+    checkIsKRaftTest()
+    implementation.asInstanceOf[KRaftQuorumImplementation]
+  }
+
+  def zookeeper: EmbeddedZookeeper = asZk().zookeeper
+
+  def zkClient: KafkaZkClient = asZk().zkClient
+
+  def zkClientOrNull: KafkaZkClient = if (isKRaftTest()) null else asZk().zkClient
+
+  def adminZkClient: AdminZkClient = asZk().adminZkClient
+
+  def zkPort: Int = asZk().zookeeper.port
+
+  def zkConnect: String = s"127.0.0.1:$zkPort"
+
+  def zkConnectOrNull: String = if (isKRaftTest()) null else zkConnect
+
+  def controllerServer: ControllerServer = asKRaft().controllerServer
+
+  // Note: according to the junit documentation: "JUnit Jupiter does not guarantee the execution
+  // order of multiple @BeforeEach methods that are declared within a single test class or test
+  // interface." Therefore, if you have things you would like to do before each test case runs, it
+  // is best to override this function rather than declaring a new @BeforeEach function.
+  // That way you control the initialization order.
+  @BeforeEach
+  def setUp(testInfo: TestInfo): Unit = {
+    val name = if (testInfo.getTestMethod().isPresent()) {
+      testInfo.getTestMethod().get().toString()
+    } else {
+      "[unspecified]"
+    }
+    if (TestInfoUtils.isKRaft(testInfo)) {
+      info(s"Running KRAFT test ${name}")
+      implementation = newKRaftQuorum(testInfo)
+    } else {
+      info(s"Running ZK test ${name}")
+      implementation = newZooKeeperQuorum()
+    }
+  }
+
+  def createAndStartBroker(config: KafkaConfig,
+                           time: Time = Time.SYSTEM): KafkaBroker = {
+    implementation.createAndStartBroker(config,
+      time)
+  }
+
+  def shutdownZooKeeper(): Unit = asZk().shutdown()
+
+  private def formatDirectories(directories: immutable.Seq[String],
+                                metaProperties: MetaProperties): Unit = {
+    val stream = new ByteArrayOutputStream()
+    var out: PrintStream = null
+    try {
+      out = new PrintStream(stream)
+      if (StorageTool.formatCommand(out, directories, metaProperties, false) != 0) {
+        throw new RuntimeException(stream.toString())
+      }
+      debug(s"Formatted storage directory(ies) ${directories}")
+    } finally {
+      if (out != null) out.close()
+      stream.close()
+    }
+  }
+
+  private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
+    val clusterId = Uuid.randomUuid().toString
+    val metadataDir = TestUtils.tempDir()
+    val metaProperties = new MetaProperties(clusterId, 0)
+    formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), metaProperties)
+    val controllerMetrics = new Metrics()
+    val propsList = kraftControllerConfigs()
+    if (propsList.size != 1) {
+      throw new RuntimeException("Only one KRaft controller is supported for now.")
+    }
+    val props = propsList(0)
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.NodeIdProp, "1000")
+    props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath())
+    val proto = controllerListenerSecurityProtocol.toString()
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${proto}:${proto}")
+    props.setProperty(KafkaConfig.ListenersProp, s"${proto}://localhost:0")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, proto)
+    props.setProperty(KafkaConfig.QuorumVotersProp, "1000@localhost:0")
+    val config = new KafkaConfig(props)
+    val threadNamePrefix = "Controller_" + testInfo.getDisplayName
+    val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, AddressSpec]]
+    val raftManager = new KafkaRaftManager(
+      metaProperties = metaProperties,
+      config = config,
+      recordSerde = MetadataRecordSerde.INSTANCE,
+      topicPartition = new TopicPartition(KafkaRaftServer.MetadataTopic, 0),
+      topicId = KafkaRaftServer.MetadataTopicId,
+      time = Time.SYSTEM,
+      metrics = controllerMetrics,
+      threadNamePrefixOpt = Option(threadNamePrefix),
+      controllerQuorumVotersFuture = controllerQuorumVotersFuture)
+    var controllerServer: ControllerServer = null
+    try {
+      controllerServer = new ControllerServer(
+        metaProperties = metaProperties,
+        config = config,
+        raftManager = raftManager,
+        time = Time.SYSTEM,
+        metrics = controllerMetrics,
+        threadNamePrefix = Option(threadNamePrefix),
+        controllerQuorumVotersFuture = controllerQuorumVotersFuture)
+      controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
+        if (e != null) {
+          error("Error completing controller socket server future", e)
+          controllerQuorumVotersFuture.completeExceptionally(e)
+        } else {
+          controllerQuorumVotersFuture.complete(Collections.singletonMap(1000,
+            new InetAddressSpec(new InetSocketAddress("localhost", port))))
+        }
+      })
+      controllerServer.startup()
+      raftManager.startup()
+      controllerServer.startup()
+    } catch {
+      case e: Throwable =>
+        CoreUtils.swallow(raftManager.shutdown(), this)
+        if (controllerServer != null) CoreUtils.swallow(controllerServer.shutdown(), this)
+        throw e
+    }
+    new KRaftQuorumImplementation(raftManager,
+      controllerServer,
+      metadataDir,
+      controllerQuorumVotersFuture,
+      clusterId,
+      this)
+  }
+
+  private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    var adminZkClient: AdminZkClient = null
+    try {
+      zkClient = KafkaZkClient(s"127.0.0.1:${zookeeper.port}",
+        zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
+        zkSessionTimeout,
+        zkConnectionTimeout,
+        zkMaxInFlightRequests,
+        Time.SYSTEM,
+        name = "ZooKeeperTestHarness",
+        new ZKClientConfig)
+      adminZkClient = new AdminZkClient(zkClient)
+    } catch {
+      case t: Throwable =>
+        CoreUtils.swallow(zookeeper.shutdown(), this)
+        if (zkClient != null) CoreUtils.swallow(zkClient.close(), this)
+        throw t
+    }
+    new ZooKeeperQuorumImplementation(zookeeper,
+      zkClient,
+      adminZkClient,
+      this)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    if (implementation != null) {
+      implementation.shutdown()
+    }
+    Configuration.setConfiguration(null)
+  }
+
+  // Trigger session expiry by reusing the session id in another client
+  def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
+    val dummyWatcher = new Watcher {
+      override def process(event: WatchedEvent): Unit = {}
+    }
+    val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
+      zooKeeper.getSessionId,
+      zooKeeper.getSessionPasswd)
+    assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+    anotherZkClient
+  }
+}
+
+object QuorumTestHarness {
+  val ZkClientEventThreadSuffix = "-EventThread"
+
+  /**
+   * Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread.
+   * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
+   * which is true for core tests where this harness is used.
+   */
+  @BeforeAll
+  def setUpClass(): Unit = {
+    TestUtils.verifyNoUnexpectedThreads("@BeforeAll")
+  }
+
+  /**
+   * Verify that tests from the current test class using QuorumTestHarness haven't left behind an unexpected thread
+   */
+  @AfterAll
+  def tearDownClass(): Unit = {
+    TestUtils.verifyNoUnexpectedThreads("@AfterAll")
+  }
+}
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index ef75787..abcbebc 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -30,10 +30,8 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.utils.Exit
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 
 @deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
@@ -44,9 +42,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
   val exited = new AtomicBoolean(false)
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     Exit.setExitProcedure((_, _) => exited.set(true))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
index cd611de..796ed36 100644
--- a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
+++ b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
   private val topicCount = 4
@@ -38,7 +38,8 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
     }.map(KafkaConfig.fromProps)
 
   @BeforeEach
-  def createTestAndInternalTopics(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
 
     val props = new Properties()
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
new file mode 100644
index 0000000..ecd656e
--- /dev/null
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils
+
+import java.lang.reflect.Method
+import java.util
+import java.util.{Collections, Optional}
+
+import org.junit.jupiter.api.TestInfo
+
+class EmptyTestInfo extends TestInfo {
+  override def getDisplayName: String = ""
+  override def getTags: util.Set[String] = Collections.emptySet()
+  override def getTestClass: (Optional[Class[_]]) = Optional.empty()
+  override def getTestMethod: Optional[Method] = Optional.empty()
+}
+
+object TestInfoUtils {
+  def isKRaft(testInfo: TestInfo): Boolean = {
+    if (testInfo.getDisplayName().contains("quorum=")) {
+      if (testInfo.getDisplayName().contains("quorum=kraft")) {
+        true
+      } else if (testInfo.getDisplayName().contains("quorum=zk")) {
+        false
+      } else {
+        throw new RuntimeException(s"Unknown quorum value")
+      }
+    } else {
+      false
+    }
+  }
+}
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 3f72afd..42230e7 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -23,10 +23,10 @@ import java.nio.file.{Files, StandardOpenOption}
 
 import javax.imageio.ImageIO
 import kafka.admin.ReassignPartitionsCommand
-import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
+import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness, QuotaType}
 import kafka.utils.TestUtils._
-import kafka.utils.{Exit, Logging, TestUtils}
-import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
+import kafka.utils.{EmptyTestInfo, Exit, Logging, TestUtils}
+import kafka.zk.ReassignPartitionsZNode
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
@@ -80,7 +80,7 @@ object ReplicationQuotasTestRig {
   def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean): Unit = {
     val experiment = new Experiment()
     try {
-      experiment.setUp()
+      experiment.setUp(new EmptyTestInfo())
       experiment.run(config, journal, displayChartsOnScreen)
       journal.footer()
     }
@@ -96,7 +96,7 @@ object ReplicationQuotasTestRig {
     val targetBytesPerBrokerMB: Long = msgsPerPartition.toLong * msgSize.toLong * partitions.toLong / brokers.toLong / 1000000
   }
 
-  class Experiment extends ZooKeeperTestHarness with Logging {
+  class Experiment extends QuorumTestHarness with Logging {
     val topicName = "my-topic"
     var experimentName = "unset"
     val partitionId = 0
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index aec78cb..7cd5a18 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -23,7 +23,7 @@ import kafka.admin.AclCommand.AclCommandOptions
 import kafka.security.authorizer.{AclAuthorizer, AclEntry}
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{Exit, LogCaptureAppender, Logging, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType._
@@ -36,9 +36,9 @@ import org.apache.kafka.common.utils.{AppInfoParser, SecurityUtils}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.log4j.Level
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
-class AclCommandTest extends ZooKeeperTestHarness with Logging {
+class AclCommandTest extends QuorumTestHarness with Logging {
 
   var servers: Seq[KafkaServer] = Seq()
 
@@ -102,8 +102,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   private var adminArgs: Array[String] = _
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
     brokerProps.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 94c5a77..cd0bb1b 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -49,8 +49,8 @@ class AddPartitionsTest extends BaseRequestTest {
   val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     createTopic(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
     createTopic(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 299d98b..859b3d5 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -21,9 +21,9 @@ import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import kafka.api.ApiVersion
 import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig}
+import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig, QuorumTestHarness}
 import kafka.utils.{Exit, Logging}
-import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
@@ -42,7 +42,7 @@ import org.junit.jupiter.api.Test
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 
-class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
+class ConfigCommandTest extends QuorumTestHarness with Logging {
 
   @Test
   def shouldExitWithNonZeroStatusOnArgError(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 458a3ee..6415c16 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -52,8 +52,8 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     createTopic(topic, 1, 1)
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
index 05fc339..2071d08 100644
--- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.collection.mutable.ListBuffer
 import scala.concurrent.ExecutionException
@@ -41,9 +41,9 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
   override def brokerCount = 1
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   override def generateConfigs = {
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4a53c67..bb881f2 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -22,9 +22,9 @@ import java.util.{Collections, Optional, Properties}
 
 import scala.collection.Seq
 import kafka.log.UnifiedLog
-import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
+import kafka.zk.TopicPartitionZNode
 import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import kafka.common.TopicAlreadyMarkedForDeletionException
@@ -34,7 +34,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 import scala.jdk.CollectionConverters._
 
-class DeleteTopicTest extends ZooKeeperTestHarness {
+class DeleteTopicTest extends QuorumTestHarness {
 
   var servers: Seq[KafkaServer] = Seq()
 
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
index 19dbd6a..dfb4ae7 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
@@ -80,7 +80,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   }
 
   @BeforeEach
-  def setup(info: TestInfo): Unit = {
+  override def setUp(info: TestInfo): Unit = {
+    super.setUp(info)
+
     // create adminClient
     val props = new Properties()
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 4aa7167..9b7f090 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -17,24 +17,25 @@
 package kafka.common
 
 import kafka.utils.TestUtils
-import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore, ZooKeeperTestHarness}
+import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore}
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.resource.ResourceType.GROUP
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.Seq
 
-class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
+class ZkNodeChangeNotificationListenerTest extends QuorumTestHarness {
 
   private val changeExpirationMs = 1000
   private var notificationListener: ZkNodeChangeNotificationListener = _
   private var notificationHandler: TestNotificationHandler = _
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     zkClient.createAclPaths()
     notificationHandler = new TestNotificationHandler()
   }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index bdc06a7..2302007 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -23,7 +23,7 @@ import com.yammer.metrics.core.Timer
 import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
 import kafka.controller.KafkaController.AlterIsrCallback
 import kafka.metrics.KafkaYammerMetrics
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
 import kafka.utils.{LogCaptureAppender, TestUtils}
 import kafka.zk.{FeatureZNodeStatus, _}
 import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
@@ -33,7 +33,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
 import org.apache.log4j.Level
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.mockito.Mockito.{doAnswer, spy, verify}
 import org.mockito.invocation.InvocationOnMock
 
@@ -41,14 +41,14 @@ import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 
-class ControllerIntegrationTest extends ZooKeeperTestHarness {
+class ControllerIntegrationTest extends QuorumTestHarness {
   var servers = Seq.empty[KafkaServer]
   val firstControllerEpoch = KafkaController.InitialControllerEpoch + 1
   val firstControllerEpochZkVersion = KafkaController.InitialControllerEpochZkVersion + 1
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     servers = Seq.empty[KafkaServer]
   }
 
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 6fc18f0..e8fdaf5 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -18,16 +18,17 @@
 package kafka.integration
 
 import java.io.File
+import java.util
 import java.util.Arrays
 
+import kafka.server.QuorumTestHarness
 import kafka.server._
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
-import scala.collection.Seq
-import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.collection.{Seq, mutable}
+import scala.jdk.CollectionConverters._
 import java.util.Properties
 
 import org.apache.kafka.common.{KafkaException, Uuid}
@@ -38,9 +39,25 @@ import org.apache.kafka.common.utils.Time
 /**
  * A test harness that brings up some number of broker nodes
  */
-abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
+abstract class KafkaServerTestHarness extends QuorumTestHarness {
   var instanceConfigs: Seq[KafkaConfig] = null
-  var servers: Buffer[KafkaServer] = new ArrayBuffer
+
+  private val _brokers = new mutable.ArrayBuffer[KafkaBroker]
+
+  /**
+   * Get the list of brokers, which could be either BrokerServer objects or KafkaServer objects.
+   */
+  def brokers: mutable.Buffer[KafkaBroker] = _brokers
+
+  /**
+   * Get the list of brokers, as instances of KafkaServer.
+   * This method should only be used when dealing with brokers that use ZooKeeper.
+   */
+  def servers: mutable.Buffer[KafkaServer] = {
+    checkIsZKTest()
+    _brokers.map(_.asInstanceOf[KafkaServer])
+  }
+
   var brokerList: String = null
   var alive: Array[Boolean] = null
 
@@ -88,8 +105,8 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
   protected def enableForwarding: Boolean = false
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     if (configs.isEmpty)
       throw new KafkaException("Must supply at least one server config.")
@@ -100,15 +117,19 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
     // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
     // are shutdown cleanly in tearDown even if a subsequent broker fails to start
     for (config <- configs) {
-      servers += TestUtils.createServer(
-        config,
-        time = brokerTime(config.brokerId),
-        threadNamePrefix = None,
-        enableForwarding
-      )
+      if (isKRaftTest()) {
+        _brokers += createAndStartBroker(config, brokerTime(config.brokerId))
+      } else {
+        _brokers += TestUtils.createServer(
+          config,
+          time = brokerTime(config.brokerId),
+          threadNamePrefix = None,
+          enableForwarding
+        )
+      }
     }
-    brokerList = TestUtils.bootstrapServers(servers, listenerName)
-    alive = new Array[Boolean](servers.length)
+    brokerList = TestUtils.bootstrapServers(_brokers, listenerName)
+    alive = new Array[Boolean](_brokers.length)
     Arrays.fill(alive, true)
 
     // default implementation is a no-op, it is overridden by subclasses if required
@@ -117,20 +138,26 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-    if (servers != null) {
-      TestUtils.shutdownServers(servers)
-    }
+    TestUtils.shutdownServers(_brokers)
     super.tearDown()
   }
 
   /**
-   * Create a topic in ZooKeeper.
+   * Create a topic.
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
-  def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
-                  topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] =
-    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
+  def createTopic(topic: String,
+                  numPartitions: Int = 1,
+                  replicationFactor: Int = 1,
+                  topicConfig: Properties = new Properties,
+                  adminClientConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
+    if (isKRaftTest()) {
+      TestUtils.createTopicWithAdmin(topic, numPartitions, replicationFactor, brokers, topicConfig, adminClientConfig)
+    } else {
+      TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
+    }
+  }
 
   /**
    * Create a topic in ZooKeeper using a customized replica assignment.
@@ -140,20 +167,28 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
   def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
 
+  def deleteTopic(topic: String): Unit = {
+    if (isKRaftTest()) {
+      TestUtils.deleteTopicWithAdmin(topic, brokers)
+    } else {
+      adminZkClient.deleteTopic(topic)
+    }
+  }
+
   /**
    * Pick a broker at random and kill it if it isn't already dead
    * Return the id of the broker killed
    */
   def killRandomBroker(): Int = {
-    val index = TestUtils.random.nextInt(servers.length)
+    val index = TestUtils.random.nextInt(_brokers.length)
     killBroker(index)
     index
   }
 
   def killBroker(index: Int): Unit = {
     if(alive(index)) {
-      servers(index).shutdown()
-      servers(index).awaitShutdown()
+      _brokers(index).shutdown()
+      _brokers(index).awaitShutdown()
       alive(index) = false
     }
   }
@@ -165,32 +200,50 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
     if (reconfigure) {
       instanceConfigs = null
     }
-    for(i <- servers.indices if !alive(i)) {
+    for(i <- _brokers.indices if !alive(i)) {
       if (reconfigure) {
-        servers(i) = TestUtils.createServer(
+        _brokers(i) = TestUtils.createServer(
           configs(i),
           time = brokerTime(configs(i).brokerId),
           threadNamePrefix = None,
           enableForwarding
         )
       }
-      servers(i).startup()
+      _brokers(i).startup()
       alive(i) = true
     }
   }
 
   def waitForUserScramCredentialToAppearOnAllBrokers(clientPrincipal: String, mechanismName: String): Unit = {
-    servers.foreach { server =>
+    _brokers.foreach { server =>
       val cache = server.credentialProvider.credentialCache.cache(mechanismName, classOf[ScramCredential])
       TestUtils.waitUntilTrue(() => cache.get(clientPrincipal) != null, s"SCRAM credentials not created for $clientPrincipal")
     }
   }
 
   def getController(): KafkaServer = {
+    checkIsZKTest()
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     servers.filter(s => s.config.brokerId == controllerId).head
   }
 
+  def getTopicIds(names: Seq[String]): Map[String, Uuid] = {
+    val result = new util.HashMap[String, Uuid]()
+    if (isKRaftTest()) {
+      val topicIdsMap = controllerServer.controller.findTopicIds(Long.MaxValue, names.asJava).get()
+      names.foreach { name =>
+        val response = topicIdsMap.get(name)
+        result.put(name, response.result())
+      }
+    } else {
+      val topicIdsMap = getController().kafkaController.controllerContext.topicIds.toMap
+      names.foreach { name =>
+        if (topicIdsMap.contains(name)) result.put(name, topicIdsMap.get(name).get)
+      }
+    }
+    result.asScala.toMap
+  }
+
   def getTopicIds(): Map[String, Uuid] = {
     getController().kafkaController.controllerContext.topicIds.toMap
   }
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index cc3c69a..e045ea9 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -23,7 +23,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.{Logging, TestUtils}
 
 import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaYammerMetrics
 
@@ -52,7 +52,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
     .map(KafkaConfig.fromProps(_, overridingProps))
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     // Do some Metrics Registry cleanup by removing the metrics that this test checks.
     // This is a test workaround to the issue that prior harness runs may have left a populated registry.
     // see https://issues.apache.org/jira/browse/KAFKA-4605
@@ -61,7 +61,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
         metricName.foreach(KafkaYammerMetrics.defaultRegistry.removeMetric)
     }
 
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   /*
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index d2f9462..9db9e3f 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -18,7 +18,7 @@
 package kafka.integration
 
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.util.Random
 import scala.jdk.CollectionConverters._
@@ -30,7 +30,7 @@ import java.util.concurrent.ExecutionException
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.network.ListenerName
@@ -41,7 +41,7 @@ import org.junit.jupiter.api.Assertions._
 
 import scala.annotation.nowarn
 
-class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
+class UncleanLeaderElectionTest extends QuorumTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
 
@@ -63,8 +63,8 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor])
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     configProps1 = createBrokerConfig(brokerId1, zkConnect)
     configProps2 = createBrokerConfig(brokerId2, zkConnect)
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 790536b..f4e69f9 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
 
 import javax.management.ObjectName
 import com.yammer.metrics.core.MetricPredicate
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
@@ -34,7 +33,11 @@ import kafka.log.LogConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.JmxReporter
 import org.apache.kafka.common.utils.Time
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+@Timeout(120)
 class MetricsTest extends KafkaServerTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
@@ -44,57 +47,64 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
   overridingProps.put(JmxReporter.EXCLUDE_CONFIG, s"$requiredKafkaServerPrefix=ClusterId")
 
-  def generateConfigs =
-    TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
+  def generateConfigs: Seq[KafkaConfig] =
+    TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false).
+      map(KafkaConfig.fromProps(_, overridingProps))
 
   val nMessages = 2
 
-  @Test
-  def testMetricsReporterAfterDeletingTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMetricsReporterAfterDeletingTopic(quorum: String): Unit = {
     val topic = "test-topic-metric"
     createTopic(topic, 1, 1)
-    adminZkClient.deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    deleteTopic(topic)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
     assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
   }
 
-  @Test
-  def testBrokerTopicMetricsUnregisteredAfterDeletingTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testBrokerTopicMetricsUnregisteredAfterDeletingTopic(quorum: String): Unit = {
     val topic = "test-broker-topic-metric"
     createTopic(topic, 2, 1)
     // Produce a few messages to create the metrics
     // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
-    TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+    TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
     assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
-    servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
-    adminZkClient.deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
+    deleteTopic(topic)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
     assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
   }
 
-  @Test
-  def testClusterIdMetric(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClusterIdMetric(quorum: String): Unit = {
     // Check if clusterId metric exists.
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=ClusterId"), 1)
   }
 
-  @Test
-  def testBrokerStateMetric(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testBrokerStateMetric(quorum: String): Unit = {
     // Check if BrokerState metric exists.
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=BrokerState"), 1)
   }
 
-  @Test
-  def testYammerMetricsCountMetric(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testYammerMetricsCountMetric(quorum: String): Unit = {
     // Check if yammer-metrics-count metric exists.
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=yammer-metrics-count"), 1)
   }
 
-  @Test
-  def testLinuxIoMetrics(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLinuxIoMetrics(quorum: String): Unit = {
     // Check if linux-disk-{read,write}-bytes metrics either do or do not exist depending on whether we are or are not
     // able to collect those metrics on the platform where this test is running.
     val usable = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying).usable()
@@ -104,8 +114,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
       assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=$name"), expectedCount))
   }
 
-  @Test
-  def testJMXFilter(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testJMXFilter(quorum: String): Unit = {
     // Check if cluster id metrics is not exposed in JMX
     assertTrue(ManagementFactory.getPlatformMBeanServer
                  .isRegistered(new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")))
@@ -113,10 +124,11 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
                   .isRegistered(new ObjectName(s"$requiredKafkaServerPrefix=ClusterId")))
   }
 
-  @Test
-  def testUpdateJMXFilter(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUpdateJMXFilter(quorum: String): Unit = {
     // verify previously exposed metrics are removed and existing matching metrics are added
-    servers.foreach(server => server.kafkaYammerMetrics.reconfigure(
+    brokers.foreach(broker => broker.kafkaYammerMetrics.reconfigure(
       Map(JmxReporter.EXCLUDE_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
     ))
     assertFalse(ManagementFactory.getPlatformMBeanServer
@@ -125,22 +137,24 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
                   .isRegistered(new ObjectName(s"$requiredKafkaServerPrefix=ClusterId")))
   }
 
-  @Test
-  def testGeneralBrokerTopicMetricsAreGreedilyRegistered(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGeneralBrokerTopicMetricsAreGreedilyRegistered(quorum: String): Unit = {
     val topic = "test-broker-topic-metric"
     createTopic(topic, 2, 1)
 
     // The broker metrics for all topics should be greedily registered
     assertTrue(topicMetrics(None).nonEmpty, "General topic metrics don't exist")
-    assertEquals(servers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size)
+    assertEquals(brokers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size)
     // topic metrics should be lazily registered
     assertTrue(topicMetricGroups(topic).isEmpty, "Topic metrics aren't lazily registered")
-    TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+    TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
     assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics aren't registered")
   }
 
-  @Test
-  def testWindowsStyleTagNames(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWindowsStyleTagNames(quorum: String): Unit = {
     val path = "C:\\windows-path\\kafka-logs"
     val tags = Map("dir" -> path)
     val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
@@ -148,8 +162,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assert(metric.getMBeanName.endsWith(expectedMBeanName))
   }
 
-  @Test
-  def testBrokerTopicMetricsBytesInOut(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testBrokerTopicMetricsBytesInOut(quorum: String): Unit = {
     val topic = "test-bytes-in-out"
     val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
     val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
@@ -160,20 +175,20 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
     createTopic(topic, 1, numNodes, topicConfig)
     // Produce a few messages to create the metrics
-    TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+    TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
 
     // Check the log size for each broker so that we can distinguish between failures caused by replication issues
     // versus failures caused by the metrics
     val topicPartition = new TopicPartition(topic, 0)
-    servers.foreach { server =>
-      val log = server.logManager.getLog(new TopicPartition(topic, 0))
-      val brokerId = server.config.brokerId
+    brokers.foreach { broker =>
+      val log = broker.logManager.getLog(new TopicPartition(topic, 0))
+      val brokerId = broker.config.brokerId
       val logSize = log.map(_.size)
       assertTrue(logSize.exists(_ > 0), s"Expected broker $brokerId to have a Log for $topicPartition with positive size, actual: $logSize")
     }
 
     // Consume messages to make bytesOut tick
-    TestUtils.consumeTopicRecords(servers, topic, nMessages)
+    TestUtils.consumeTopicRecords(brokers, topic, nMessages)
     val initialReplicationBytesIn = TestUtils.meterCount(replicationBytesIn)
     val initialReplicationBytesOut = TestUtils.meterCount(replicationBytesOut)
     val initialBytesIn = TestUtils.meterCount(bytesIn)
@@ -183,20 +198,21 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertEquals(initialBytesOut, TestUtils.meterCount(bytesOut))
 
     // Produce a few messages to make the metrics tick
-    TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+    TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
 
     assertTrue(TestUtils.meterCount(replicationBytesIn) > initialReplicationBytesIn)
     assertTrue(TestUtils.meterCount(replicationBytesOut) > initialReplicationBytesOut)
     assertTrue(TestUtils.meterCount(bytesIn) > initialBytesIn)
 
     // Consume messages to make bytesOut tick
-    TestUtils.consumeTopicRecords(servers, topic, nMessages)
+    TestUtils.consumeTopicRecords(brokers, topic, nMessages)
 
     assertTrue(TestUtils.meterCount(bytesOut) > initialBytesOut)
   }
 
-  @Test
-  def testControllerMetrics(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testZkControllerMetrics(quorum: String): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
 
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
@@ -216,13 +232,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
    * Test that the metrics are created with the right name, testZooKeeperStateChangeRateMetrics
    * and testZooKeeperSessionStateMetric in ZooKeeperClientTest test the metrics behaviour.
    */
-  @Test
-  def testSessionExpireListenerMetrics(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSessionExpireListenerMetrics(quorum: String): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"), 1)
+    val expectedNumMetrics = if (isKRaftTest()) 0 else 1
+    assertEquals(expectedNumMetrics, metrics.keySet.asScala.
+      count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState"))
+    assertEquals(expectedNumMetrics, metrics.keySet.asScala.
+      count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"))
+    assertEquals(expectedNumMetrics, metrics.keySet.asScala.
+      count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"))
   }
 
   private def topicMetrics(topic: Option[String]): Set[String] = {
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 74803ce..3bbce4d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -19,13 +19,14 @@ package kafka.security.auth
 
 import java.nio.charset.StandardCharsets
 import kafka.admin.ZkSecurityMigrator
+import kafka.server.QuorumTestHarness
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk._
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.util.{Failure, Success, Try}
 import javax.security.auth.login.Configuration
@@ -40,16 +41,16 @@ import org.apache.zookeeper.client.ZKClientConfig
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
+class ZkAuthorizationTest extends QuorumTestHarness with Logging {
   val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
   val authProvider = "zookeeper.authProvider.1"
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
     Configuration.setConfiguration(null)
     System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index a49b0d3..9011eb6 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -25,9 +25,9 @@ import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 import kafka.Kafka
 import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
 import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
-import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
+import kafka.zk.ZkAclStore
 import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.acl.AclOperation._
@@ -44,12 +44,12 @@ import org.apache.kafka.server.authorizer._
 import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
 import org.apache.zookeeper.client.ZKClientConfig
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
-class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
+class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
 
   private val allowReadAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, READ, ALLOW)
   private val allowWriteAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW)
@@ -70,8 +70,8 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
   override def authorizer: Authorizer = aclAuthorizer
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     // Increase maxUpdateRetries to avoid transient failures
     aclAuthorizer.maxUpdateRetries = Int.MaxValue
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
index 15ea42d..e38cafd 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
@@ -25,10 +25,10 @@ import javax.security.auth.Subject
 import javax.security.auth.callback.CallbackHandler
 import kafka.api.SaslSetup
 import kafka.security.authorizer.AclEntry.WildcardHost
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, QuorumTestHarness}
 import kafka.utils.JaasTestUtils.{JaasModule, JaasSection}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.KafkaZkClient
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.acl.AclOperation.{READ, WRITE}
@@ -43,12 +43,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.zookeeper.server.auth.DigestLoginModule
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
+class AclAuthorizerWithZkSaslTest extends QuorumTestHarness with SaslSetup {
 
   private val aclAuthorizer = new AclAuthorizer
   private val aclAuthorizer2 = new AclAuthorizer
@@ -60,7 +60,7 @@ class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
   private var config: KafkaConfig = _
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     // Allow failed clients to avoid server closing the connection before reporting AuthFailed.
     System.setProperty("zookeeper.allowSaslFailedClients", "true")
 
@@ -76,7 +76,7 @@ class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
     aclAuthorizer.maxUpdateRetries = Int.MaxValue
     aclAuthorizer2.maxUpdateRetries = Int.MaxValue
 
-    super.setUp()
+    super.setUp(testInfo)
     config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))
 
     aclAuthorizer.configure(config.originals)
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
index bccd58a..6852ab3 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
@@ -20,24 +20,24 @@ import java.util.concurrent.CompletionStage
 import java.{lang, util}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.authorizer._
 import org.apache.zookeeper.client.ZKClientConfig
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
-class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
+class AuthorizerInterfaceDefaultTest extends QuorumTestHarness with BaseAuthorizerTest {
 
   private val interfaceDefaultAuthorizer = new DelegateAuthorizer
 
   override def authorizer: Authorizer = interfaceDefaultAuthorizer
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     // Increase maxUpdateRetries to avoid transient failures
     interfaceDefaultAuthorizer.authorizer.maxUpdateRetries = Int.MaxValue
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index 16de1df..523b6a7 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -24,9 +24,9 @@ import java.util.{Base64, Properties}
 import kafka.network.RequestChannel.Session
 import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils}
 import kafka.security.authorizer.AclEntry.WildcardHost
-import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig}
+import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
-import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclOperation}
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType.ALLOW
@@ -41,12 +41,12 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
 import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time}
 import org.apache.kafka.server.authorizer._
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.Buffer
 
-class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
+class DelegationTokenManagerTest extends QuorumTestHarness  {
 
   val time = new MockTime()
   val owner = SecurityUtils.parseKafkaPrincipal("User:owner")
@@ -64,8 +64,8 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
   var expiryTimeStamp: Long = 0
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     props = TestUtils.createBrokerConfig(0, zkConnect, enableToken = true)
     props.put(KafkaConfig.SaslEnabledMechanismsProp, ScramMechanism.mechanismNames().asScala.mkString(","))
     props.put(KafkaConfig.DelegationTokenSecretKeyProp, secretKey)
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
index b4140d0..0a98d26 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -35,8 +35,8 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
     properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     createTopic(topic1, numPartitions, servers.size, new Properties())
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index db01370..3f74863 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -19,13 +19,13 @@ package kafka.server
 
 import org.junit.jupiter.api.Assertions._
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.jupiter.api.{AfterEach, Test}
 
 import scala.collection.mutable.ArrayBuffer
 
-class AdvertiseBrokerTest extends ZooKeeperTestHarness {
+class AdvertiseBrokerTest extends QuorumTestHarness {
   val servers = ArrayBuffer[KafkaServer]()
 
   val brokerId = 0
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index 46be884..0e1b148 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -24,7 +24,7 @@ import kafka.cluster.Broker
 import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.createTopic
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
@@ -36,19 +36,19 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
-class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
+class BrokerEpochIntegrationTest extends QuorumTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val configs = Seq(
       TestUtils.createBrokerConfig(brokerId1, zkConnect),
       TestUtils.createBrokerConfig(brokerId2, zkConnect))
diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 20e89ed..c05044e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -45,8 +45,7 @@ import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -106,8 +105,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     // Define a quota for ThrottledPrincipal
     defineUserQuota(ThrottledPrincipal.getName, Some(ControllerMutationRate))
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 2addda2..1d355ce 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -21,7 +21,7 @@ import java.util
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions.assertThrows
 
 import scala.concurrent.ExecutionException
@@ -32,8 +32,8 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
   override def brokerCount = 1
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
   }
 
   def createAdminConfig: util.Map[String, Object] = {
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index a6a09ee..4f304bd 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.errors.InvalidPrincipalTypeException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.SecurityUtils
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import java.util
 import scala.concurrent.ExecutionException
@@ -40,9 +40,9 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
   override def brokerCount = 1
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   override def generateConfigs = {
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index ba621f2..e1e5c30 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.errors.DelegationTokenDisabledException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.jupiter.api.Assertions.assertThrows
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import java.util
 import scala.concurrent.ExecutionException
@@ -38,9 +38,9 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
   override def brokerCount = 1
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
+    super.setUp(testInfo)
   }
 
   def createAdminConfig: util.Map[String, Object] = {
diff --git a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index 5b2a16e..222ff2d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.lang.{Byte => JByte}
 import java.util.Properties
+
 import kafka.network.SocketServer
 import kafka.security.authorizer.AclEntry
 import org.apache.kafka.common.message.{DescribeClusterRequestData, DescribeClusterResponseData}
@@ -27,7 +28,7 @@ import org.apache.kafka.common.requests.{DescribeClusterRequest, DescribeCluster
 import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -40,8 +41,8 @@ class DescribeClusterRequestTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = false)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = false)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 3bcb626..1fb9f33 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -18,13 +18,13 @@ package kafka.server
 
 import kafka.admin.AdminOperationException
 import kafka.utils.CoreUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.config._
 import org.apache.kafka.common.config.internals.QuotaConfigs
 import org.junit.jupiter.api.Assertions.assertThrows
 import org.junit.jupiter.api.Test
 
-class DynamicConfigTest extends ZooKeeperTestHarness {
+class DynamicConfigTest extends QuorumTestHarness {
   private final val nonExistentConfig: String = "some.config.that.does.not.exist"
   private final val someValue: String = "some interesting value"
 
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index b21666e..7038678a 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -35,8 +35,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
   override def brokerCount: Int = 1
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     initProducer()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
index f8166dd..db57ec9 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import java.util.{Optional, Properties}
 import scala.jdk.CollectionConverters._
@@ -58,8 +58,8 @@ class FetchRequestMaxBytesTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
index d7de098..d59474e 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
@@ -19,7 +19,8 @@ package kafka.server
 
 import java.util.concurrent.{CountDownLatch, TimeoutException}
 
-import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, ZooKeeperTestHarness}
+import kafka.server.QuorumTestHarness
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
@@ -29,7 +30,7 @@ import org.junit.jupiter.api.Test
 
 import scala.jdk.CollectionConverters._
 
-class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {
+class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
 
   private def createBrokerFeatures(): BrokerFeatures = {
     val supportedFeaturesMap = Map[String, SupportedVersionRange](
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index cd14d3c..26f6a12 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -20,11 +20,11 @@ import java.util.concurrent.atomic.AtomicReference
 
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils.{CoreUtils, TestUtils, VerifiableProperties}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
 import org.apache.kafka.test.MockMetricsReporter
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
 object KafkaMetricReporterClusterIdTest {
@@ -75,13 +75,13 @@ object KafkaMetricReporterClusterIdTest {
   }
 }
 
-class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
+class KafkaMetricReporterClusterIdTest extends QuorumTestHarness {
   var server: KafkaServer = null
   var config: KafkaConfig = null
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val props = TestUtils.createBrokerConfig(1, zkConnect)
     props.setProperty(KafkaConfig.KafkaMetricsReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
     props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
index f480bc5..c84a91b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.metrics.KafkaMetric
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.protocol.Errors
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.kafka.common.message.ListGroupsRequestData
@@ -44,8 +44,8 @@ class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     // need a quota prop to register a "throttle-time" metrics after server startup
     val quotaProps = new Properties()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 8a3d7cf..7e5d791 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -21,10 +21,10 @@ import java.util
 import java.util.concurrent.atomic.AtomicReference
 
 import kafka.utils.{CoreUtils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
 import org.junit.jupiter.api.Assertions.{assertEquals}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
 
 
@@ -59,13 +59,13 @@ object KafkaMetricsReporterTest {
   }
 }
 
-class KafkaMetricsReporterTest extends ZooKeeperTestHarness {
+class KafkaMetricsReporterTest extends QuorumTestHarness {
   var server: KafkaServer = null
   var config: KafkaConfig = null
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val props = TestUtils.createBrokerConfig(1, zkConnect)
     props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
     props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 8056e23..6ab6930 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -19,13 +19,13 @@ package kafka.server
 
 import kafka.api.ApiVersion
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, fail}
 import org.junit.jupiter.api.Test
 
 import java.util.Properties
 
-class KafkaServerTest extends ZooKeeperTestHarness {
+class KafkaServerTest extends QuorumTestHarness {
 
   @Test
   def testAlreadyRegisteredAdvertisedListeners(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index e5e27b5..a1fb7cd 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -29,16 +29,16 @@ import kafka.utils.TestUtils
 import kafka.cluster.Broker
 import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
 import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
-class LeaderElectionTest extends ZooKeeperTestHarness {
+class LeaderElectionTest extends QuorumTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
 
@@ -47,8 +47,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
   var staleControllerEpochDetected = false
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown = false)
     val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown = false)
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6d6d881..1025d7a 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException}
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -50,8 +50,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
   this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     createTopic(topic, partitionNum, brokerCount)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 93c7094..30fc72d 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -22,17 +22,17 @@ import scala.collection.Seq
 
 import kafka.utils.TestUtils
 import TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import java.io.File
 
 import kafka.server.checkpoints.OffsetCheckpointFile
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
 
-class LogRecoveryTest extends ZooKeeperTestHarness {
+class LogRecoveryTest extends QuorumTestHarness {
 
   val replicaLagTimeMaxMs = 5000L
   val replicaLagMaxMessages = 10L
@@ -75,8 +75,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
 
     configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
 
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 42eebb3..d91d58e 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.test.TestUtils.isValidClusterId
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -36,8 +36,8 @@ import scala.jdk.CollectionConverters._
 class MetadataRequestTest extends AbstractMetadataRequestTest {
 
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = false)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = false)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
index 8409c96..3580e2b 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.MetadataRequest
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -30,8 +30,8 @@ import scala.jdk.CollectionConverters._
 class MetadataRequestWithForwardingTest extends AbstractMetadataRequestTest {
 
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = false)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = false)
   }
 
   override def enableForwarding: Boolean = true
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 5e821fb..477f3eb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -25,10 +25,10 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test}
-
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 import java.util
 import java.util.Collections.singletonList
+
 import scala.jdk.CollectionConverters._
 import java.util.{Optional, Properties}
 
@@ -74,8 +74,8 @@ class OffsetFetchRequestTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = false)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = false)
 
     TestUtils.createOffsetsTopic(zkClient, servers)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 963c9bf..4a5ab60 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -19,22 +19,22 @@ package kafka.server
 
 import scala.collection.Seq
 
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import kafka.zk.ZooKeeperTestHarness
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import kafka.server.QuorumTestHarness
 import kafka.utils.TestUtils
 import TestUtils._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.StringSerializer
 
-class ReplicaFetchTest extends ZooKeeperTestHarness  {
+class ReplicaFetchTest extends QuorumTestHarness  {
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val props = createBrokerConfigs(2, zkConnect)
     brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index cbd882d..4b7cae8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -25,7 +25,7 @@ import kafka.server.QuotaType._
 import kafka.utils.TestUtils._
 import kafka.utils.CoreUtils._
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.jupiter.api.Assertions._
@@ -42,7 +42,7 @@ import scala.jdk.CollectionConverters._
   *
   * Anything over 100MB/s tends to fail as this is the non-throttled replication rate
   */
-class ReplicationQuotasTest extends ZooKeeperTestHarness {
+class ReplicationQuotasTest extends QuorumTestHarness {
   def percentError(percent: Int, value: Long): Long = Math.round(value * percent / 100.0)
 
   val msg100KB = new Array[Byte](100000)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e0bb504..56359df 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -50,7 +50,7 @@ import org.apache.kafka.common._
 import org.apache.kafka.common.config.internals.QuotaConfigs
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
@@ -85,9 +85,9 @@ class RequestQuotaTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
-    super.setUp()
+    super.setUp(testInfo)
 
     createTopic(topic, numPartitions)
     leaderNode = servers.head
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index d6b8103..096debe 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -20,15 +20,15 @@ import java.util.Properties
 
 import scala.collection.Seq
 
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import kafka.utils.TestUtils
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
 import java.io.File
 
 import org.apache.zookeeper.KeeperException.NodeExistsException
 
-class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
+class ServerGenerateBrokerIdTest extends QuorumTestHarness {
   var props1: Properties = null
   var config1: KafkaConfig = null
   var props2: Properties = null
@@ -37,8 +37,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   var servers: Seq[KafkaServer] = Seq()
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     props1 = TestUtils.createBrokerConfig(-1, zkConnect)
     config1 = KafkaConfig.fromProps(props1)
     props2 = TestUtils.createBrokerConfig(0, zkConnect)
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 0e9c744..fd9b365 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -26,14 +26,14 @@ import ExecutionContext.Implicits._
 
 import kafka.common.{InconsistentBrokerMetadataException, InconsistentClusterIdException}
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
 
-class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
+class ServerGenerateClusterIdTest extends QuorumTestHarness {
   var config1: KafkaConfig = null
   var config2: KafkaConfig = null
   var config3: KafkaConfig = null
@@ -41,8 +41,8 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
   val brokerMetaPropsFile = "meta.properties"
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     config1 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, zkConnect))
     config2 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(2, zkConnect))
     config3 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(3, zkConnect))
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index a523bd7..013d084 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
 
@@ -40,14 +40,14 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 @Timeout(60)
-class ServerShutdownTest extends ZooKeeperTestHarness {
+class ServerShutdownTest extends QuorumTestHarness {
   var config: KafkaConfig = null
   val host = "localhost"
   val topic = "test"
@@ -55,8 +55,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
   val sent2 = List("more", "messages")
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     val props = TestUtils.createBrokerConfig(0, zkConnect)
     config = KafkaConfig.fromProps(props)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index bf85e99..e80084d 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -18,14 +18,14 @@
 package kafka.server
 
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.metadata.BrokerState
 import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 
-class ServerStartupTest extends ZooKeeperTestHarness {
+class ServerStartupTest extends QuorumTestHarness {
 
   private var server: KafkaServer = null
 
diff --git a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
index 5c38026..224474e 100644
--- a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, MetadataRequest, MetadataResponse}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -43,8 +43,8 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest {
   }
 
   @BeforeEach
-  override def setUp(): Unit = {
-    doSetup(createOffsetsTopic = false)
+  override def setUp(testInfo: TestInfo): Unit = {
+    doSetup(testInfo, createOffsetsTopic = false)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 33776cf..72d2866 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -26,14 +26,14 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.tools.DumpLogSegments
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.{ListBuffer => Buffer}
@@ -47,7 +47,7 @@ import scala.collection.Seq
   *
   * A test which validates the end to end workflow is also included.
   */
-class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness with Logging {
+class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
 
   // Set this to KAFKA_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
   val apiVersion = ApiVersion.latestVersion
@@ -59,8 +59,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
   var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = null
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 206cee6..2557d35 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -22,7 +22,7 @@ import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors._
@@ -43,7 +43,7 @@ import scala.jdk.CollectionConverters._
 import scala.collection.Map
 import scala.collection.mutable.ListBuffer
 
-class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
+class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
   var brokers: ListBuffer[KafkaServer] = ListBuffer()
   val topic1 = "foo"
   val topic2 = "bar"
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index c62224c..a610956 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -19,12 +19,13 @@ package kafka.utils
 
 import kafka.api.LeaderAndIsr
 import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.server.QuorumTestHarness
 import kafka.zk._
 import org.apache.kafka.common.TopicPartition
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
-class ReplicationUtilsTest extends ZooKeeperTestHarness {
+class ReplicationUtilsTest extends QuorumTestHarness {
   private val zkVersion = 1
   private val topic = "my-topic-test"
   private val partition = 0
@@ -34,8 +35,8 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
   private val isr = List(1, 2)
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     zkClient.makeSurePersistentPathExists(TopicZNode.path(topic))
     val topicPartition = new TopicPartition(topic, partition)
     val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, 1)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 378c0f5..def86cb 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -67,6 +67,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaFuture, TopicPartition}
+import org.apache.kafka.controller.QuorumController
 import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.apache.zookeeper.KeeperException.SessionExpiredException
@@ -2144,7 +2145,8 @@ object TestUtils extends Logging {
       KafkaProducer.NETWORK_THREAD_PREFIX,
       AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
       AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
-      ZooKeeperTestHarness.ZkClientEventThreadSuffix
+      QuorumTestHarness.ZkClientEventThreadSuffix,
+      QuorumController.CONTROLLER_THREAD_SUFFIX
     )
 
     def unexpectedThreads: Set[String] = {
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 253188c..5e9817b 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -23,11 +23,11 @@ import kafka.controller.ReplicaAssignment
 import kafka.log._
 import kafka.server.DynamicConfig.Broker._
 import kafka.server.KafkaConfig._
-import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
+import kafka.server.{ConfigType, KafkaConfig, KafkaServer, QuorumTestHarness}
 import kafka.utils.CoreUtils._
 import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
-import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
@@ -40,7 +40,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq, immutable}
 
-class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+class AdminZkClientTest extends QuorumTestHarness with Logging with RackAwareTest {
 
   var servers: Seq[KafkaServer] = Seq()
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 2088e5f..6be954d 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
-import kafka.server.{ConfigType, KafkaConfig}
+import kafka.server.{ConfigType, KafkaConfig, QuorumTestHarness}
 import kafka.utils.CoreUtils
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.network.ListenerName
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.token.delegation.TokenInformation
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 import org.apache.zookeeper.KeeperException.{Code, NoAuthException, NoNodeException, NodeExistsException}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -56,7 +56,7 @@ import org.apache.zookeeper.data.Stat
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-class KafkaZkClientTest extends ZooKeeperTestHarness {
+class KafkaZkClientTest extends QuorumTestHarness {
 
   private val group = "my-group"
   private val topic1 = "topic1"
@@ -73,8 +73,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   var expiredSessionZkClient: ExpiredKafkaZkClient = _
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
     zkClient.createControllerEpochRaw(1)
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
deleted file mode 100755
index a81cbb9..0000000
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.zk
-
-import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.client.ZKClientConfig
-import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
-
-@Tag("integration")
-abstract class ZooKeeperTestHarness extends Logging {
-
-  val zkConnectionTimeout = 10000
-  val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
-  val zkMaxInFlightRequests = Int.MaxValue
-
-  protected def zkAclsEnabled: Option[Boolean] = None
-
-  var zkClient: KafkaZkClient = null
-  var adminZkClient: AdminZkClient = null
-
-  var zookeeper: EmbeddedZookeeper = null
-
-  def zkPort: Int = zookeeper.port
-  def zkConnect: String = s"127.0.0.1:$zkPort"
-  
-  @BeforeEach
-  def setUp(): Unit = {
-    zookeeper = new EmbeddedZookeeper()
-    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "ZooKeeperTestHarness", new ZKClientConfig)
-    adminZkClient = new AdminZkClient(zkClient)
-  }
-
-  @AfterEach
-  def tearDown(): Unit = {
-    shutdownZooKeeper()
-    Configuration.setConfiguration(null)
-  }
-
-  def shutdownZooKeeper(): Unit = {
-    if (zkClient != null)
-      zkClient.close()
-    if (zookeeper != null)
-      CoreUtils.swallow(zookeeper.shutdown(), this)
-  }
-
-  // Trigger session expiry by reusing the session id in another client
-  def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
-    val dummyWatcher = new Watcher {
-      override def process(event: WatchedEvent): Unit = {}
-    }
-    val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
-      zooKeeper.getSessionId,
-      zooKeeper.getSessionPasswd)
-    assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
-    anotherZkClient
-  }
-}
-
-object ZooKeeperTestHarness {
-  val ZkClientEventThreadSuffix = "-EventThread"
-
-  /**
-   * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
-   * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
-   * which is true for core tests where this harness is used.
-   */
-  @BeforeAll
-  def setUpClass(): Unit = {
-    TestUtils.verifyNoUnexpectedThreads("@BeforeAll")
-  }
-
-  /**
-   * Verify that tests from the current test class using ZooKeeperTestHarness haven't left behind an unexpected thread
-   */
-  @AfterAll
-  def tearDownClass(): Unit = {
-    TestUtils.verifyNoUnexpectedThreads("@AfterAll")
-  }
-
-}
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 5cfb9ee..649f3c5 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -26,7 +26,7 @@ import com.yammer.metrics.core.{Gauge, Meter, MetricName}
 import kafka.server.KafkaConfig
 import kafka.metrics.KafkaYammerMetrics
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
@@ -35,21 +35,21 @@ import org.apache.zookeeper.ZooKeeper.States
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
 import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertThrows, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
-class ZooKeeperClientTest extends ZooKeeperTestHarness {
+class ZooKeeperClientTest extends QuorumTestHarness {
   private val mockPath = "/foo"
   private val time = Time.SYSTEM
 
   private var zooKeeperClient: ZooKeeperClient = _
 
   @BeforeEach
-  override def setUp(): Unit = {
+  override def setUp(testInfo: TestInfo): Unit = {
     TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
     cleanMetricsRegistry()
-    super.setUp()
+    super.setUp(testInfo)
     zooKeeperClient = newZooKeeperClient()
   }
 
@@ -88,9 +88,9 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   def testConnection(): Unit = {
     val client = newZooKeeperClient()
     try {
-      // Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients
+      // Verify ZooKeeper event thread name. This is used in QuorumTestHarness to verify that tests have closed ZK clients
       val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
-      assertTrue(threads.exists(_.contains(ZooKeeperTestHarness.ZkClientEventThreadSuffix)),
+      assertTrue(threads.exists(_.contains(QuorumTestHarness.ZkClientEventThreadSuffix)),
         s"ZooKeeperClient event thread not found, threads=$threads")
     } finally {
       client.close()
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 67d4eb0..1a1c5d0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -232,7 +232,7 @@ public final class QuorumController implements Controller {
             }
             KafkaEventQueue queue = null;
             try {
-                queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+                queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
                 return new QuorumController(logContext, nodeId, queue, time, configDefs,
                     raftClient, supportedFeatures, defaultReplicationFactor,
                     defaultNumPartitions, replicaPlacer, snapshotMaxNewRecordBytes,
@@ -245,6 +245,8 @@ public final class QuorumController implements Controller {
         }
     }
 
+    public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
+
     private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
         "The active controller appears to be node ";
 
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index 31dd43f..e1dd217 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import kafka.api.IntegrationTestHarness;
+import kafka.utils.EmptyTestInfo;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
@@ -60,7 +61,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
     public void initialize(Set<TopicIdPartition> topicIdPartitions,
                            boolean startConsumerThread) {
         // Call setup to start the cluster.
-        super.setUp();
+        super.setUp(new EmptyTestInfo());
 
         initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread);
     }