You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/29 20:12:22 UTC
[kafka] branch 2.6 updated: KAFKA-9130; KIP-518 Allow listing consumer groups per state (#8238)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new c35cd00 KAFKA-9130; KIP-518 Allow listing consumer groups per state (#8238)
c35cd00 is described below
commit c35cd00122bea5dab3dd36fa62b8d6b732ace0cd
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Fri May 29 19:25:20 2020 +0100
KAFKA-9130; KIP-518 Allow listing consumer groups per state (#8238)
Implementation of KIP-518: https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state.
Reviewers: David Jacot <dj...@confluent.io>, Jason Gustafson <ja...@confluent.io>
Co-authored-by: Mickael Maison <mi...@gmail.com>
Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/admin/ConsumerGroupListing.java | 56 ++++++++++++
.../kafka/clients/admin/KafkaAdminClient.java | 11 ++-
.../clients/admin/ListConsumerGroupsOptions.java | 24 ++++++
.../apache/kafka/common/ConsumerGroupState.java | 1 -
.../kafka/common/requests/ListGroupsRequest.java | 9 ++
.../common/message/ListGroupsRequest.json | 7 +-
.../common/message/ListGroupsResponse.json | 8 +-
.../java/org/apache/kafka/clients/MockClient.java | 16 ++--
.../kafka/clients/admin/KafkaAdminClientTest.java | 88 +++++++++++++++++--
.../kafka/common/requests/RequestResponseTest.java | 46 ++++++----
.../scala/kafka/admin/ConsumerGroupCommand.scala | 99 ++++++++++++++++++----
.../kafka/coordinator/group/GroupCoordinator.scala | 9 +-
.../kafka/coordinator/group/GroupMetadata.scala | 5 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 41 +++++----
.../kafka/api/PlaintextAdminIntegrationTest.scala | 21 ++++-
.../SaslClientsWithInvalidCredentialsTest.scala | 2 +-
.../kafka/admin/DeleteConsumerGroupsTest.scala | 10 +--
.../kafka/admin/DescribeConsumerGroupTest.scala | 44 ++++++++--
.../unit/kafka/admin/ListConsumerGroupTest.scala | 91 +++++++++++++++++++-
.../coordinator/group/GroupCoordinatorTest.scala | 56 ++++++++++--
.../scala/unit/kafka/server/KafkaApisTest.scala | 45 ++++++++++
22 files changed, 598 insertions(+), 93 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index fe7f716..034fbc8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -89,7 +89,7 @@
files="MockAdminClient.java"/>
<suppress checks="JavaNCSS"
- files="RequestResponseTest.java|FetcherTest.java"/>
+ files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
index 46da962..0abc3e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
@@ -17,12 +17,18 @@
package org.apache.kafka.clients.admin;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.kafka.common.ConsumerGroupState;
+
/**
* A listing of a consumer group in the cluster.
*/
public class ConsumerGroupListing {
private final String groupId;
private final boolean isSimpleConsumerGroup;
+ private final Optional<ConsumerGroupState> state;
/**
* Create an instance with the specified parameters.
@@ -31,8 +37,20 @@ public class ConsumerGroupListing {
* @param isSimpleConsumerGroup If consumer group is simple or not.
*/
public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) {
+ this(groupId, isSimpleConsumerGroup, Optional.empty());
+ }
+
+ /**
+ * Create an instance with the specified parameters.
+ *
+ * @param groupId Group Id
+ * @param isSimpleConsumerGroup If consumer group is simple or not.
+ * @param state The state of the consumer group
+ */
+ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optional<ConsumerGroupState> state) {
this.groupId = groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+ this.state = Objects.requireNonNull(state);
}
/**
@@ -49,11 +67,49 @@ public class ConsumerGroupListing {
return isSimpleConsumerGroup;
}
+ /**
+ * Consumer Group state
+ */
+ public Optional<ConsumerGroupState> state() {
+ return state;
+ }
+
@Override
public String toString() {
return "(" +
"groupId='" + groupId + '\'' +
", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+ ", state=" + state +
')';
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, isSimpleConsumerGroup, state);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ConsumerGroupListing other = (ConsumerGroupListing) obj;
+ if (groupId == null) {
+ if (other.groupId != null)
+ return false;
+ } else if (!groupId.equals(other.groupId))
+ return false;
+ if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
+ return false;
+ if (state == null) {
+ if (other.state != null)
+ return false;
+ } else if (!state.equals(other.state))
+ return false;
+ return true;
+ }
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 989ba4b..52e5c37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3056,14 +3056,21 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
@Override
ListGroupsRequest.Builder createRequest(int timeoutMs) {
- return new ListGroupsRequest.Builder(new ListGroupsRequestData());
+ List<String> states = options.states()
+ .stream()
+ .map(s -> s.toString())
+ .collect(Collectors.toList());
+ return new ListGroupsRequest.Builder(new ListGroupsRequestData().setStatesFilter(states));
}
private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) {
String protocolType = group.protocolType();
if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
final String groupId = group.groupId();
- final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+ final Optional<ConsumerGroupState> state = group.groupState().equals("")
+ ? Optional.empty()
+ : Optional.of(ConsumerGroupState.parse(group.groupState()));
+ final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty(), state);
results.addListing(groupListing);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
index eb27c79..9f1f38d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
@@ -17,6 +17,11 @@
package org.apache.kafka.clients.admin;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
@@ -26,4 +31,23 @@ import org.apache.kafka.common.annotation.InterfaceStability;
*/
@InterfaceStability.Evolving
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+ private Set<ConsumerGroupState> states = Collections.emptySet();
+
+ /**
+ * If states is set, only groups in these states will be returned by listConsumerGroups().
+ * Otherwise, all groups are returned.
+ * This operation is supported by brokers with version 2.6.0 or later.
+ */
+ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+ this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states);
+ return this;
+ }
+
+ /**
+ * Returns the set of states that are requested, or an empty set if no states have been specified.
+ */
+ public Set<ConsumerGroupState> states() {
+ return states;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
index 7f3d4f0..36d1a4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
+++ b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
@@ -45,7 +45,6 @@ public enum ConsumerGroupState {
this.name = name;
}
-
/**
* Parse a string into a consumer group state.
*/
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index caa6f47..ce39385 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -45,6 +46,10 @@ public class ListGroupsRequest extends AbstractRequest {
@Override
public ListGroupsRequest build(short version) {
+ if (!data.statesFilter().isEmpty() && version < 4) {
+ throw new UnsupportedVersionException("The broker only supports ListGroups " +
+ "v" + version + ", but we need v4 or newer to request groups by states.");
+ }
return new ListGroupsRequest(data, version);
}
@@ -66,6 +71,10 @@ public class ListGroupsRequest extends AbstractRequest {
this.data = new ListGroupsRequestData(struct, version);
}
+ public ListGroupsRequestData data() {
+ return data;
+ }
+
@Override
public ListGroupsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ListGroupsResponseData listGroupsResponseData = new ListGroupsResponseData().
diff --git a/clients/src/main/resources/common/message/ListGroupsRequest.json b/clients/src/main/resources/common/message/ListGroupsRequest.json
index f0130e2..dbe6d9b 100644
--- a/clients/src/main/resources/common/message/ListGroupsRequest.json
+++ b/clients/src/main/resources/common/message/ListGroupsRequest.json
@@ -20,8 +20,13 @@
// Version 1 and 2 are the same as version 0.
//
// Version 3 is the first flexible version.
- "validVersions": "0-3",
+ //
+ // Version 4 adds the StatesFilter field (KIP-518).
+ "validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
+ { "name": "StatesFilter", "type": "[]string", "versions": "4+",
+ "about": "The states of the groups we want to list. If empty all groups are returned with their state."
+ }
]
}
diff --git a/clients/src/main/resources/common/message/ListGroupsResponse.json b/clients/src/main/resources/common/message/ListGroupsResponse.json
index aa8bba6..87561c2 100644
--- a/clients/src/main/resources/common/message/ListGroupsResponse.json
+++ b/clients/src/main/resources/common/message/ListGroupsResponse.json
@@ -22,7 +22,9 @@
// Starting in version 2, on quota violation, brokers send out responses before throttling.
//
// Version 3 is the first flexible version.
- "validVersions": "0-3",
+ //
+ // Version 4 adds the GroupState field (KIP-518).
+ "validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
@@ -34,7 +36,9 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group ID." },
{ "name": "ProtocolType", "type": "string", "versions": "0+",
- "about": "The group protocol type." }
+ "about": "The group protocol type." },
+ { "name": "GroupState", "type": "string", "versions": "4+", "ignorable": true,
+ "about": "The group state name." }
]}
]
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index eaf5dcb..6cfc4fd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -215,15 +215,17 @@ public class MockClient implements KafkaClient {
AbstractRequest.Builder<?> builder = request.requestBuilder();
short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
- AbstractRequest abstractRequest = request.requestBuilder().build(version);
- if (!futureResp.requestMatcher.matches(abstractRequest))
- throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest + " with prepared response " + futureResp.responseBody);
UnsupportedVersionException unsupportedVersionException = null;
- if (futureResp.isUnsupportedRequest)
- unsupportedVersionException = new UnsupportedVersionException("Api " +
- request.apiKey() + " with version " + version);
-
+ if (futureResp.isUnsupportedRequest) {
+ unsupportedVersionException = new UnsupportedVersionException(
+ "Api " + request.apiKey() + " with version " + version);
+ } else {
+ AbstractRequest abstractRequest = request.requestBuilder().build(version);
+ if (!futureResp.requestMatcher.matches(abstractRequest))
+ throw new IllegalStateException("Request matcher did not match next-in-line request "
+ + abstractRequest + " with prepared response " + futureResp.responseBody);
+ }
ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
unsupportedVersionException, null, futureResp.responseBody);
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 4a991d5..77a9e88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin;
+import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.MockClient;
@@ -26,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -58,6 +60,7 @@ import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
@@ -90,6 +93,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -116,6 +120,7 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
@@ -1233,10 +1238,12 @@ public class KafkaAdminClientTest {
.setGroups(Arrays.asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
- .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-1")
.setProtocolType("connector")
+ .setGroupState("Stable")
))),
env.cluster().nodeById(0));
@@ -1262,10 +1269,12 @@ public class KafkaAdminClientTest {
.setGroups(Arrays.asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
- .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-2")
.setProtocolType("connector")
+ .setGroupState("Stable")
))),
env.cluster().nodeById(1));
@@ -1276,10 +1285,12 @@ public class KafkaAdminClientTest {
.setGroups(Arrays.asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-3")
- .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-3")
.setProtocolType("connector")
+ .setGroupState("Stable")
))),
env.cluster().nodeById(2));
@@ -1300,6 +1311,7 @@ public class KafkaAdminClientTest {
Set<String> groupIds = new HashSet<>();
for (ConsumerGroupListing listing : listings) {
groupIds.add(listing.groupId());
+ assertTrue(listing.state().isPresent());
}
assertEquals(Utils.mkSet("group-1", "group-2", "group-3"), groupIds);
@@ -1331,6 +1343,74 @@ public class KafkaAdminClientTest {
}
@Test
+ public void testListConsumerGroupsWithStates() throws Exception {
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-1")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState("Stable"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-2")
+ .setGroupState("Empty")))),
+ env.cluster().nodeById(0));
+
+ final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions();
+ final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
+ Collection<ConsumerGroupListing> listings = result.valid().get();
+
+ assertEquals(2, listings.size());
+ List<ConsumerGroupListing> expected = new ArrayList<>();
+ expected.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY)));
+ expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE)));
+ assertEquals(expected, listings);
+ assertEquals(0, result.errors().get().size());
+ }
+ }
+
+ @Test
+ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception {
+ ApiVersion listGroupV3 = new ApiVersion(ApiKeys.LIST_GROUPS.id, (short) 0, (short) 3);
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV3)));
+
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+
+ // Check we can list groups with older broker if we don't specify states
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Collections.singletonList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-1")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
+ env.cluster().nodeById(0));
+ ListConsumerGroupsOptions options = new ListConsumerGroupsOptions();
+ ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
+ Collection<ConsumerGroupListing> listing = result.all().get();
+ assertEquals(1, listing.size());
+ List<ConsumerGroupListing> expected = Collections.singletonList(new ConsumerGroupListing("group-1", false, Optional.empty()));
+ assertEquals(expected, listing);
+
+ // But we cannot set a state filter with older broker
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+ env.kafkaClient().prepareUnsupportedVersionResponse(
+ body -> body instanceof ListGroupsRequest);
+
+ options = new ListConsumerGroupsOptions().inStates(Collections.singleton(ConsumerGroupState.STABLE));
+ result = env.adminClient().listConsumerGroups(options);
+ TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+ }
+ }
+
+ @Test
public void testOffsetCommitNumRetries() throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();
@@ -2297,8 +2377,6 @@ public class KafkaAdminClientTest {
AtomicLong firstAttemptTime = new AtomicLong(0);
AtomicLong secondAttemptTime = new AtomicLong(0);
- final TopicPartition tp1 = new TopicPartition("foo", 0);
-
mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(body -> {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69b658a..c712c18 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
@@ -229,9 +230,13 @@ public class RequestResponseTest {
checkRequest(createLeaveGroupRequest(), true);
checkErrorResponse(createLeaveGroupRequest(), new UnknownServerException(), true);
checkResponse(createLeaveGroupResponse(), 0, true);
- checkRequest(createListGroupsRequest(), true);
- checkErrorResponse(createListGroupsRequest(), new UnknownServerException(), true);
- checkResponse(createListGroupsResponse(), 0, true);
+
+ for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) {
+ checkRequest(createListGroupsRequest(v), false);
+ checkErrorResponse(createListGroupsRequest(v), new UnknownServerException(), true);
+ checkResponse(createListGroupsResponse(v), v, true);
+ }
+
checkRequest(createDescribeGroupRequest(), true);
checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException(), true);
checkResponse(createDescribeGroupResponse(), 0, true);
@@ -810,6 +815,13 @@ public class RequestResponseTest {
assertTrue(request.isValid());
}
+ @Test(expected = UnsupportedVersionException.class)
+ public void testListGroupRequestV3FailsWithStates() {
+ ListGroupsRequestData data = new ListGroupsRequestData()
+ .setStatesFilter(asList(ConsumerGroupState.STABLE.name()));
+ new ListGroupsRequest.Builder(data).build((short) 3);
+ }
+
@Test
public void testInvalidApiVersionsRequest() {
testInvalidCase("java@apache_kafka", "0.0.0-SNAPSHOT");
@@ -1066,19 +1078,23 @@ public class RequestResponseTest {
return new SyncGroupResponse(data);
}
- private ListGroupsRequest createListGroupsRequest() {
- return new ListGroupsRequest.Builder(new ListGroupsRequestData()).build();
+ private ListGroupsRequest createListGroupsRequest(short version) {
+ ListGroupsRequestData data = new ListGroupsRequestData();
+ if (version >= 4)
+ data.setStatesFilter(Arrays.asList("Stable"));
+ return new ListGroupsRequest.Builder(data).build(version);
}
- private ListGroupsResponse createListGroupsResponse() {
- return new ListGroupsResponse(
- new ListGroupsResponseData()
- .setErrorCode(Errors.NONE.code())
- .setGroups(Collections.singletonList(
- new ListGroupsResponseData.ListedGroup()
- .setGroupId("test-group")
- .setProtocolType("consumer")
- )));
+ private ListGroupsResponse createListGroupsResponse(int version) {
+ ListGroupsResponseData.ListedGroup group = new ListGroupsResponseData.ListedGroup()
+ .setGroupId("test-group")
+ .setProtocolType("consumer");
+ if (version >= 4)
+ group.setGroupState("Stable");
+ ListGroupsResponseData data = new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Collections.singletonList(group));
+ return new ListGroupsResponse(data);
}
private DescribeGroupsRequest createDescribeGroupRequest() {
@@ -1133,7 +1149,6 @@ public class RequestResponseTest {
);
}
- @SuppressWarnings("deprecation")
private ListOffsetRequest createListOffsetRequest(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
@@ -1164,7 +1179,6 @@ public class RequestResponseTest {
}
}
- @SuppressWarnings("deprecation")
private ListOffsetResponse createListOffsetResponse(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 8d5087d..a0fd206 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -41,26 +41,35 @@ import org.apache.kafka.common.protocol.Errors
import scala.collection.immutable.TreeMap
import scala.reflect.ClassTag
import org.apache.kafka.common.requests.ListOffsetResponse
+import org.apache.kafka.common.ConsumerGroupState
+import joptsimple.OptionException
object ConsumerGroupCommand extends Logging {
def main(args: Array[String]): Unit = {
- val opts = new ConsumerGroupCommandOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
+ val opts = new ConsumerGroupCommandOptions(args)
+ try {
+ opts.checkArgs()
+ CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
- // should have exactly one action
- val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has)
- if (actions != 1)
- CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets")
+ // should have exactly one action
+ val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has)
+ if (actions != 1)
+ CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets")
- opts.checkArgs()
+ run(opts)
+ } catch {
+ case e: OptionException =>
+ CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+ }
+ }
+ def run(opts: ConsumerGroupCommandOptions): Unit = {
val consumerGroupService = new ConsumerGroupService(opts)
-
try {
if (opts.options.has(opts.listOpt))
- consumerGroupService.listGroups().foreach(println(_))
+ consumerGroupService.listGroups()
else if (opts.options.has(opts.describeOpt))
consumerGroupService.describeGroups()
else if (opts.options.has(opts.deleteOpt))
@@ -77,6 +86,8 @@ object ConsumerGroupCommand extends Logging {
consumerGroupService.deleteOffsets()
}
} catch {
+ case e: IllegalArgumentException =>
+ CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
case e: Throwable =>
printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
} finally {
@@ -84,6 +95,15 @@ object ConsumerGroupCommand extends Logging {
}
}
+ def consumerGroupStatesFromString(input: String): Set[ConsumerGroupState] = {
+ val parsedStates = input.split(',').map(s => ConsumerGroupState.parse(s.trim)).toSet
+ if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
+ val validStates = ConsumerGroupState.values().filter(_ != ConsumerGroupState.UNKNOWN)
+ throw new IllegalArgumentException(s"Invalid state list '$input'. Valid states are: ${validStates.mkString(", ")}")
+ }
+ parsedStates
+ }
+
val MISSING_COLUMN_VALUE = "-"
def printError(msg: String, e: Option[Throwable] = None): Unit = {
@@ -178,12 +198,44 @@ object ConsumerGroupCommand extends Logging {
} else None
}
- def listGroups(): List[String] = {
+ def listGroups(): Unit = {
+ if (opts.options.has(opts.stateOpt)) {
+ val stateValue = opts.options.valueOf(opts.stateOpt)
+ val states = if (stateValue == null || stateValue.isEmpty)
+ Set[ConsumerGroupState]()
+ else
+ consumerGroupStatesFromString(stateValue)
+ val listings = listConsumerGroupsWithState(states)
+ printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
+ } else
+ listConsumerGroups().foreach(println(_))
+ }
+
+ def listConsumerGroups(): List[String] = {
val result = adminClient.listConsumerGroups(withTimeoutMs(new ListConsumerGroupsOptions))
val listings = result.all.get.asScala
listings.map(_.groupId).toList
}
+ def listConsumerGroupsWithState(states: Set[ConsumerGroupState]): List[ConsumerGroupListing] = {
+ val listConsumerGroupsOptions = withTimeoutMs(new ListConsumerGroupsOptions())
+ listConsumerGroupsOptions.inStates(states.asJava)
+ val result = adminClient.listConsumerGroups(listConsumerGroupsOptions)
+ result.all.get.asScala.toList
+ }
+
+ private def printGroupStates(groupsAndStates: List[(String, String)]): Unit = {
+ // find proper columns width
+ var maxGroupLen = 15
+ for ((groupId, state) <- groupsAndStates) {
+ maxGroupLen = Math.max(maxGroupLen, groupId.length)
+ }
+ println(s"%${-maxGroupLen}s %s".format("GROUP", "STATE"))
+ for ((groupId, state) <- groupsAndStates) {
+ println(s"%${-maxGroupLen}s %s".format(groupId, state))
+ }
+ }
+
private def shouldPrintMemberState(group: String, state: Option[String], numRows: Option[Int]): Boolean = {
// numRows contains the number of data rows, if any, compiled from the API call in the caller method.
// if it's undefined or 0, there is no relevant group information to display.
@@ -317,7 +369,7 @@ object ConsumerGroupCommand extends Logging {
def describeGroups(): Unit = {
val groupIds =
- if (opts.options.has(opts.allGroupsOpt)) listGroups()
+ if (opts.options.has(opts.allGroupsOpt)) listConsumerGroups()
else opts.options.valuesOf(opts.groupOpt).asScala
val membersOptPresent = opts.options.has(opts.membersOpt)
val stateOptPresent = opts.options.has(opts.stateOpt)
@@ -381,7 +433,7 @@ object ConsumerGroupCommand extends Logging {
def resetOffsets(): Map[String, Map[TopicPartition, OffsetAndMetadata]] = {
val groupIds =
- if (opts.options.has(opts.allGroupsOpt)) listGroups()
+ if (opts.options.has(opts.allGroupsOpt)) listConsumerGroups()
else opts.options.valuesOf(opts.groupOpt).asScala
val consumerGroups = adminClient.describeConsumerGroups(
@@ -869,7 +921,7 @@ object ConsumerGroupCommand extends Logging {
def deleteGroups(): Map[String, Throwable] = {
val groupIds =
- if (opts.options.has(opts.allGroupsOpt)) listGroups()
+ if (opts.options.has(opts.allGroupsOpt)) listConsumerGroups()
else opts.options.valuesOf(opts.groupOpt).asScala
val groupsToDelete = adminClient.deleteConsumerGroups(
@@ -953,8 +1005,11 @@ object ConsumerGroupCommand extends Logging {
val OffsetsDoc = "Describe the group and list all topic partitions in the group along with their offset lag. " +
"This is the default sub-action of and may be used with '--describe' and '--bootstrap-server' options only." + nl +
"Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets"
- val StateDoc = "Describe the group state. This option may be used with '--describe' and '--bootstrap-server' options only." + nl +
- "Example: --bootstrap-server localhost:9092 --describe --group group1 --state"
+ val StateDoc = "When specified with '--describe', includes the state of the group." + nl +
+ "Example: --bootstrap-server localhost:9092 --describe --group group1 --state" + nl +
+ "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states." + nl +
+ "Example: --bootstrap-server localhost:9092 --list --state stable,empty" + nl +
+ "This option may be used with '--describe', '--list' and '--bootstrap-server' options only."
val DeleteOffsetsDoc = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics."
val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
@@ -1018,9 +1073,9 @@ object ConsumerGroupCommand extends Logging {
val offsetsOpt = parser.accepts("offsets", OffsetsDoc)
.availableIf(describeOpt)
val stateOpt = parser.accepts("state", StateDoc)
- .availableIf(describeOpt)
-
- parser.mutuallyExclusive(membersOpt, offsetsOpt, stateOpt)
+ .availableIf(describeOpt, listOpt)
+ .withOptionalArg()
+ .ofType(classOf[String])
options = parser.parse(args : _*)
@@ -1038,6 +1093,14 @@ object ConsumerGroupCommand extends Logging {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndDie(parser,
s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
+ val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, offsetsOpt, stateOpt)
+ if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 0).sum > 1) {
+ CommandLineUtils.printUsageAndDie(parser,
+ s"Option $describeOpt takes at most one of these options: ${mutuallyExclusiveOpts.mkString(", ")}")
+ }
+ if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
+ CommandLineUtils.printUsageAndDie(parser,
+ s"Option $describeOpt does not take a value for $stateOpt")
} else {
if (options.has(timeoutMsOpt))
debug(s"Option $timeoutMsOpt is applicable only when $describeOpt is used.")
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 0957c5b..00dd09b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -797,12 +797,17 @@ class GroupCoordinator(val brokerId: Int,
}
}
- def handleListGroups(): (Errors, List[GroupOverview]) = {
+ def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
if (!isActive.get) {
(Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
} else {
val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
- (errorCode, groupManager.currentGroups.map(_.overview).toList)
+ // if states is empty, return all groups
+ val groups = if (states.isEmpty)
+ groupManager.currentGroups
+ else
+ groupManager.currentGroups.filter(g => states.contains(g.summary.state))
+ (errorCode, groups.map(_.overview).toList)
}
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 7e3b470..f82b663 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -159,7 +159,8 @@ private object GroupMetadata extends Logging {
* Case class used to represent group metadata for the ListGroups API
*/
case class GroupOverview(groupId: String,
- protocolType: String)
+ protocolType: String,
+ state: String)
/**
* Case class used to represent group metadata for the DescribeGroup API
@@ -562,7 +563,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
}
def overview: GroupOverview = {
- GroupOverview(groupId, protocolType.getOrElse(""))
+ GroupOverview(groupId, protocolType.getOrElse(""), state.toString)
}
def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dc9afcc..28e20cd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -86,6 +86,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Success, Try}
+import kafka.coordinator.group.GroupOverview
/**
@@ -1394,29 +1395,35 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
- val (error, groups) = groupCoordinator.handleListGroups()
+ val listGroupsRequest = request.body[ListGroupsRequest]
+ val states = if (listGroupsRequest.data.statesFilter == null)
+ // Handle a null array the same as empty
+ immutable.Set[String]()
+ else
+ listGroupsRequest.data.statesFilter.asScala.toSet
+
+ // Builds the ListGroups response for the given groups, error and throttle time.
+ // GroupOverview.state is already a String, so it is passed through as-is.
+ def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
+   val listedGroups = groups.map { overview =>
+     new ListGroupsResponseData.ListedGroup()
+       .setGroupId(overview.groupId)
+       .setProtocolType(overview.protocolType)
+       .setGroupState(overview.state)
+   }
+   val responseData = new ListGroupsResponseData()
+     .setErrorCode(error.code)
+     .setGroups(listedGroups.asJava)
+     .setThrottleTimeMs(throttleMs)
+   new ListGroupsResponse(responseData)
+ }
+ val (error, groups) = groupCoordinator.handleListGroups(states)
if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
// With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new ListGroupsResponse(new ListGroupsResponseData()
- .setErrorCode(error.code)
- .setGroups(groups.map { group => new ListGroupsResponseData.ListedGroup()
- .setGroupId(group.groupId)
- .setProtocolType(group.protocolType)}.asJava
- )
- .setThrottleTimeMs(requestThrottleMs)
- ))
+ createResponse(requestThrottleMs, groups, error))
else {
val filteredGroups = groups.filter(group => authorize(request.context, DESCRIBE, GROUP, group.groupId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new ListGroupsResponse(new ListGroupsResponseData()
- .setErrorCode(error.code)
- .setGroups(filteredGroups.map { group => new ListGroupsResponseData.ListedGroup()
- .setGroupId(group.groupId)
- .setProtocolType(group.protocolType)}.asJava
- )
- .setThrottleTimeMs(requestThrottleMs)
- ))
+ createResponse(requestThrottleMs, filteredGroups, error))
}
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 4fbdf99..bab4452 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1085,10 +1085,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
// Test that we can list the new group.
TestUtils.waitUntilTrue(() => {
- val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId)
- matching.nonEmpty
+ val matching = client.listConsumerGroups.all.get.asScala.filter(group =>
+ group.groupId == testGroupId &&
+ group.state.get == ConsumerGroupState.STABLE)
+ matching.size == 1
}, s"Expected to be able to list $testGroupId")
+ TestUtils.waitUntilTrue(() => {
+ val options = new ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.STABLE).asJava)
+ val matching = client.listConsumerGroups(options).all.get.asScala.filter(group =>
+ group.groupId == testGroupId &&
+ group.state.get == ConsumerGroupState.STABLE)
+ matching.size == 1
+ }, s"Expected to be able to list $testGroupId in state Stable")
+
+ TestUtils.waitUntilTrue(() => {
+ val options = new ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.EMPTY).asJava)
+ val matching = client.listConsumerGroups(options).all.get.asScala.filter(
+ _.groupId == testGroupId)
+ matching.isEmpty
+ }, s"Expected to find zero groups")
+
val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index ac7f56c..14fd5a5 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -176,7 +176,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
consumer.subscribe(List(topic).asJava)
verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))
- assertEquals(1, consumerGroupService.listGroups.size)
+ assertEquals(1, consumerGroupService.listConsumerGroups.size)
consumerGroupService.close()
}
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 23de790..a3fa8f5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -107,7 +107,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
- service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+ service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
}, "The group did not initialize as expected.")
executor.shutdown()
@@ -137,7 +137,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
- service.listGroups().toSet == groups.keySet &&
+ service.listConsumerGroups().toSet == groups.keySet &&
groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Stable")
}, "The group did not initialize as expected.")
@@ -169,7 +169,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
- service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+ service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
}, "The group did not initialize as expected.")
executor.shutdown()
@@ -194,7 +194,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
- service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+ service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
}, "The group did not initialize as expected.")
executor.shutdown()
@@ -221,7 +221,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
- service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+ service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
}, "The group did not initialize as expected.")
executor.shutdown()
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 560153f..2c28ccf 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -18,8 +18,7 @@ package kafka.admin
import java.util.Properties
-import joptsimple.OptionException
-import kafka.utils.TestUtils
+import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
@@ -52,11 +51,46 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
- @Test(expected = classOf[OptionException])
+ @Test
def testDescribeWithMultipleSubActions(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ var exitStatus: Option[Int] = None
+ var exitMessage: Option[String] = None
+ Exit.setExitProcedure { (status, err) =>
+ exitStatus = Some(status)
+ exitMessage = err
+ throw new RuntimeException
+ }
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--members", "--state")
- getConsumerGroupService(cgcArgs)
+ try {
+ ConsumerGroupCommand.main(cgcArgs)
+ } catch {
+ case e: RuntimeException => //expected
+ } finally {
+ Exit.resetExitProcedure()
+ }
+ assertEquals(Some(1), exitStatus)
+ assertTrue(exitMessage.get.contains("Option [describe] takes at most one of these options"))
+ }
+
+ @Test
+ def testDescribeWithStateValue(): Unit = {
+   // both are filled in by the exit procedure installed below
+   var exitStatus: Option[Int] = None
+   var exitMessage: Option[String] = None
+   Exit.setExitProcedure { (status, err) =>
+     exitStatus = Some(status)
+     exitMessage = err
+     throw new RuntimeException
+   }
+   val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--all-groups", "--state", "Stable")
+   try ConsumerGroupCommand.main(cgcArgs)
+   catch {
+     case _: RuntimeException => // expected once the exit procedure fires
+   } finally Exit.resetExitProcedure()
+   assertEquals(Some(1), exitStatus)
+   assertTrue(exitMessage.get.contains("Option [describe] does not take a value for [state]"))
+ }
@Test
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 7429a43..3997c9e 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -17,8 +17,13 @@
package kafka.admin
import joptsimple.OptionException
+import org.junit.Assert._
import org.junit.Test
import kafka.utils.TestUtils
+import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.clients.admin.ConsumerGroupListing
+import java.util.Optional
+import org.scalatest.Assertions.assertThrows
class ListConsumerGroupTest extends ConsumerGroupCommandTest {
@@ -34,7 +39,7 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
val expectedGroups = Set(group, simpleGroup)
var foundGroups = Set.empty[String]
TestUtils.waitUntilTrue(() => {
- foundGroups = service.listGroups().toSet
+ foundGroups = service.listConsumerGroups().toSet
expectedGroups == foundGroups
}, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
}
@@ -44,4 +49,88 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list")
getConsumerGroupService(cgcArgs)
}
+
+ @Test
+ def testListConsumerGroupsWithStates(): Unit = {
+   val simpleGroup = "simple-group"
+   addSimpleGroupExecutor(group = simpleGroup)
+   addConsumerGroupExecutor(numConsumers = 1)
+
+   val service = getConsumerGroupService(Array("--bootstrap-server", brokerList, "--list", "--state"))
+
+   // Polls the service until the listing for `states` equals `expected`.
+   def awaitListing(states: Set[ConsumerGroupState], expected: Set[ConsumerGroupListing]): Unit = {
+     var found = Set.empty[ConsumerGroupListing]
+     TestUtils.waitUntilTrue(() => {
+       found = service.listConsumerGroupsWithState(states).toSet
+       expected == found
+     }, s"Expected to show groups $expected, but found $found")
+   }
+
+   val stableListing = new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))
+   val emptySimpleListing = new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY))
+
+   // every state requested: both groups are expected
+   awaitListing(ConsumerGroupState.values.toSet, Set(emptySimpleListing, stableListing))
+
+   // only Stable requested: just the consumer group is expected
+   awaitListing(Set(ConsumerGroupState.STABLE), Set(stableListing))
+ }
+
+ @Test
+ def testConsumerGroupStatesFromString(): Unit = {
+   // inputs that must parse into the given set of states
+   assertEquals(
+     Set(ConsumerGroupState.STABLE),
+     ConsumerGroupCommand.consumerGroupStatesFromString("Stable"))
+
+   assertEquals(
+     Set(ConsumerGroupState.STABLE, ConsumerGroupState.PREPARING_REBALANCE),
+     ConsumerGroupCommand.consumerGroupStatesFromString("Stable, PreparingRebalance"))
+
+   assertEquals(
+     Set(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE),
+     ConsumerGroupCommand.consumerGroupStatesFromString("Dead,CompletingRebalance,"))
+
+   // inputs that must be rejected
+   Seq("bad, wrong", "stable", " bad, Stable", " , ,").foreach { invalid =>
+     assertThrows[IllegalArgumentException] {
+       ConsumerGroupCommand.consumerGroupStatesFromString(invalid)
+     }
+   }
+ }
+
+ @Test
+ def testListGroupCommand(): Unit = {
+   val simpleGroup = "simple-group"
+   addSimpleGroupExecutor(group = simpleGroup)
+   addConsumerGroupExecutor(numConsumers = 1)
+
+   // Runs the command with --list plus any extra arguments and returns what it printed.
+   def runList(extraArgs: String*): String = {
+     val cgcArgs = Array("--bootstrap-server", brokerList, "--list") ++ extraArgs
+     TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+   }
+
+   var out = ""
+
+   // plain --list: group ids only, no header
+   TestUtils.waitUntilTrue(() => {
+     out = runList()
+     !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
+   }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+
+   // --list --state: header plus both groups
+   TestUtils.waitUntilTrue(() => {
+     out = runList("--state")
+     out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
+   }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+
+   // --list --state Stable: header plus the stable group
+   TestUtils.waitUntilTrue(() => {
+     out = runList("--state", "Stable")
+     out.contains("STATE") && out.contains(group) && out.contains("Stable")
+   }, s"Expected to find $group in state Stable and the header, but found $out")
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3200f81..9791cd6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -174,7 +174,7 @@ class GroupCoordinatorTest {
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupError)
// ListGroups
- val (listGroupsError, _) = groupCoordinator.handleListGroups()
+ val (listGroupsError, _) = groupCoordinator.handleListGroups(Set())
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)
// DeleteGroups
@@ -3208,10 +3208,10 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupResult.error)
- val (error, groups) = groupCoordinator.handleListGroups()
+ val (error, groups) = groupCoordinator.handleListGroups(Set())
assertEquals(Errors.NONE, error)
assertEquals(1, groups.size)
- assertEquals(GroupOverview("groupId", "consumer"), groups.head)
+ assertEquals(GroupOverview("groupId", "consumer", Stable.toString), groups.head)
}
@Test
@@ -3220,10 +3220,56 @@ class GroupCoordinatorTest {
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
assertEquals(Errors.NONE, joinGroupResult.error)
- val (error, groups) = groupCoordinator.handleListGroups()
+ val (error, groups) = groupCoordinator.handleListGroups(Set())
assertEquals(Errors.NONE, error)
assertEquals(1, groups.size)
- assertEquals(GroupOverview("groupId", "consumer"), groups.head)
+ assertEquals(GroupOverview("groupId", "consumer", CompletingRebalance.toString), groups.head)
+ }
+
+ @Test
+ def testListGroupsWithStates(): Unit = {
+   val allStates = Set(PreparingRebalance, CompletingRebalance, Stable, Dead, Empty).map(s => s.toString)
+
+   // Lists groups filtered by `states` and checks for no error and the expected count.
+   def assertListedCount(states: Set[String], expectedCount: Int): Unit = {
+     val (error, groups) = groupCoordinator.handleListGroups(states)
+     assertEquals(Errors.NONE, error)
+     assertEquals(expectedCount, groups.size)
+   }
+
+   // The group must be listed when filtering on `state` and on no other state.
+   def assertGroupOnlyIn(state: String): Unit = {
+     assertListedCount(Set(state), 1)
+     assertListedCount(allStates.filterNot(s => s == state), 0)
+   }
+
+   // Member joins the group
+   val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+   val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+   val assignedMemberId = joinGroupResult.memberId
+   val generationId = joinGroupResult.generationId
+   assertEquals(Errors.NONE, joinGroupResult.error)
+
+   // The group should be in CompletingRebalance
+   assertGroupOnlyIn(CompletingRebalance.toString)
+
+   // Member syncs
+   EasyMock.reset(replicaManager)
+   val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+   assertEquals(Errors.NONE, syncGroupResult.error)
+
+   // The group is now stable
+   assertGroupOnlyIn(Stable.toString)
+
+   // Member leaves
+   EasyMock.reset(replicaManager)
+   val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+   verifyLeaveGroupResult(leaveGroupResults)
+
+   // The group is now empty
+   assertGroupOnlyIn(Empty.toString)
+ }
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 82ca828..0b9c4a4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -28,6 +28,7 @@ import kafka.api.LeaderAndIsr
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
import kafka.cluster.Partition
import kafka.controller.KafkaController
+import kafka.coordinator.group.GroupOverview
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.SyncGroupCallback
import kafka.coordinator.group.JoinGroupResult
@@ -1859,6 +1860,50 @@ class KafkaApisTest {
EasyMock.verify(replicaManager)
}
+ @Test
+ def testListGroupsRequest(): Unit = {
+   // no state filter on the request: both overviews come back with their state
+   val stableOverview = GroupOverview("group1", "protocol1", "Stable")
+   val emptyOverview = GroupOverview("group2", "qwerty", "Empty")
+   val listedGroups = listGroupRequest(None, List(stableOverview, emptyOverview)).data.groups
+   assertEquals(2, listedGroups.size)
+   assertEquals("Stable", listedGroups.get(0).groupState)
+   assertEquals("Empty", listedGroups.get(1).groupState)
+ }
+
+ @Test
+ def testListGroupsRequestWithState(): Unit = {
+   // request carries a "Stable" filter; the mocked coordinator returns one stable group
+   val stableOnly = List(GroupOverview("group1", "protocol1", "Stable"))
+   val listedGroups = listGroupRequest(Some("Stable"), stableOnly).data.groups
+   assertEquals(1, listedGroups.size)
+   assertEquals("Stable", listedGroups.get(0).groupState)
+ }
+
+ /**
+  * Sends a ListGroups request through KafkaApis with the group coordinator mocked to
+  * return `overviews`, and returns the parsed response after asserting it has no error.
+  * When `state` is defined it is set as the request's single-element states filter.
+  */
+ private def listGroupRequest(state: Option[String], overviews: List[GroupOverview]): ListGroupsResponse = {
+   EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+   val data = new ListGroupsRequestData()
+   state.foreach(s => data.setStatesFilter(Collections.singletonList(s)))
+   val listGroupsRequest = new ListGroupsRequest.Builder(data).build()
+   val requestChannelRequest = buildRequest(listGroupsRequest)
+
+   val capturedResponse = expectNoThrottling()
+   // the coordinator is expected to receive the filter as a set: empty when no state was given
+   val expectedStates: Set[String] = state.toSet
+   EasyMock.expect(groupCoordinator.handleListGroups(expectedStates))
+     .andReturn((Errors.NONE, overviews))
+   EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+   createKafkaApis().handleListGroupsRequest(requestChannelRequest)
+
+   val response = readResponse(ApiKeys.LIST_GROUPS, listGroupsRequest, capturedResponse).asInstanceOf[ListGroupsResponse]
+   assertEquals(Errors.NONE.code, response.data.errorCode)
+   response
+ }
+
/**
* Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
*/