You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/18 16:39:48 UTC

[kafka] branch trunk updated: MINOR: Enable some AdminClient integration tests (#12110)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 67d00e25e9 MINOR: Enable some AdminClient integration tests (#12110)
67d00e25e9 is described below

commit 67d00e25e941f73be8b959c6732ac4db1d1083bf
Author: dengziming <de...@gmail.com>
AuthorDate: Thu May 19 00:39:26 2022 +0800

    MINOR: Enable some AdminClient integration tests (#12110)
    
    Enable KRaft in `AdminClientWithPoliciesIntegrationTest` and `PlaintextAdminIntegrationTest`. Some tests are not yet enabled, or do not yet behave as expected:
    
    - testNullConfigs, see KAFKA-13863
    - testDescribeCluster and testMetadataRefresh: currently we don't get the real controller in KRaft mode, so these tests may not run as expected
    
    This patch also changes the exception type raised from invalid `IncrementalAlterConfig` requests with the `SUBTRACT` and `APPEND` operations. When the configuration value type is not a list, we now raise `INVALID_CONFIG` instead of `INVALID_REQUEST`.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/ConfigAdminManager.scala    |   4 +-
 .../AdminClientWithPoliciesIntegrationTest.scala   |  16 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 558 ++++++++++++---------
 3 files changed, 324 insertions(+), 254 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index e7d6c33ab2..cc7a98179d 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -494,7 +494,7 @@ object ConfigAdminManager {
         case OpType.DELETE => configProps.remove(alterConfigOp.configEntry.name)
         case OpType.APPEND => {
           if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
+            throw new InvalidConfigurationException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
             .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
             .getOrElse("")
@@ -505,7 +505,7 @@ object ConfigAdminManager {
         }
         case OpType.SUBTRACT => {
           if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
+            throw new InvalidConfigurationException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
             .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
             .getOrElse("")
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index fb1b0d248d..c9d40cadb0 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -15,18 +15,18 @@ package kafka.api
 
 import java.util
 import java.util.Properties
-import java.util.concurrent.ExecutionException
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
 import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -143,10 +143,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     ).asJava)
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
 
     // Verify that the second resource was updated and the others were not
     ensureConsistentKRaftMetadata()
@@ -172,10 +172,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
     ensureConsistentKRaftMetadata()
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 543b3b80cd..46cf3c9c4f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 import org.slf4j.LoggerFactory
@@ -87,15 +87,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     super.tearDown()
   }
 
-  @Test
-  def testClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClose(quorum: String): Unit = {
     val client = Admin.create(createConfig)
     client.close()
     client.close() // double close has no effect
   }
 
-  @Test
-  def testListNodes(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListNodes(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val brokerStrs = bootstrapServers().split(",").toList.sorted
     var nodeStrs: List[String] = null
@@ -106,8 +108,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
   }
 
-  @Test
-  def testAdminClientHandlingBadIPWithoutTimeout(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "1000")
     val returnBadAddressFirst = new HostResolver {
@@ -120,8 +123,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     client.describeCluster().nodes().get()
   }
 
-  @Test
-  def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateExistingTopicsThrowTopicExistsException(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "mytopic"
     val topics = Seq(topic)
@@ -130,14 +134,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     client.createTopics(newTopics.asJava).all.get()
     waitForTopics(client, topics, List())
 
-    val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size + 1).toShort))
+    val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (brokers.size + 1).toShort))
     val e = assertThrows(classOf[ExecutionException],
       () => client.createTopics(newTopicsWithInvalidRF.asJava, new CreateTopicsOptions().validateOnly(true)).all.get())
     assertTrue(e.getCause.isInstanceOf[TopicExistsException])
   }
 
-  @Test
-  def testDeleteTopicsWithIds(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicsWithIds(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topics = Seq("mytopic", "mytopic2", "mytopic3")
     val newTopics = Seq(
@@ -154,15 +159,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     waitForTopics(client, List(), topics)
   }
 
-  @Test
-  def testMetadataRefresh(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // KRaft mode will be supported in KAFKA-13910
+  def testMetadataRefresh(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topics = Seq("mytopic")
     val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort))
     client.createTopics(newTopics.asJava).all.get()
     waitForTopics(client, expectedPresent = topics, expectedMissing = List())
 
-    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+    val controller = brokers.find(_.config.brokerId == brokers.flatMap(_.metadataCache.getControllerId).head).get
     controller.shutdown()
     controller.awaitShutdown()
     val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
@@ -172,8 +178,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   /**
     * describe should not auto create topics
     */
-  @Test
-  def testDescribeNonExistingTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeNonExistingTopic(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val existingTopic = "existing-topic"
@@ -183,18 +190,23 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val nonExistingTopic = "non-existing"
     val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).topicNameValues()
     assertEquals(existingTopic, results.get(existingTopic).get.name)
-    assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
-    assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+    assertFutureExceptionTypeEquals(results.get(nonExistingTopic), classOf[UnknownTopicOrPartitionException])
+    if (!isKRaftTest()) {
+      assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+    }
   }
 
-  @Test
-  def testDescribeTopicsWithIds(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTopicsWithIds(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val existingTopic = "existing-topic"
     client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1.toShort)).asJava).all.get()
     waitForTopics(client, Seq(existingTopic), List())
-    val existingTopicId = zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head
+    ensureConsistentKRaftMetadata()
+
+    val existingTopicId = brokers.head.metadataCache.getTopicId(existingTopic)
 
     val nonExistingTopicId = Uuid.randomUuid()
 
@@ -203,37 +215,48 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException]
   }
 
-  @Test
-  def testDescribeCluster(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeCluster(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val result = client.describeCluster
     val nodes = result.nodes.get()
     val clusterId = result.clusterId().get()
-    assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
+    assertEquals(brokers.head.dataPlaneRequestProcessor.clusterId, clusterId)
     val controller = result.controller().get()
-    assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
-      getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
-    val brokers = bootstrapServers().split(",")
-    assertEquals(brokers.size, nodes.size)
+
+    if (isKRaftTest()) {
+      // In KRaft, we return a random brokerId as the current controller.
+      val brokerIds = brokers.map(_.config.brokerId).toSet
+      assertTrue(brokerIds.contains(controller.id))
+    } else {
+      assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
+        getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id)
+    }
+
+    val brokerEndpoints = bootstrapServers().split(",")
+    assertEquals(brokerEndpoints.size, nodes.size)
     for (node <- nodes.asScala) {
       val hostStr = s"${node.host}:${node.port}"
-      assertTrue(brokers.contains(hostStr), s"Unknown host:port pair $hostStr in brokerVersionInfos")
+      assertTrue(brokerEndpoints.contains(hostStr), s"Unknown host:port pair $hostStr in brokerVersionInfos")
     }
   }
 
-  @Test
-  def testDescribeLogDirs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeLogDirs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "topic"
     val leaderByPartition = createTopic(topic, numPartitions = 10)
     val partitionsByBroker = leaderByPartition.groupBy { case (_, leaderId) => leaderId }.map { case (k, v) =>
       k -> v.keys.toSeq
     }
-    val brokers = (0 until brokerCount).map(Integer.valueOf)
-    val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).allDescriptions.get
+    ensureConsistentKRaftMetadata()
+    val brokerIds = (0 until brokerCount).map(Integer.valueOf)
+    val logDirInfosByBroker = client.describeLogDirs(brokerIds.asJava).allDescriptions.get
 
     (0 until brokerCount).foreach { brokerId =>
-      val server = servers.find(_.config.brokerId == brokerId).get
+      val server = brokers.find(_.config.brokerId == brokerId).get
       val expectedPartitions = partitionsByBroker(brokerId)
       val logDirInfos = logDirInfosByBroker.get(brokerId)
       val replicaInfos = logDirInfos.asScala.flatMap { case (_, logDirInfo) =>
@@ -249,36 +272,39 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testDescribeReplicaLogDirs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeReplicaLogDirs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "topic"
     val leaderByPartition = createTopic(topic, numPartitions = 10)
     val replicas = leaderByPartition.map { case (partition, brokerId) =>
       new TopicPartitionReplica(topic, partition, brokerId)
     }.toSeq
+    ensureConsistentKRaftMetadata()
 
     val replicaDirInfos = client.describeReplicaLogDirs(replicas.asJavaCollection).all.get
     replicaDirInfos.forEach { (topicPartitionReplica, replicaDirInfo) =>
-      val server = servers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get
+      val server = brokers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get
       val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
       assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir)
     }
   }
 
-  @Test
-  def testAlterReplicaLogDirs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterReplicaLogDirs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "topic"
     val tp = new TopicPartition(topic, 0)
-    val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap
+    val randomNums = brokers.map(server => server -> Random.nextInt(2)).toMap
 
     // Generate two mutually exclusive replicaAssignment
-    val firstReplicaAssignment = servers.map { server =>
+    val firstReplicaAssignment = brokers.map { server =>
       val logDir = new File(server.config.logDirs(randomNums(server))).getAbsolutePath
       new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
     }.toMap
-    val secondReplicaAssignment = servers.map { server =>
+    val secondReplicaAssignment = brokers.map { server =>
       val logDir = new File(server.config.logDirs(1 - randomNums(server))).getAbsolutePath
       new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
     }.toMap
@@ -292,14 +318,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
 
     createTopic(topic, replicationFactor = brokerCount)
-    servers.foreach { server =>
+    ensureConsistentKRaftMetadata()
+    brokers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
     }
 
     // Verify that replica can be moved to the specified log directory after the topic has been created
     client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
-    servers.foreach { server =>
+    brokers.foreach { server =>
       TestUtils.waitUntilTrue(() => {
         val logDir = server.logManager.getLog(tp).get.dir.getParent
         secondReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
@@ -332,7 +359,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     try {
       TestUtils.waitUntilTrue(() => numMessages.get > 10, s"only $numMessages messages are produced before timeout. Producer future ${producerFuture.value}")
       client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
-      servers.foreach { server =>
+      brokers.foreach { server =>
         TestUtils.waitUntilTrue(() => {
           val logDir = server.logManager.getLog(tp).get.dir.getParent
           firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
@@ -347,7 +374,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
 
     // Verify that all messages that are produced can be consumed
-    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages,
+    val consumerRecords = TestUtils.consumeTopicRecords(brokers, topic, finalNumMessages,
       securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
     consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
       assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value))
@@ -697,8 +724,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testSeekAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSeekAfterDeleteRecords(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -726,8 +754,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(10L, consumer.position(topicPartition))
   }
 
-  @Test
-  def testLogStartOffsetCheckpoint(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLogStartOffsetCheckpoint(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -765,8 +794,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
   }
 
-  @Test
-  def testLogStartOffsetAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -782,25 +812,26 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(3L, lowWatermark)
 
     for (i <- 0 until brokerCount)
-      assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+      assertEquals(3, brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
   }
 
-  @Test
-  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
     val leaders = createTopic(topic, replicationFactor = brokerCount)
-    val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+    val followerIndex = if (leaders(0) != brokers(0).config.brokerId) 0 else 1
 
     def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
-      TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.localLog(topicPartition) != None,
+      TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition) != None,
                               "Expected follower to create replica for partition")
 
       // wait until the follower discovers that log start offset moved beyond its HW
       TestUtils.waitUntilTrue(() => {
-        servers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset == expectedStartOffset
+        brokers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset == expectedStartOffset
       }, s"Expected follower to discover new log start offset $expectedStartOffset")
 
       TestUtils.waitUntilTrue(() => {
-        servers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset == expectedEndOffset
+        brokers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset == expectedEndOffset
       }, s"Expected follower to catch up to log end offset $expectedEndOffset")
     }
 
@@ -821,7 +852,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
     // after the new replica caught up, all replicas should have same log start offset
     for (i <- 0 until brokerCount)
-      assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+      assertEquals(3, brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
 
     // kill the same follower again, produce more records, and delete records beyond follower's LOE
     killBroker(followerIndex)
@@ -832,8 +863,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
   }
 
-  @Test
-  def testAlterLogDirsAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterLogDirsAfterDeleteRecords(quorum: String): Unit = {
     client = Admin.create(createConfig)
     createTopic(topic, replicationFactor = brokerCount)
     val expectedLEO = 100
@@ -845,27 +877,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     result.all().get()
     // make sure we are in the expected state after delete records
     for (i <- 0 until brokerCount) {
-      assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
-      assertEquals(expectedLEO, servers(i).replicaManager.localLog(topicPartition).get.logEndOffset)
+      assertEquals(3, brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+      assertEquals(expectedLEO, brokers(i).replicaManager.localLog(topicPartition).get.logEndOffset)
     }
 
     // we will create another dir just for one server
-    val futureLogDir = servers(0).config.logDirs(1)
-    val futureReplica = new TopicPartitionReplica(topic, 0, servers(0).config.brokerId)
+    val futureLogDir = brokers(0).config.logDirs(1)
+    val futureReplica = new TopicPartitionReplica(topic, 0, brokers(0).config.brokerId)
 
     // Verify that replica can be moved to the specified log directory
     client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get
     TestUtils.waitUntilTrue(() => {
-      futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent
+      futureLogDir == brokers(0).logManager.getLog(topicPartition).get.dir.getParent
     }, "timed out waiting for replica movement")
 
     // once replica moved, its LSO and LEO should match other replicas
-    assertEquals(3, servers.head.replicaManager.localLog(topicPartition).get.logStartOffset)
-    assertEquals(expectedLEO, servers.head.replicaManager.localLog(topicPartition).get.logEndOffset)
+    assertEquals(3, brokers.head.replicaManager.localLog(topicPartition).get.logStartOffset)
+    assertEquals(expectedLEO, brokers.head.replicaManager.localLog(topicPartition).get.logEndOffset)
   }
 
-  @Test
-  def testOffsetsForTimesAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testOffsetsForTimesAfterDeleteRecords(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -886,8 +919,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition))
   }
 
-  @Test
-  def testConsumeAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsumeAfterDeleteRecords(quorum: String): Unit = {
     val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
@@ -909,8 +943,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     TestUtils.consumeRecords(consumer, 2)
   }
 
-  @Test
-  def testDeleteRecordsWithException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteRecordsWithException(quorum: String): Unit = {
     val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
@@ -934,8 +969,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
   }
 
-  @Test
-  def testDescribeConfigsForTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeConfigsForTopic(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
     client = Admin.create(createConfig)
 
@@ -982,8 +1018,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
    * Also see [[kafka.api.SaslSslAdminIntegrationTest.testAclOperations()]] for tests of ACL operations
    * when the authorizer is enabled.
    */
-  @Test
-  def testAclOperations(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperations(quorum: String): Unit = {
     val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
     client = Admin.create(createConfig)
@@ -998,8 +1035,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     * Test closing the AdminClient with a generous timeout.  Calls in progress should be completed,
     * since they can be done within the timeout.  New calls should receive timeouts.
     */
-  @Test
-  def testDelayedClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDelayedClose(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topics = Seq("mytopic", "mytopic2")
     val newTopics = topics.map(new NewTopic(_, 1, 1.toShort))
@@ -1015,8 +1053,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long
     * timeouts in progress.  The calls should be aborted after the hard shutdown timeout elapses.
     */
-  @Test
-  def testForceClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testForceClose(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
     client = Admin.create(config)
@@ -1032,8 +1071,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     * Check that a call with a timeout does not complete before the minimum timeout has elapsed,
     * even when the default request timeout is shorter.
     */
-  @Test
-  def testMinimumRequestTimeouts(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMinimumRequestTimeouts(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
     config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
@@ -1049,8 +1089,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   /**
     * Test injecting timeouts for calls that are in flight.
     */
-  @Test
-  def testCallInFlightTimeouts(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCallInFlightTimeouts(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
     config.put(AdminClientConfig.RETRIES_CONFIG, "0")
@@ -1068,8 +1109,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   /**
    * Test the consumer group APIs.
    */
-  @Test
-  def testConsumerGroups(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsumerGroups(quorum: String): Unit = {
     val config = createConfig
     client = Admin.create(config)
     try {
@@ -1287,8 +1329,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testDeleteConsumerGroupOffsets(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteConsumerGroupOffsets(quorum: String): Unit = {
     val config = createConfig
     client = Admin.create(config)
     try {
@@ -1359,8 +1402,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testElectPreferredLeaders(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectPreferredLeaders(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val prefer0 = Seq(0, 1, 2)
@@ -1368,10 +1412,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val prefer2 = Seq(2, 0, 1)
 
     val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0)
-    TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0), servers)
+    createTopicWithAssignment(partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0))
 
     val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
-    TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers)
+    createTopicWithAssignment(partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0))
 
     def preferredLeader(topicPartition: TopicPartition): Int = {
       val partitionMetadata = getTopicMetadata(client, topicPartition.topic).partitions.get(topicPartition.partition)
@@ -1380,19 +1424,18 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
 
     /** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */
-    def changePreferredLeader(newAssignment: Seq[Int]) = {
+    def changePreferredLeader(newAssignment: Seq[Int]): Unit = {
       val preferred = newAssignment.head
-      val prior1 = zkClient.getLeaderForPartition(partition1).get
-      val prior2 = zkClient.getLeaderForPartition(partition2).get
-
-      var m = Map.empty[TopicPartition, Seq[Int]]
+      val prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id()
+      val prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()
 
+      var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
       if (prior1 != preferred)
-        m += partition1 -> newAssignment
+        m += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
       if (prior2 != preferred)
-        m += partition2 -> newAssignment
+        m += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
+      client.alterPartitionReassignments(m.asJava).all().get()
 
-      zkClient.createPartitionReassignment(m)
       TestUtils.waitUntilTrue(
         () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred,
         s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
@@ -1408,7 +1451,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
     // Noop election
     var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
-    var exception = electResult.partitions.get.get(partition1).get
+    val exception = electResult.partitions.get.get(partition1).get
     assertEquals(classOf[ElectionNotNeededException], exception.getClass)
     TestUtils.assertLeader(client, partition1, 0)
 
@@ -1437,13 +1480,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
     TestUtils.assertLeader(client, partition2, 1)
 
+    def assertUnknownTopicOrPartition(
+      topicPartition: TopicPartition,
+      result: ElectLeadersResult
+    ): Unit = {
+      val exception = result.partitions.get.get(topicPartition).get
+      assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
+      if (isKRaftTest()) {
+        assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage)
+      } else {
+        assertEquals("The partition does not exist.", exception.getMessage)
+      }
+    }
+
     // unknown topic
     val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition).asJava)
     assertEquals(Set(unknownPartition).asJava, electResult.partitions.get.keySet)
-    exception = electResult.partitions.get.get(unknownPartition).get
-    assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
-    assertEquals("The partition does not exist.", exception.getMessage)
+    assertUnknownTopicOrPartition(unknownPartition, electResult)
     TestUtils.assertLeader(client, partition1, 1)
     TestUtils.assertLeader(client, partition2, 1)
 
@@ -1455,9 +1509,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet)
     TestUtils.assertLeader(client, partition1, 2)
     TestUtils.assertLeader(client, partition2, 1)
-    exception = electResult.partitions.get.get(unknownPartition).get
-    assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
-    assertEquals("The partition does not exist.", exception.getMessage)
+    assertUnknownTopicOrPartition(unknownPartition, electResult)
 
     // elect preferred leader for partition 2
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava)
@@ -1468,41 +1520,48 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
     // but shut it down...
-    servers(1).shutdown()
+    brokers(1).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
 
+    def assertPreferredLeaderNotAvailable(
+      topicPartition: TopicPartition,
+      result: ElectLeadersResult
+    ): Unit = {
+      val exception = result.partitions.get.get(topicPartition).get
+      assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
+      if (isKRaftTest()) {
+        assertTrue(exception.getMessage.contains(
+          "The preferred leader was not available."),
+          s"Unexpected message: ${exception.getMessage}")
+      } else {
+        assertTrue(exception.getMessage.contains(
+          s"Failed to elect leader for partition $topicPartition under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
+          s"Unexpected message: ${exception.getMessage}")
+      }
+    }
+
     // ... now what happens if we try to elect the preferred leader and it's down?
     val shortTimeout = new ElectLeadersOptions().timeoutMs(10000)
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava, shortTimeout)
     assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
-    exception = electResult.partitions.get.get(partition1).get
-    assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
-    assertTrue(exception.getMessage.contains(
-      "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
-      s"Wrong message ${exception.getMessage}")
+
+    assertPreferredLeaderNotAvailable(partition1, electResult)
     TestUtils.assertLeader(client, partition1, 2)
 
     // preferred leader unavailable with null argument
     electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout)
+    assertTrue(Set(partition1, partition2).subsetOf(electResult.partitions.get.keySet.asScala))
 
-    exception = electResult.partitions.get.get(partition1).get
-    assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
-    assertTrue(exception.getMessage.contains(
-      "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
-      s"Wrong message ${exception.getMessage}")
-
-    exception = electResult.partitions.get.get(partition2).get
-    assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
-    assertTrue(exception.getMessage.contains(
-      "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
-      s"Wrong message ${exception.getMessage}")
-
+    assertPreferredLeaderNotAvailable(partition1, electResult)
     TestUtils.assertLeader(client, partition1, 2)
+
+    assertPreferredLeaderNotAvailable(partition2, electResult)
     TestUtils.assertLeader(client, partition2, 2)
   }
 
-  @Test
-  def testElectUncleanLeadersForOnePartition(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForOnePartition(quorum: String): Unit = {
     // Case: unclean leader election with one topic partition
     client = Admin.create(createConfig)
 
@@ -1511,23 +1570,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val assignment1 = Seq(broker1, broker2)
 
     val partition1 = new TopicPartition("unclean-test-topic-1", 0)
-    TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1), servers)
+    createTopicWithAssignment(partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1))
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     TestUtils.assertLeader(client, partition1, broker2)
   }
 
-  @Test
-  def testElectUncleanLeadersForManyPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForManyPartitions(quorum: String): Unit = {
     // Case: unclean leader election with many topic partitions
     client = Admin.create(createConfig)
 
@@ -1540,22 +1600,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val partition1 = new TopicPartition(topic, 0)
     val partition2 = new TopicPartition(topic, 1)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
-      servers
+      Map(partition1.partition -> assignment1, partition2.partition -> assignment2)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertNoLeader(client, partition2)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1564,8 +1622,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition2, broker2)
   }
 
-  @Test
-  def testElectUncleanLeadersForAllPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForAllPartitions(quorum: String): Unit = {
     // Case: noop unclean leader election and valid unclean leader election for all partitions
     client = Admin.create(createConfig)
 
@@ -1579,22 +1638,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val partition1 = new TopicPartition(topic, 0)
     val partition2 = new TopicPartition(topic, 1)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
-      servers
+      Map(partition1.partition -> assignment1, partition2.partition -> assignment2)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1603,8 +1660,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition2, broker3)
   }
 
-  @Test
-  def testElectUncleanLeadersForUnknownPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForUnknownPartitions(quorum: String): Unit = {
     // Case: unclean leader election for unknown topic
     client = Admin.create(createConfig)
 
@@ -1616,11 +1674,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val unknownPartition = new TopicPartition(topic, 1)
     val unknownTopic = new TopicPartition("unknown-topic", 0)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(0 -> assignment1),
-      servers
+      Map(0 -> assignment1)
     )
 
     TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
@@ -1630,8 +1686,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertTrue(electResult.partitions.get.get(unknownTopic).get.isInstanceOf[UnknownTopicOrPartitionException])
   }
 
-  @Test
-  def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersWhenNoLiveBrokers(quorum: String): Unit = {
     // Case: unclean leader election with no live brokers
     client = Admin.create(createConfig)
 
@@ -1642,26 +1699,25 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val topic = "unclean-test-topic-1"
     val partition1 = new TopicPartition(topic, 0)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1),
-      servers
+      Map(partition1.partition -> assignment1)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
     assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
   }
 
-  @Test
-  def testElectUncleanLeadersNoop(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersNoop(quorum: String): Unit = {
     // Case: noop unclean leader election with explicit topic partitions
     client = Admin.create(createConfig)
 
@@ -1672,25 +1728,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val topic = "unclean-test-topic-1"
     val partition1 = new TopicPartition(topic, 0)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1),
-      servers
+      Map(partition1.partition -> assignment1)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertLeader(client, partition1, broker2)
-    servers(broker1).startup()
+    brokers(broker1).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
     assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[ElectionNotNeededException])
   }
 
-  @Test
-  def testElectUncleanLeadersAndNoop(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersAndNoop(quorum: String): Unit = {
     // Case: one noop unclean leader election and one valid unclean leader election
     client = Admin.create(createConfig)
 
@@ -1704,22 +1759,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val partition1 = new TopicPartition(topic, 0)
     val partition2 = new TopicPartition(topic, 1)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
-      servers
+      Map(partition1.partition -> assignment1, partition2.partition -> assignment2)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1728,8 +1781,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition2, broker3)
   }
 
-  @Test
-  def testListReassignmentsDoesNotShowNonReassigningPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListReassignmentsDoesNotShowNonReassigningPartitions(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -1744,8 +1798,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(0, allReassignmentsMap.size())
   }
 
-  @Test
-  def testListReassignmentsDoesNotShowDeletedPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListReassignmentsDoesNotShowDeletedPartitions(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val topic = "list-reassignments-no-reassignments"
@@ -1925,8 +1980,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(appendValues, configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value)
   }
 
-  @Test
-  def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
     client.incrementalAlterConfigs(Map(broker0Resource ->
@@ -1937,9 +1993,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap
       ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
         "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")))
     }, "Expected to see the broker properties we just set", pause=25)
@@ -1953,17 +2007,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap
       ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
         "654".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
         "987".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
     }, "Expected to see the broker properties we just modified", pause=25)
   }
 
-  @Test
-  def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncrementalAlterConfigsDeleteBrokerConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
     client.incrementalAlterConfigs(Map(broker0Resource ->
@@ -1976,9 +2029,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap
       ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
         "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
         "789".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
@@ -1993,17 +2044,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap
       ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
         "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
         "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
     }, "Expected to see the broker properties we just removed to be deleted", pause=25)
   }
 
-  @Test
-  def testInvalidIncrementalAlterConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidIncrementalAlterConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -2015,14 +2065,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
     createTopic(topic2)
 
-    //Add duplicate Keys for topic1
+    // Add duplicate Keys for topic1
     var topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.75"), AlterConfigOp.OpType.SET),
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.65"), AlterConfigOp.OpType.SET),
       new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.SET) // valid entry
     ).asJavaCollection
 
-    //Add valid config for topic2
+    // Add valid config for topic2
     var topic2AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET)
     ).asJavaCollection
@@ -2033,12 +2083,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     ).asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
 
-    //InvalidRequestException error for topic1
+    // InvalidRequestException error for topic1
     assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
       Some("Error due to duplicate config keys"))
 
-    //operation should succeed for topic2
+    // Operation should succeed for topic2
     alterResult.values().get(topic2Resource).get()
+    ensureConsistentKRaftMetadata()
 
     // Verify that topic1 config is not updated, and topic2 config is updated
     val describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
@@ -2046,10 +2097,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(2, configs.size)
 
     assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
-    assertEquals(Defaults.CompressionType.toString, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
+    assertEquals(Defaults.CompressionType, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
     assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
 
-    //check invalid use of append/subtract operation types
+    // Check invalid use of append/subtract operation types
     topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.APPEND)
     ).asJavaCollection
@@ -2064,14 +2115,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     ).asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
-      Some("Config value append is not allowed for config"))
-
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidRequestException],
-      Some("Config value subtract is not allowed for config"))
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
+      if (isKRaftTest()) {
+        Some("Can't APPEND to key compression.type because its type is not LIST.")
+      } else {
+        Some("Config value append is not allowed for config")
+      })
 
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidConfigurationException],
+      if (isKRaftTest()) {
+        Some("Can't SUBTRACT to key compression.type because its type is not LIST.")
+      } else {
+        Some("Config value subtract is not allowed for config")
+      })
 
-    //try to add invalid config
+    // Try to add invalid config
     topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), AlterConfigOp.OpType.SET)
     ).asJavaCollection
@@ -2082,11 +2140,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
 
     assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
-      Some("Invalid config value for resource"))
+      if (isKRaftTest()) {
+        Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1")
+      } else {
+        Some("Invalid config value for resource")
+      })
   }
 
-  @Test
-  def testInvalidAlterPartitionReassignments(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidAlterPartitionReassignments(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "alter-reassignments-topic-1"
     val tp1 = new TopicPartition(topic, 0)
@@ -2124,8 +2187,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException])
   }
 
-  @Test
-  def testLongTopicNames(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLongTopicNames(quorum: String): Unit = {
     val client = Admin.create(createConfig)
     val longTopicName = String.join("", Collections.nCopies(249, "x"));
     val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
@@ -2137,14 +2201,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertTrue(results.containsKey(invalidTopicName))
     assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException])
     assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
-      Map(new TopicPartitionReplica(longTopicName, 0, 0) -> servers(0).config.logDirs(0)).asJava).all(),
+      Map(new TopicPartitionReplica(longTopicName, 0, 0) -> brokers(0).config.logDirs(0)).asJava).all(),
       classOf[InvalidTopicException])
     client.close()
   }
 
   // Verify that createTopics and alterConfigs fail with null values
-  @Test
-  def testNullConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testNullConfigs(quorum: String): Unit = {
 
     def validateLogConfig(compressionType: String): Unit = {
       val logConfig = zkClient.getLogConfigs(Set(topic), Collections.emptyMap[String, AnyRef])._1(topic)
@@ -2182,8 +2247,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     validateLogConfig(compressionType = "producer")
   }
 
-  @Test
-  def testDescribeConfigsForLog4jLogLevels(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeConfigsForLog4jLogLevels(quorum: String): Unit = {
     client = Admin.create(createConfig)
     LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger")
     val loggerConfig = describeBrokerLoggers()
@@ -2198,9 +2264,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty)
   }
 
-  @Test
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
   @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = {
+  def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val initialLoggerConfig = describeBrokerLoggers()
@@ -2262,9 +2329,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     * 4. Change ROOT logger to ERROR
     * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level)
     */
-  @Test
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
   @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = {
+  def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = {
     client = Admin.create(createConfig)
     // step 1 - configure root logger
     val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
@@ -2304,9 +2372,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value())
   }
 
-  @Test
-  @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  @Disabled // Zk to be re-enabled once KAFKA-8779 is resolved
+  def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val deleteRootLoggerEntry = Seq(
       new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE)
@@ -2315,9 +2384,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException])
   }
 
-  @Test
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
   @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): Unit = {
+  def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val validLoggerName = "kafka.server.KafkaRequestHandler"
     val expectedValidLoggerLogLevel = describeBrokerLoggers().get(validLoggerName)
@@ -2359,9 +2429,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     * The AlterConfigs API is deprecated and should not support altering log levels
     */
   @nowarn("cat=deprecation")
-  @Test
-  @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft")) // Zk to be re-enabled once KAFKA-8779 is resolved
+  def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val alterLogLevelsEntries = Seq(
@@ -2591,9 +2661,9 @@ object PlaintextAdminIntegrationTest {
     ).asJava)
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
 
     // Verify that first and third resources were not updated and second was updated
     test.ensureConsistentKRaftMetadata()
@@ -2620,9 +2690,9 @@ object PlaintextAdminIntegrationTest {
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
     test.ensureConsistentKRaftMetadata()