You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2024/01/09 14:28:58 UTC

(kafka) branch trunk updated: KAFKA-15741: KRaft support in DescribeConsumerGroupTest (#14668)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bdad1631824 KAFKA-15741: KRaft support in DescribeConsumerGroupTest (#14668)
bdad1631824 is described below

commit bdad1631824035c8c31163ca6d054feff5421b29
Author: Zihao Lin <10...@163.com>
AuthorDate: Tue Jan 9 22:28:49 2024 +0800

    KAFKA-15741: KRaft support in DescribeConsumerGroupTest (#14668)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../kafka/admin/DescribeConsumerGroupTest.scala    | 237 ++++++++++++---------
 1 file changed, 137 insertions(+), 100 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index d8127746163..0e8c56442c0 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -17,13 +17,13 @@
 package kafka.admin
 
 import java.util.Properties
-
-import kafka.utils.{Exit, TestUtils}
+import kafka.utils.{Exit, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.concurrent.ExecutionException
 import scala.util.Random
@@ -35,9 +35,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
   private val describeTypeState = Array(Array("--state"))
   private val describeTypes = describeTypeOffsets ++ describeTypeMembers ++ describeTypeState
 
-  @Test
-  def testDescribeNonExistingGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeNonExistingGroup(quorum: String): Unit = {
+    createOffsetsTopic()
+
     val missingGroup = "missing.group"
 
     for (describeType <- describeTypes) {
@@ -51,8 +53,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeWithMultipleSubActions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeWithMultipleSubActions(quorum: String): Unit = {
     var exitStatus: Option[Int] = None
     var exitMessage: Option[String] = None
     Exit.setExitProcedure { (status, err) =>
@@ -72,8 +75,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     assertTrue(exitMessage.get.contains("Option [describe] takes at most one of these options"))
   }
 
-  @Test
-  def testDescribeWithStateValue(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeWithStateValue(quorum: String): Unit = {
     var exitStatus: Option[Int] = None
     var exitMessage: Option[String] = None
     Exit.setExitProcedure { (status, err) =>
@@ -93,10 +97,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     assertTrue(exitMessage.get.contains("Option [describe] does not take a value for [state]"))
   }
 
-  @Test
-  def testDescribeOffsetsOfNonExistingGroup(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeOffsetsOfNonExistingGroup(quorum: String): Unit = {
     val group = "missing.group"
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
@@ -109,10 +114,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       s"Expected the state to be 'Dead', with no members in the group '$group'.")
   }
 
-  @Test
-  def testDescribeMembersOfNonExistingGroup(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeMembersOfNonExistingGroup(quorum: String): Unit = {
     val group = "missing.group"
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
@@ -129,10 +135,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       s"Expected the state to be 'Dead', with no members in the group '$group' (verbose option).")
   }
 
-  @Test
-  def testDescribeStateOfNonExistingGroup(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeStateOfNonExistingGroup(quorum: String): Unit = {
     val group = "missing.group"
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
@@ -142,14 +149,15 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
 
     val state = service.collectGroupState(group)
     assertTrue(state.state == "Dead" && state.numMembers == 0 &&
-      state.coordinator != null && servers.map(_.config.brokerId).toList.contains(state.coordinator.id),
+      state.coordinator != null && brokers.map(_.config.brokerId).toList.contains(state.coordinator.id),
       s"Expected the state to be 'Dead', with no members in the group '$group'."
     )
   }
 
-  @Test
-  def testDescribeExistingGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeExistingGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     for (describeType <- describeTypes) {
       val group = this.group + describeType.mkString("")
@@ -165,9 +173,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeExistingGroups(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeExistingGroups(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // Create N single-threaded consumer groups from a single-partition topic
     val groups = (for (describeType <- describeTypes) yield {
@@ -190,9 +199,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeAllExistingGroups(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeAllExistingGroups(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // Create N single-threaded consumer groups from a single-partition topic
     for (describeType <- describeTypes) {
@@ -214,9 +224,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeOffsetsOfExistingGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeOffsetsOfExistingGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
@@ -235,9 +246,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.")
   }
 
-  @Test
-  def testDescribeMembersOfExistingGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeMembersOfExistingGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
@@ -268,9 +280,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeStateOfExistingGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeStateOfExistingGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
@@ -283,13 +296,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.numMembers == 1 &&
         state.assignmentStrategy == "range" &&
         state.coordinator != null &&
-        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+        brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
     }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
   }
 
-  @Test
-  def testDescribeStateOfExistingGroupWithRoundRobinAssignor(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeStateOfExistingGroupWithRoundRobinAssignor(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1, strategy = classOf[RoundRobinAssignor].getName)
@@ -302,13 +316,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.numMembers == 1 &&
         state.assignmentStrategy == "roundrobin" &&
         state.coordinator != null &&
-        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+        brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
     }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
   }
 
-  @Test
-  def testDescribeExistingGroupWithNoMembers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeExistingGroupWithNoMembers(quorum: String): Unit = {
+    createOffsetsTopic()
 
     for (describeType <- describeTypes) {
       val group = this.group + describeType.mkString("")
@@ -330,9 +345,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeOffsetsOfExistingGroupWithNoMembers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeOffsetsOfExistingGroupWithNoMembers(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -362,9 +378,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     assertTrue(succeeded, s"Expected no active member in describe group results, state: $state, assignments: $assignments")
   }
 
-  @Test
-  def testDescribeMembersOfExistingGroupWithNoMembers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeMembersOfExistingGroupWithNoMembers(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -386,9 +403,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }, s"Expected no member in describe group members results for group '$group'")
   }
 
-  @Test
-  def testDescribeStateOfExistingGroupWithNoMembers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeStateOfExistingGroupWithNoMembers(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group consuming from a single-partition topic
     val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -401,7 +419,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       state.state == "Stable" &&
         state.numMembers == 1 &&
         state.coordinator != null &&
-        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+        brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
     }, s"Expected the group '$group' to initially become stable, and have a single member.")
 
     // stop the consumer so the group has no active member anymore
@@ -413,9 +431,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }, s"Expected the group '$group' to become empty after the only member leaving.")
   }
 
-  @Test
-  def testDescribeWithConsumersWithoutAssignedPartitions(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+    createOffsetsTopic()
 
     for (describeType <- describeTypes) {
       val group = this.group + describeType.mkString("")
@@ -432,9 +451,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeOffsetsWithConsumersWithoutAssignedPartitions(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeOffsetsWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run two consumers in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 2)
@@ -451,9 +471,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }, "Expected rows for consumers with no assigned partitions in describe group results")
   }
 
-  @Test
-  def testDescribeMembersWithConsumersWithoutAssignedPartitions(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeMembersWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run two consumers in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 2)
@@ -476,9 +497,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       "Expected additional columns in verbose version of describe members")
   }
 
-  @Test
-  def testDescribeStateWithConsumersWithoutAssignedPartitions(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeStateWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run two consumers in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 2)
@@ -492,9 +514,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }, "Expected two consumers in describe group results")
   }
 
-  @Test
-  def testDescribeWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+    createOffsetsTopic()
+
     val topic2 = "foo2"
     createTopic(topic2, 2, 1)
 
@@ -513,9 +537,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test
-  def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+    createOffsetsTopic()
+
     val topic2 = "foo2"
     createTopic(topic2, 2, 1)
 
@@ -530,14 +556,16 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
-        assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
+        assignments.get.count { x => x.group == group && x.partition.isDefined } == 2 &&
+        assignments.get.count { x => x.group == group && x.partition.isEmpty } == 0
     }, "Expected two rows (one row per consumer) in describe group results.")
   }
 
-  @Test
-  def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+    createOffsetsTopic()
+
     val topic2 = "foo2"
     createTopic(topic2, 2, 1)
 
@@ -552,8 +580,8 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count{ x => x.group == group && x.numPartitions == 1 } == 2 &&
-        assignments.get.count{ x => x.group == group && x.numPartitions == 0 } == 0
+        assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 2 &&
+        assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 0
     }, "Expected two rows (one row per consumer) in describe group members results.")
 
     val (state, assignments) = service.collectGroupMembers(group, true)
@@ -561,9 +589,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       "Expected additional columns in verbose version of describe members")
   }
 
-  @Test
-  def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+    createOffsetsTopic()
+
     val topic2 = "foo2"
     createTopic(topic2, 2, 1)
 
@@ -579,11 +609,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }, "Expected a stable group with two members in describe group state result.")
   }
 
-  @Test
-  def testDescribeSimpleConsumerGroup(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeSimpleConsumerGroup(quorum: String): Unit = {
     // Ensure that the offsets of consumers which don't use group management are still displayed
 
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    createOffsetsTopic()
     val topic2 = "foo2"
     createTopic(topic2, 2, 1)
     addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))
@@ -597,8 +628,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }, "Expected a stable group with two members in describe group state result.")
   }
 
-  @Test
-  def testDescribeGroupWithShortInitializationTimeout(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeGroupWithShortInitializationTimeout(quorum: String): Unit = {
     // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
     // complete before the timeout expires
 
@@ -614,8 +646,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
   }
 
-  @Test
-  def testDescribeGroupOffsetsWithShortInitializationTimeout(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeGroupOffsetsWithShortInitializationTimeout(quorum: String): Unit = {
     // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
     // complete before the timeout expires
 
@@ -630,8 +663,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
   }
 
-  @Test
-  def testDescribeGroupMembersWithShortInitializationTimeout(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeGroupMembersWithShortInitializationTimeout(quorum: String): Unit = {
     // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
     // complete before the timeout expires
 
@@ -648,8 +682,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
   }
 
-  @Test
-  def testDescribeGroupStateWithShortInitializationTimeout(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeGroupStateWithShortInitializationTimeout(quorum: String): Unit = {
     // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
     // complete before the timeout expires
 
@@ -664,15 +699,17 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
   }
 
-  @Test
-  def testDescribeWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeWithUnrecognizedNewConsumerOption(quorum: String): Unit = {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
     assertThrows(classOf[joptsimple.OptionException], () => getConsumerGroupService(cgcArgs))
   }
 
-  @Test
-  def testDescribeNonOffsetCommitGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeNonOffsetCommitGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     val customProps = new Properties
     // create a consumer group that never commits offsets