You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2024/01/09 14:28:58 UTC
(kafka) branch trunk updated: KAFKA-15741: KRaft support in DescribeConsumerGroupTest (#14668)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bdad1631824 KAFKA-15741: KRaft support in DescribeConsumerGroupTest (#14668)
bdad1631824 is described below
commit bdad1631824035c8c31163ca6d054feff5421b29
Author: Zihao Lin <10...@163.com>
AuthorDate: Tue Jan 9 22:28:49 2024 +0800
KAFKA-15741: KRaft support in DescribeConsumerGroupTest (#14668)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../kafka/admin/DescribeConsumerGroupTest.scala | 237 ++++++++++++---------
1 file changed, 137 insertions(+), 100 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index d8127746163..0e8c56442c0 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -17,13 +17,13 @@
package kafka.admin
import java.util.Properties
-
-import kafka.utils.{Exit, TestUtils}
+import kafka.utils.{Exit, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.concurrent.ExecutionException
import scala.util.Random
@@ -35,9 +35,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
private val describeTypeState = Array(Array("--state"))
private val describeTypes = describeTypeOffsets ++ describeTypeMembers ++ describeTypeState
- @Test
- def testDescribeNonExistingGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeNonExistingGroup(quorum: String): Unit = {
+ createOffsetsTopic()
+
val missingGroup = "missing.group"
for (describeType <- describeTypes) {
@@ -51,8 +53,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeWithMultipleSubActions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWithMultipleSubActions(quorum: String): Unit = {
var exitStatus: Option[Int] = None
var exitMessage: Option[String] = None
Exit.setExitProcedure { (status, err) =>
@@ -72,8 +75,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assertTrue(exitMessage.get.contains("Option [describe] takes at most one of these options"))
}
- @Test
- def testDescribeWithStateValue(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWithStateValue(quorum: String): Unit = {
var exitStatus: Option[Int] = None
var exitMessage: Option[String] = None
Exit.setExitProcedure { (status, err) =>
@@ -93,10 +97,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assertTrue(exitMessage.get.contains("Option [describe] does not take a value for [state]"))
}
- @Test
- def testDescribeOffsetsOfNonExistingGroup(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeOffsetsOfNonExistingGroup(quorum: String): Unit = {
val group = "missing.group"
- TestUtils.createOffsetsTopic(zkClient, servers)
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
@@ -109,10 +114,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
s"Expected the state to be 'Dead', with no members in the group '$group'.")
}
- @Test
- def testDescribeMembersOfNonExistingGroup(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeMembersOfNonExistingGroup(quorum: String): Unit = {
val group = "missing.group"
- TestUtils.createOffsetsTopic(zkClient, servers)
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
@@ -129,10 +135,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
s"Expected the state to be 'Dead', with no members in the group '$group' (verbose option).")
}
- @Test
- def testDescribeStateOfNonExistingGroup(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeStateOfNonExistingGroup(quorum: String): Unit = {
val group = "missing.group"
- TestUtils.createOffsetsTopic(zkClient, servers)
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
@@ -142,14 +149,15 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val state = service.collectGroupState(group)
assertTrue(state.state == "Dead" && state.numMembers == 0 &&
- state.coordinator != null && servers.map(_.config.brokerId).toList.contains(state.coordinator.id),
+ state.coordinator != null && brokers.map(_.config.brokerId).toList.contains(state.coordinator.id),
s"Expected the state to be 'Dead', with no members in the group '$group'."
)
}
- @Test
- def testDescribeExistingGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeExistingGroup(quorum: String): Unit = {
+ createOffsetsTopic()
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
@@ -165,9 +173,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeExistingGroups(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeExistingGroups(quorum: String): Unit = {
+ createOffsetsTopic()
// Create N single-threaded consumer groups from a single-partition topic
val groups = (for (describeType <- describeTypes) yield {
@@ -190,9 +199,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeAllExistingGroups(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAllExistingGroups(quorum: String): Unit = {
+ createOffsetsTopic()
// Create N single-threaded consumer groups from a single-partition topic
for (describeType <- describeTypes) {
@@ -214,9 +224,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeOffsetsOfExistingGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeOffsetsOfExistingGroup(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
@@ -235,9 +246,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.")
}
- @Test
- def testDescribeMembersOfExistingGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeMembersOfExistingGroup(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
@@ -268,9 +280,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeStateOfExistingGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeStateOfExistingGroup(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
@@ -283,13 +296,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
state.numMembers == 1 &&
state.assignmentStrategy == "range" &&
state.coordinator != null &&
- servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+ brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
}, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
}
- @Test
- def testDescribeStateOfExistingGroupWithRoundRobinAssignor(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeStateOfExistingGroupWithRoundRobinAssignor(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, strategy = classOf[RoundRobinAssignor].getName)
@@ -302,13 +316,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
state.numMembers == 1 &&
state.assignmentStrategy == "roundrobin" &&
state.coordinator != null &&
- servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+ brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
}, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
}
- @Test
- def testDescribeExistingGroupWithNoMembers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeExistingGroupWithNoMembers(quorum: String): Unit = {
+ createOffsetsTopic()
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
@@ -330,9 +345,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeOffsetsOfExistingGroupWithNoMembers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeOffsetsOfExistingGroupWithNoMembers(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -362,9 +378,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assertTrue(succeeded, s"Expected no active member in describe group results, state: $state, assignments: $assignments")
}
- @Test
- def testDescribeMembersOfExistingGroupWithNoMembers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeMembersOfExistingGroupWithNoMembers(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -386,9 +403,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, s"Expected no member in describe group members results for group '$group'")
}
- @Test
- def testDescribeStateOfExistingGroupWithNoMembers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeStateOfExistingGroupWithNoMembers(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -401,7 +419,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
state.state == "Stable" &&
state.numMembers == 1 &&
state.coordinator != null &&
- servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+ brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
}, s"Expected the group '$group' to initially become stable, and have a single member.")
// stop the consumer so the group has no active member anymore
@@ -413,9 +431,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, s"Expected the group '$group' to become empty after the only member leaving.")
}
- @Test
- def testDescribeWithConsumersWithoutAssignedPartitions(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+ createOffsetsTopic()
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
@@ -432,9 +451,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeOffsetsWithConsumersWithoutAssignedPartitions(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeOffsetsWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+ createOffsetsTopic()
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2)
@@ -451,9 +471,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, "Expected rows for consumers with no assigned partitions in describe group results")
}
- @Test
- def testDescribeMembersWithConsumersWithoutAssignedPartitions(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeMembersWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+ createOffsetsTopic()
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2)
@@ -476,9 +497,10 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
"Expected additional columns in verbose version of describe members")
}
- @Test
- def testDescribeStateWithConsumersWithoutAssignedPartitions(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeStateWithConsumersWithoutAssignedPartitions(quorum: String): Unit = {
+ createOffsetsTopic()
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2)
@@ -492,9 +514,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, "Expected two consumers in describe group results")
}
- @Test
- def testDescribeWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+ createOffsetsTopic()
+
val topic2 = "foo2"
createTopic(topic2, 2, 1)
@@ -513,9 +537,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test
- def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+ createOffsetsTopic()
+
val topic2 = "foo2"
createTopic(topic2, 2, 1)
@@ -530,14 +556,16 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
- assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
- assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
+ assignments.get.count { x => x.group == group && x.partition.isDefined } == 2 &&
+ assignments.get.count { x => x.group == group && x.partition.isEmpty } == 0
}, "Expected two rows (one row per consumer) in describe group results.")
}
- @Test
- def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+ createOffsetsTopic()
+
val topic2 = "foo2"
createTopic(topic2, 2, 1)
@@ -552,8 +580,8 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
- assignments.get.count{ x => x.group == group && x.numPartitions == 1 } == 2 &&
- assignments.get.count{ x => x.group == group && x.numPartitions == 0 } == 0
+ assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 2 &&
+ assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 0
}, "Expected two rows (one row per consumer) in describe group members results.")
val (state, assignments) = service.collectGroupMembers(group, true)
@@ -561,9 +589,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
"Expected additional columns in verbose version of describe members")
}
- @Test
- def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(quorum: String): Unit = {
+ createOffsetsTopic()
+
val topic2 = "foo2"
createTopic(topic2, 2, 1)
@@ -579,11 +609,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, "Expected a stable group with two members in describe group state result.")
}
- @Test
- def testDescribeSimpleConsumerGroup(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeSimpleConsumerGroup(quorum: String): Unit = {
// Ensure that the offsets of consumers which don't use group management are still displayed
- TestUtils.createOffsetsTopic(zkClient, servers)
+ createOffsetsTopic()
val topic2 = "foo2"
createTopic(topic2, 2, 1)
addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))
@@ -597,8 +628,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, "Expected a stable group with two members in describe group state result.")
}
- @Test
- def testDescribeGroupWithShortInitializationTimeout(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeGroupWithShortInitializationTimeout(quorum: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
@@ -614,8 +646,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
- @Test
- def testDescribeGroupOffsetsWithShortInitializationTimeout(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeGroupOffsetsWithShortInitializationTimeout(quorum: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
@@ -630,8 +663,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
- @Test
- def testDescribeGroupMembersWithShortInitializationTimeout(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeGroupMembersWithShortInitializationTimeout(quorum: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
@@ -648,8 +682,9 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
- @Test
- def testDescribeGroupStateWithShortInitializationTimeout(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeGroupStateWithShortInitializationTimeout(quorum: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
@@ -664,15 +699,17 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
- @Test
- def testDescribeWithUnrecognizedNewConsumerOption(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWithUnrecognizedNewConsumerOption(quorum: String): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
assertThrows(classOf[joptsimple.OptionException], () => getConsumerGroupService(cgcArgs))
}
- @Test
- def testDescribeNonOffsetCommitGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeNonOffsetCommitGroup(quorum: String): Unit = {
+ createOffsetsTopic()
val customProps = new Properties
// create a consumer group that never commits offsets