You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/29 20:12:22 UTC

[kafka] branch 2.6 updated: KAFKA-9130; KIP-518 Allow listing consumer groups per state (#8238)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new c35cd00  KAFKA-9130; KIP-518 Allow listing consumer groups per state (#8238)
c35cd00 is described below

commit c35cd00122bea5dab3dd36fa62b8d6b732ace0cd
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Fri May 29 19:25:20 2020 +0100

    KAFKA-9130; KIP-518 Allow listing consumer groups per state (#8238)
    
    Implementation of KIP-518: https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state.
    
    Reviewers: David Jacot <dj...@confluent.io>, Jason Gustafson <ja...@confluent.io>
    
    Co-authored-by: Mickael Maison <mi...@gmail.com>
    Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../kafka/clients/admin/ConsumerGroupListing.java  | 56 ++++++++++++
 .../kafka/clients/admin/KafkaAdminClient.java      | 11 ++-
 .../clients/admin/ListConsumerGroupsOptions.java   | 24 ++++++
 .../apache/kafka/common/ConsumerGroupState.java    |  1 -
 .../kafka/common/requests/ListGroupsRequest.java   |  9 ++
 .../common/message/ListGroupsRequest.json          |  7 +-
 .../common/message/ListGroupsResponse.json         |  8 +-
 .../java/org/apache/kafka/clients/MockClient.java  | 16 ++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 88 +++++++++++++++++--
 .../kafka/common/requests/RequestResponseTest.java | 46 ++++++----
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 99 ++++++++++++++++++----
 .../kafka/coordinator/group/GroupCoordinator.scala |  9 +-
 .../kafka/coordinator/group/GroupMetadata.scala    |  5 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 41 +++++----
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 21 ++++-
 .../SaslClientsWithInvalidCredentialsTest.scala    |  2 +-
 .../kafka/admin/DeleteConsumerGroupsTest.scala     | 10 +--
 .../kafka/admin/DescribeConsumerGroupTest.scala    | 44 ++++++++--
 .../unit/kafka/admin/ListConsumerGroupTest.scala   | 91 +++++++++++++++++++-
 .../coordinator/group/GroupCoordinatorTest.scala   | 56 ++++++++++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 45 ++++++++++
 22 files changed, 598 insertions(+), 93 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index fe7f716..034fbc8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -89,7 +89,7 @@
               files="MockAdminClient.java"/>
 
     <suppress checks="JavaNCSS"
-              files="RequestResponseTest.java|FetcherTest.java"/>
+              files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
 
     <suppress checks="NPathComplexity"
               files="MemoryRecordsTest|MetricsTest"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
index 46da962..0abc3e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
@@ -17,12 +17,18 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.kafka.common.ConsumerGroupState;
+
 /**
  * A listing of a consumer group in the cluster.
  */
 public class ConsumerGroupListing {
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
+    private final Optional<ConsumerGroupState> state;
 
     /**
      * Create an instance with the specified parameters.
@@ -31,8 +37,20 @@ public class ConsumerGroupListing {
      * @param isSimpleConsumerGroup If consumer group is simple or not.
      */
     public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) {
+        this(groupId, isSimpleConsumerGroup, Optional.empty());
+    }
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param groupId Group Id
+     * @param isSimpleConsumerGroup If consumer group is simple or not.
+     * @param state The state of the consumer group
+     */
+    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optional<ConsumerGroupState> state) {
         this.groupId = groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+        this.state = Objects.requireNonNull(state);
     }
 
     /**
@@ -49,11 +67,49 @@ public class ConsumerGroupListing {
         return isSimpleConsumerGroup;
     }
 
+    /**
+     * Consumer Group state
+     */
+    public Optional<ConsumerGroupState> state() {
+        return state;
+    }
+
     @Override
     public String toString() {
         return "(" +
             "groupId='" + groupId + '\'' +
             ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+            ", state=" + state +
             ')';
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(groupId, isSimpleConsumerGroup, state);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ConsumerGroupListing other = (ConsumerGroupListing) obj;
+        if (groupId == null) {
+            if (other.groupId != null)
+                return false;
+        } else if (!groupId.equals(other.groupId))
+            return false;
+        if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
+            return false;
+        if (state == null) {
+            if (other.state != null)
+                return false;
+        } else if (!state.equals(other.state))
+            return false;
+        return true;
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 989ba4b..52e5c37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3056,14 +3056,21 @@ public class KafkaAdminClient extends AdminClient {
                     runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
                         @Override
                         ListGroupsRequest.Builder createRequest(int timeoutMs) {
-                            return new ListGroupsRequest.Builder(new ListGroupsRequestData());
+                            List<String> states = options.states()
+                                    .stream()
+                                    .map(s -> s.toString())
+                                    .collect(Collectors.toList());
+                            return new ListGroupsRequest.Builder(new ListGroupsRequestData().setStatesFilter(states));
                         }
 
                         private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) {
                             String protocolType = group.protocolType();
                             if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
                                 final String groupId = group.groupId();
-                                final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                final Optional<ConsumerGroupState> state = group.groupState().equals("")
+                                        ? Optional.empty()
+                                        : Optional.of(ConsumerGroupState.parse(group.groupState()));
+                                final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty(), state);
                                 results.addListing(groupListing);
                             }
                         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
index eb27c79..9f1f38d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
@@ -26,4 +31,23 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Set<ConsumerGroupState> states = Collections.emptySet();
+
+    /**
+     * If states is set, only groups in these states will be returned by listConsumerGroups()
+     * Otherwise, all groups are returned.
+     * This operation is supported by brokers with version 2.6.0 or later.
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states);
+        return this;
+    }
+
+    /**
+     * Returns the list of States that are requested or empty if no states have been specified
+     */
+    public Set<ConsumerGroupState> states() {
+        return states;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
index 7f3d4f0..36d1a4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
+++ b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
@@ -45,7 +45,6 @@ public enum ConsumerGroupState {
         this.name = name;
     }
 
-
     /**
      * Parse a string into a consumer group state.
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index caa6f47..ce39385 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -45,6 +46,10 @@ public class ListGroupsRequest extends AbstractRequest {
 
         @Override
         public ListGroupsRequest build(short version) {
+            if (!data.statesFilter().isEmpty() && version < 4) {
+                throw new UnsupportedVersionException("The broker only supports ListGroups " +
+                        "v" + version + ", but we need v4 or newer to request groups by states.");
+            }
             return new ListGroupsRequest(data, version);
         }
 
@@ -66,6 +71,10 @@ public class ListGroupsRequest extends AbstractRequest {
         this.data = new ListGroupsRequestData(struct, version);
     }
 
+    public ListGroupsRequestData data() {
+        return data;
+    }
+
     @Override
     public ListGroupsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         ListGroupsResponseData listGroupsResponseData = new ListGroupsResponseData().
diff --git a/clients/src/main/resources/common/message/ListGroupsRequest.json b/clients/src/main/resources/common/message/ListGroupsRequest.json
index f0130e2..dbe6d9b 100644
--- a/clients/src/main/resources/common/message/ListGroupsRequest.json
+++ b/clients/src/main/resources/common/message/ListGroupsRequest.json
@@ -20,8 +20,13 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the StatesFilter field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
+      "about": "The states of the groups we want to list. If empty all groups are returned with their state."
+    }
   ]
 }
diff --git a/clients/src/main/resources/common/message/ListGroupsResponse.json b/clients/src/main/resources/common/message/ListGroupsResponse.json
index aa8bba6..87561c2 100644
--- a/clients/src/main/resources/common/message/ListGroupsResponse.json
+++ b/clients/src/main/resources/common/message/ListGroupsResponse.json
@@ -22,7 +22,9 @@
   // Starting in version 2, on quota violation, brokers send out responses before throttling.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the GroupState field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
@@ -34,7 +36,9 @@
       { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
         "about": "The group ID." },
       { "name": "ProtocolType", "type": "string", "versions": "0+",
-        "about": "The group protocol type." }
+        "about": "The group protocol type." },
+      { "name": "GroupState", "type": "string", "versions": "4+", "ignorable": true,
+        "about": "The group state name." }
     ]}
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index eaf5dcb..6cfc4fd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -215,15 +215,17 @@ public class MockClient implements KafkaClient {
             AbstractRequest.Builder<?> builder = request.requestBuilder();
             short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
                     builder.latestAllowedVersion());
-            AbstractRequest abstractRequest = request.requestBuilder().build(version);
-            if (!futureResp.requestMatcher.matches(abstractRequest))
-                throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest + " with prepared response " + futureResp.responseBody);
 
             UnsupportedVersionException unsupportedVersionException = null;
-            if (futureResp.isUnsupportedRequest)
-                unsupportedVersionException = new UnsupportedVersionException("Api " +
-                        request.apiKey() + " with version " + version);
-
+            if (futureResp.isUnsupportedRequest) {
+                unsupportedVersionException = new UnsupportedVersionException(
+                        "Api " + request.apiKey() + " with version " + version);
+            } else {
+                AbstractRequest abstractRequest = request.requestBuilder().build(version);
+                if (!futureResp.requestMatcher.matches(abstractRequest))
+                    throw new IllegalStateException("Request matcher did not match next-in-line request "
+                            + abstractRequest + " with prepared response " + futureResp.responseBody);
+            }
             ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                     request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
                     unsupportedVersionException, null, futureResp.responseBody);
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 4a991d5..77a9e88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.ApiVersion;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
@@ -26,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -58,6 +60,7 @@ import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.CreatePartitionsResponseData;
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
@@ -90,6 +93,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -116,6 +120,7 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
@@ -1233,10 +1238,12 @@ public class KafkaAdminClientTest {
                             .setGroups(Arrays.asList(
                                     new ListGroupsResponseData.ListedGroup()
                                             .setGroupId("group-1")
-                                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+                                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                                            .setGroupState("Stable"),
                                     new ListGroupsResponseData.ListedGroup()
                                             .setGroupId("group-connect-1")
                                             .setProtocolType("connector")
+                                            .setGroupState("Stable")
                             ))),
                     env.cluster().nodeById(0));
 
@@ -1262,10 +1269,12 @@ public class KafkaAdminClientTest {
                                     .setGroups(Arrays.asList(
                                             new ListGroupsResponseData.ListedGroup()
                                                     .setGroupId("group-2")
-                                                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+                                                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                                                    .setGroupState("Stable"),
                                             new ListGroupsResponseData.ListedGroup()
                                                     .setGroupId("group-connect-2")
                                                     .setProtocolType("connector")
+                                                    .setGroupState("Stable")
                             ))),
                     env.cluster().nodeById(1));
 
@@ -1276,10 +1285,12 @@ public class KafkaAdminClientTest {
                                     .setGroups(Arrays.asList(
                                             new ListGroupsResponseData.ListedGroup()
                                                     .setGroupId("group-3")
-                                                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+                                                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                                                    .setGroupState("Stable"),
                                             new ListGroupsResponseData.ListedGroup()
                                                     .setGroupId("group-connect-3")
                                                     .setProtocolType("connector")
+                                                    .setGroupState("Stable")
                                     ))),
                     env.cluster().nodeById(2));
 
@@ -1300,6 +1311,7 @@ public class KafkaAdminClientTest {
             Set<String> groupIds = new HashSet<>();
             for (ConsumerGroupListing listing : listings) {
                 groupIds.add(listing.groupId());
+                assertTrue(listing.state().isPresent());
             }
 
             assertEquals(Utils.mkSet("group-1", "group-2", "group-3"), groupIds);
@@ -1331,6 +1343,74 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testListConsumerGroupsWithStates() throws Exception {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Arrays.asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-1")
+                                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-2")
+                                .setGroupState("Empty")))),
+                env.cluster().nodeById(0));
+
+            final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions();
+            final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
+            Collection<ConsumerGroupListing> listings = result.valid().get();
+
+            assertEquals(2, listings.size());
+            List<ConsumerGroupListing> expected = new ArrayList<>();
+            expected.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY)));
+            expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE)));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
+    @Test
+    public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception {
+        ApiVersion listGroupV3 = new ApiVersion(ApiKeys.LIST_GROUPS.id, (short) 0, (short) 3);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV3)));
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+
+            // Check we can list groups with older broker if we don't specify states
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Collections.singletonList(
+                                new ListGroupsResponseData.ListedGroup()
+                                    .setGroupId("group-1")
+                                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
+                    env.cluster().nodeById(0));
+            ListConsumerGroupsOptions options = new ListConsumerGroupsOptions();
+            ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
+            Collection<ConsumerGroupListing> listing = result.all().get();
+            assertEquals(1, listing.size());
+            List<ConsumerGroupListing> expected = Collections.singletonList(new ConsumerGroupListing("group-1", false, Optional.empty()));
+            assertEquals(expected, listing);
+
+            // But we cannot set a state filter with older broker
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                body -> body instanceof ListGroupsRequest);
+
+            options = new ListConsumerGroupsOptions().inStates(Collections.singleton(ConsumerGroupState.STABLE));
+            result = env.adminClient().listConsumerGroups(options);
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
     public void testOffsetCommitNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
         final Time time = new MockTime();
@@ -2297,8 +2377,6 @@ public class KafkaAdminClientTest {
             AtomicLong firstAttemptTime = new AtomicLong(0);
             AtomicLong secondAttemptTime = new AtomicLong(0);
 
-            final TopicPartition tp1 = new TopicPartition("foo", 0);
-
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(body -> {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69b658a..c712c18 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
@@ -229,9 +230,13 @@ public class RequestResponseTest {
         checkRequest(createLeaveGroupRequest(), true);
         checkErrorResponse(createLeaveGroupRequest(), new UnknownServerException(), true);
         checkResponse(createLeaveGroupResponse(), 0, true);
-        checkRequest(createListGroupsRequest(), true);
-        checkErrorResponse(createListGroupsRequest(), new UnknownServerException(), true);
-        checkResponse(createListGroupsResponse(), 0, true);
+
+        for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) {
+            checkRequest(createListGroupsRequest(v), false);
+            checkErrorResponse(createListGroupsRequest(v), new UnknownServerException(), true);
+            checkResponse(createListGroupsResponse(v), v, true);
+        }
+
         checkRequest(createDescribeGroupRequest(), true);
         checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException(), true);
         checkResponse(createDescribeGroupResponse(), 0, true);
@@ -810,6 +815,13 @@ public class RequestResponseTest {
         assertTrue(request.isValid());
     }
 
+    @Test(expected = UnsupportedVersionException.class)
+    public void testListGroupRequestV3FailsWithStates() {
+        ListGroupsRequestData data = new ListGroupsRequestData()
+                .setStatesFilter(asList(ConsumerGroupState.STABLE.name()));
+        new ListGroupsRequest.Builder(data).build((short) 3);
+    }
+
     @Test
     public void testInvalidApiVersionsRequest() {
         testInvalidCase("java@apache_kafka", "0.0.0-SNAPSHOT");
@@ -1066,19 +1078,23 @@ public class RequestResponseTest {
         return new SyncGroupResponse(data);
     }
 
-    private ListGroupsRequest createListGroupsRequest() {
-        return new ListGroupsRequest.Builder(new ListGroupsRequestData()).build();
+    private ListGroupsRequest createListGroupsRequest(short version) {
+        ListGroupsRequestData data = new ListGroupsRequestData();
+        if (version >= 4)
+            data.setStatesFilter(Arrays.asList("Stable"));
+        return new ListGroupsRequest.Builder(data).build(version);
     }
 
-    private ListGroupsResponse createListGroupsResponse() {
-        return new ListGroupsResponse(
-                new ListGroupsResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setGroups(Collections.singletonList(
-                                new ListGroupsResponseData.ListedGroup()
-                                        .setGroupId("test-group")
-                                        .setProtocolType("consumer")
-                )));
+    private ListGroupsResponse createListGroupsResponse(int version) {
+        ListGroupsResponseData.ListedGroup group = new ListGroupsResponseData.ListedGroup()
+                .setGroupId("test-group")
+                .setProtocolType("consumer");
+        if (version >= 4)
+            group.setGroupState("Stable");
+        ListGroupsResponseData data = new ListGroupsResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setGroups(Collections.singletonList(group));
+        return new ListGroupsResponse(data);
     }
 
     private DescribeGroupsRequest createDescribeGroupRequest() {
@@ -1133,7 +1149,6 @@ public class RequestResponseTest {
         );
     }
 
-    @SuppressWarnings("deprecation")
     private ListOffsetRequest createListOffsetRequest(int version) {
         if (version == 0) {
             Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
@@ -1164,7 +1179,6 @@ public class RequestResponseTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     private ListOffsetResponse createListOffsetResponse(int version) {
         if (version == 0) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 8d5087d..a0fd206 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -41,26 +41,35 @@ import org.apache.kafka.common.protocol.Errors
 import scala.collection.immutable.TreeMap
 import scala.reflect.ClassTag
 import org.apache.kafka.common.requests.ListOffsetResponse
+import org.apache.kafka.common.ConsumerGroupState
+import joptsimple.OptionException
 
 object ConsumerGroupCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
-    val opts = new ConsumerGroupCommandOptions(args)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
+    val opts = new ConsumerGroupCommandOptions(args)
+    try {
+      opts.checkArgs()
+      CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
 
-    // should have exactly one action
-    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has)
-    if (actions != 1)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets")
+      // should have exactly one action
+      val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has)
+      if (actions != 1)
+        CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets")
 
-    opts.checkArgs()
+      run(opts)
+    } catch {
+      case e: OptionException =>
+        CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+    }
+  }
 
+  def run(opts: ConsumerGroupCommandOptions): Unit = {
     val consumerGroupService = new ConsumerGroupService(opts)
-
     try {
       if (opts.options.has(opts.listOpt))
-        consumerGroupService.listGroups().foreach(println(_))
+        consumerGroupService.listGroups()
       else if (opts.options.has(opts.describeOpt))
         consumerGroupService.describeGroups()
       else if (opts.options.has(opts.deleteOpt))
@@ -77,6 +86,8 @@ object ConsumerGroupCommand extends Logging {
         consumerGroupService.deleteOffsets()
       }
     } catch {
+      case e: IllegalArgumentException =>
+        CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
       case e: Throwable =>
         printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
     } finally {
@@ -84,6 +95,15 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
+  def consumerGroupStatesFromString(input: String): Set[ConsumerGroupState] = {
+    val parsedStates = input.split(',').map(s => ConsumerGroupState.parse(s.trim)).toSet
+    if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
+      val validStates = ConsumerGroupState.values().filter(_ != ConsumerGroupState.UNKNOWN)
+      throw new IllegalArgumentException(s"Invalid state list '$input'. Valid states are: ${validStates.mkString(", ")}")
+    }
+    parsedStates
+  }
+
   val MISSING_COLUMN_VALUE = "-"
 
   def printError(msg: String, e: Option[Throwable] = None): Unit = {
@@ -178,12 +198,44 @@ object ConsumerGroupCommand extends Logging {
       } else None
     }
 
-    def listGroups(): List[String] = {
+    def listGroups(): Unit = {
+      if (opts.options.has(opts.stateOpt)) {
+        val stateValue = opts.options.valueOf(opts.stateOpt)
+        val states = if (stateValue == null || stateValue.isEmpty)
+          Set[ConsumerGroupState]()
+        else
+          consumerGroupStatesFromString(stateValue)
+        val listings = listConsumerGroupsWithState(states)
+        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
+      } else
+        listConsumerGroups().foreach(println(_))
+    }
+
+    def listConsumerGroups(): List[String] = {
       val result = adminClient.listConsumerGroups(withTimeoutMs(new ListConsumerGroupsOptions))
       val listings = result.all.get.asScala
       listings.map(_.groupId).toList
     }
 
+    def listConsumerGroupsWithState(states: Set[ConsumerGroupState]): List[ConsumerGroupListing] = {
+      val listConsumerGroupsOptions = withTimeoutMs(new ListConsumerGroupsOptions())
+      listConsumerGroupsOptions.inStates(states.asJava)
+      val result = adminClient.listConsumerGroups(listConsumerGroupsOptions)
+      result.all.get.asScala.toList
+    }
+
+    private def printGroupStates(groupsAndStates: List[(String, String)]): Unit = {
+      // find proper columns width
+      var maxGroupLen = 15
+      for ((groupId, state) <- groupsAndStates) {
+        maxGroupLen = Math.max(maxGroupLen, groupId.length)
+      }
+      println(s"%${-maxGroupLen}s %s".format("GROUP", "STATE"))
+      for ((groupId, state) <- groupsAndStates) {
+        println(s"%${-maxGroupLen}s %s".format(groupId, state))
+      }
+    }
+
     private def shouldPrintMemberState(group: String, state: Option[String], numRows: Option[Int]): Boolean = {
       // numRows contains the number of data rows, if any, compiled from the API call in the caller method.
       // if it's undefined or 0, there is no relevant group information to display.
@@ -317,7 +369,7 @@ object ConsumerGroupCommand extends Logging {
 
     def describeGroups(): Unit = {
       val groupIds =
-        if (opts.options.has(opts.allGroupsOpt)) listGroups()
+        if (opts.options.has(opts.allGroupsOpt)) listConsumerGroups()
         else opts.options.valuesOf(opts.groupOpt).asScala
       val membersOptPresent = opts.options.has(opts.membersOpt)
       val stateOptPresent = opts.options.has(opts.stateOpt)
@@ -381,7 +433,7 @@ object ConsumerGroupCommand extends Logging {
 
     def resetOffsets(): Map[String, Map[TopicPartition, OffsetAndMetadata]] = {
       val groupIds =
-        if (opts.options.has(opts.allGroupsOpt)) listGroups()
+        if (opts.options.has(opts.allGroupsOpt)) listConsumerGroups()
         else opts.options.valuesOf(opts.groupOpt).asScala
 
       val consumerGroups = adminClient.describeConsumerGroups(
@@ -869,7 +921,7 @@ object ConsumerGroupCommand extends Logging {
 
     def deleteGroups(): Map[String, Throwable] = {
       val groupIds =
-        if (opts.options.has(opts.allGroupsOpt)) listGroups()
+        if (opts.options.has(opts.allGroupsOpt)) listConsumerGroups()
         else opts.options.valuesOf(opts.groupOpt).asScala
 
       val groupsToDelete = adminClient.deleteConsumerGroups(
@@ -953,8 +1005,11 @@ object ConsumerGroupCommand extends Logging {
     val OffsetsDoc = "Describe the group and list all topic partitions in the group along with their offset lag. " +
       "This is the default sub-action of and may be used with '--describe' and '--bootstrap-server' options only." + nl +
       "Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets"
-    val StateDoc = "Describe the group state. This option may be used with '--describe' and '--bootstrap-server' options only." + nl +
-      "Example: --bootstrap-server localhost:9092 --describe --group group1 --state"
+    val StateDoc = "When specified with '--describe', includes the state of the group." + nl +
+      "Example: --bootstrap-server localhost:9092 --describe --group group1 --state" + nl +
+      "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states." + nl +
+      "Example: --bootstrap-server localhost:9092 --list --state stable,empty" + nl +
+      "This option may be used with '--describe', '--list' and '--bootstrap-server' options only."
     val DeleteOffsetsDoc = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics."
 
     val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
@@ -1018,9 +1073,9 @@ object ConsumerGroupCommand extends Logging {
     val offsetsOpt = parser.accepts("offsets", OffsetsDoc)
                            .availableIf(describeOpt)
     val stateOpt = parser.accepts("state", StateDoc)
-                         .availableIf(describeOpt)
-
-    parser.mutuallyExclusive(membersOpt, offsetsOpt, stateOpt)
+                         .availableIf(describeOpt, listOpt)
+                         .withOptionalArg()
+                         .ofType(classOf[String])
 
     options = parser.parse(args : _*)
 
@@ -1038,6 +1093,14 @@ object ConsumerGroupCommand extends Logging {
         if (!options.has(groupOpt) && !options.has(allGroupsOpt))
           CommandLineUtils.printUsageAndDie(parser,
             s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
+        val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, offsetsOpt, stateOpt)
+        if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 0).sum > 1) {
+          CommandLineUtils.printUsageAndDie(parser,
+            s"Option $describeOpt takes at most one of these options: ${mutuallyExclusiveOpts.mkString(", ")}")
+        }
+        if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
+          CommandLineUtils.printUsageAndDie(parser,
+            s"Option $describeOpt does not take a value for $stateOpt")
       } else {
         if (options.has(timeoutMsOpt))
           debug(s"Option $timeoutMsOpt is applicable only when $describeOpt is used.")
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 0957c5b..00dd09b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -797,12 +797,17 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleListGroups(): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
     if (!isActive.get) {
       (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
       val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-      (errorCode, groupManager.currentGroups.map(_.overview).toList)
+      // if states is empty, return all groups
+      val groups = if (states.isEmpty)
+        groupManager.currentGroups
+      else
+        groupManager.currentGroups.filter(g => states.contains(g.summary.state))
+      (errorCode, groups.map(_.overview).toList)
     }
   }
 
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 7e3b470..f82b663 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -159,7 +159,8 @@ private object GroupMetadata extends Logging {
  * Case class used to represent group metadata for the ListGroups API
  */
 case class GroupOverview(groupId: String,
-                         protocolType: String)
+                         protocolType: String,
+                         state: String)
 
 /**
  * Case class used to represent group metadata for the DescribeGroup API
@@ -562,7 +563,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   }
 
   def overview: GroupOverview = {
-    GroupOverview(groupId, protocolType.getOrElse(""))
+    GroupOverview(groupId, protocolType.getOrElse(""), state.toString)
   }
 
   def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dc9afcc..28e20cd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -86,6 +86,7 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.util.{Failure, Success, Try}
+import kafka.coordinator.group.GroupOverview
 
 
 /**
@@ -1394,29 +1395,35 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
-    val (error, groups) = groupCoordinator.handleListGroups()
+    val listGroupsRequest = request.body[ListGroupsRequest]
+    val states = if (listGroupsRequest.data.statesFilter == null)
+      // Handle a null array the same as empty
+      immutable.Set[String]()
+    else 
+      listGroupsRequest.data.statesFilter.asScala.toSet
+
+    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
+       new ListGroupsResponse(new ListGroupsResponseData()
+            .setErrorCode(error.code)
+            .setGroups(groups.map { group =>
+                val listedGroup = new ListGroupsResponseData.ListedGroup()
+                  .setGroupId(group.groupId)
+                  .setProtocolType(group.protocolType)
+                  .setGroupState(group.state.toString)
+                listedGroup
+            }.asJava)
+            .setThrottleTimeMs(throttleMs)
+        )
+    }
+    val (error, groups) = groupCoordinator.handleListGroups(states)
     if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
       // With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ListGroupsResponse(new ListGroupsResponseData()
-            .setErrorCode(error.code)
-            .setGroups(groups.map { group => new ListGroupsResponseData.ListedGroup()
-              .setGroupId(group.groupId)
-              .setProtocolType(group.protocolType)}.asJava
-            )
-            .setThrottleTimeMs(requestThrottleMs)
-        ))
+        createResponse(requestThrottleMs, groups, error))
     else {
       val filteredGroups = groups.filter(group => authorize(request.context, DESCRIBE, GROUP, group.groupId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ListGroupsResponse(new ListGroupsResponseData()
-          .setErrorCode(error.code)
-          .setGroups(filteredGroups.map { group => new ListGroupsResponseData.ListedGroup()
-            .setGroupId(group.groupId)
-            .setProtocolType(group.protocolType)}.asJava
-          )
-          .setThrottleTimeMs(requestThrottleMs)
-        ))
+        createResponse(requestThrottleMs, filteredGroups, error))
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 4fbdf99..bab4452 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1085,10 +1085,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
           assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
           // Test that we can list the new group.
           TestUtils.waitUntilTrue(() => {
-            val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId)
-            matching.nonEmpty
+            val matching = client.listConsumerGroups.all.get.asScala.filter(group =>
+                group.groupId == testGroupId &&
+                group.state.get == ConsumerGroupState.STABLE)
+            matching.size == 1
           }, s"Expected to be able to list $testGroupId")
 
+          TestUtils.waitUntilTrue(() => {
+            val options = new ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.STABLE).asJava)
+            val matching = client.listConsumerGroups(options).all.get.asScala.filter(group =>
+                group.groupId == testGroupId &&
+                group.state.get == ConsumerGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.EMPTY).asJava)
+            val matching = client.listConsumerGroups(options).all.get.asScala.filter(
+                _.groupId == testGroupId)
+            matching.isEmpty
+          }, s"Expected to find zero groups")
+
           val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
             new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
           assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index ac7f56c..14fd5a5 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -176,7 +176,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     consumer.subscribe(List(topic).asJava)
 
     verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))
-    assertEquals(1, consumerGroupService.listGroups.size)
+    assertEquals(1, consumerGroupService.listConsumerGroups.size)
     consumerGroupService.close()
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 23de790..a3fa8f5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -107,7 +107,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+      service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
     }, "The group did not initialize as expected.")
 
     executor.shutdown()
@@ -137,7 +137,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      service.listGroups().toSet == groups.keySet &&
+      service.listConsumerGroups().toSet == groups.keySet &&
         groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Stable")
     }, "The group did not initialize as expected.")
 
@@ -169,7 +169,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+      service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
     }, "The group did not initialize as expected.")
 
     executor.shutdown()
@@ -194,7 +194,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+      service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
     }, "The group did not initialize as expected.")
 
     executor.shutdown()
@@ -221,7 +221,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable"
+      service.listConsumerGroups().contains(group) && service.collectGroupState(group).state == "Stable"
     }, "The group did not initialize as expected.")
 
     executor.shutdown()
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 560153f..2c28ccf 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -18,8 +18,7 @@ package kafka.admin
 
 import java.util.Properties
 
-import joptsimple.OptionException
-import kafka.utils.TestUtils
+import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
@@ -52,11 +51,46 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test(expected = classOf[OptionException])
+  @Test
   def testDescribeWithMultipleSubActions(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    var exitStatus: Option[Int] = None
+    var exitMessage: Option[String] = None
+    Exit.setExitProcedure { (status, err) =>
+      exitStatus = Some(status)
+      exitMessage = err
+      throw new RuntimeException
+    }
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--members", "--state")
-    getConsumerGroupService(cgcArgs)
+    try {
+      ConsumerGroupCommand.main(cgcArgs)
+    } catch {
+      case e: RuntimeException => //expected
+    } finally {
+      Exit.resetExitProcedure()
+    }
+    assertEquals(Some(1), exitStatus)
+    assertTrue(exitMessage.get.contains("Option [describe] takes at most one of these options"))
+  }
+
+  @Test
+  def testDescribeWithStateValue(): Unit = {
+    var exitStatus: Option[Int] = None
+    var exitMessage: Option[String] = None
+    Exit.setExitProcedure { (status, err) =>
+      exitStatus = Some(status)
+      exitMessage = err
+      throw new RuntimeException
+    }
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--all-groups", "--state", "Stable")
+    try {
+      ConsumerGroupCommand.main(cgcArgs)
+    } catch {
+      case e: RuntimeException => //expected
+    } finally {
+      Exit.resetExitProcedure()
+    }
+    assertEquals(Some(1), exitStatus)
+    assertTrue(exitMessage.get.contains("Option [describe] does not take a value for [state]"))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 7429a43..3997c9e 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -17,8 +17,13 @@
 package kafka.admin
 
 import joptsimple.OptionException
+import org.junit.Assert._
 import org.junit.Test
 import kafka.utils.TestUtils
+import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.clients.admin.ConsumerGroupListing
+import java.util.Optional
+import org.scalatest.Assertions.assertThrows
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
@@ -34,7 +39,7 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     val expectedGroups = Set(group, simpleGroup)
     var foundGroups = Set.empty[String]
     TestUtils.waitUntilTrue(() => {
-      foundGroups = service.listGroups().toSet
+      foundGroups = service.listConsumerGroups().toSet
       expectedGroups == foundGroups
     }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
   }
@@ -44,4 +49,88 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list")
     getConsumerGroupService(cgcArgs)
   }
+
+  @Test
+  def testListConsumerGroupsWithStates(): Unit = {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListing = Set(
+      new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)),
+      new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE)))
+
+    var foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      expectedListing == foundListing
+    }, s"Expected to show groups $expectedListing, but found $foundListing")
+
+    val expectedListingStable = Set(
+      new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE)))
+
+    foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      expectedListingStable == foundListing
+    }, s"Expected to show groups $expectedListingStable, but found $foundListing")
+  }
+
+  @Test
+  def testConsumerGroupStatesFromString(): Unit = {
+    var result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable")
+    assertEquals(Set(ConsumerGroupState.STABLE), result)
+
+    result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable, PreparingRebalance")
+    assertEquals(Set(ConsumerGroupState.STABLE, ConsumerGroupState.PREPARING_REBALANCE), result)
+
+    result = ConsumerGroupCommand.consumerGroupStatesFromString("Dead,CompletingRebalance,")
+    assertEquals(Set(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE), result)
+
+    assertThrows[IllegalArgumentException] {
+      ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong")
+    }
+
+    assertThrows[IllegalArgumentException] {
+      ConsumerGroupCommand.consumerGroupStatesFromString("stable")
+    }
+
+    assertThrows[IllegalArgumentException] {
+      ConsumerGroupCommand.consumerGroupStatesFromString("  bad, Stable")
+    }
+
+    assertThrows[IllegalArgumentException] {
+      ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,")
+    }
+  }
+
+  @Test
+  def testListGroupCommand(): Unit = {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+    var out = ""
+
+    var cgcArgs = Array("--bootstrap-server", brokerList, "--list")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
+    }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
+    }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state", "Stable")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("STATE") && out.contains(group) && out.contains("Stable")
+    }, s"Expected to find $group in state Stable and the header, but found $out")
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3200f81..9791cd6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -174,7 +174,7 @@ class GroupCoordinatorTest {
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupError)
 
     // ListGroups
-    val (listGroupsError, _) = groupCoordinator.handleListGroups()
+    val (listGroupsError, _) = groupCoordinator.handleListGroups(Set())
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)
 
     // DeleteGroups
@@ -3208,10 +3208,10 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val (error, groups) = groupCoordinator.handleListGroups()
+    val (error, groups) = groupCoordinator.handleListGroups(Set())
     assertEquals(Errors.NONE, error)
     assertEquals(1, groups.size)
-    assertEquals(GroupOverview("groupId", "consumer"), groups.head)
+    assertEquals(GroupOverview("groupId", "consumer", Stable.toString), groups.head)
   }
 
   @Test
@@ -3220,10 +3220,56 @@ class GroupCoordinatorTest {
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     assertEquals(Errors.NONE, joinGroupResult.error)
 
-    val (error, groups) = groupCoordinator.handleListGroups()
+    val (error, groups) = groupCoordinator.handleListGroups(Set())
     assertEquals(Errors.NONE, error)
     assertEquals(1, groups.size)
-    assertEquals(GroupOverview("groupId", "consumer"), groups.head)
+    assertEquals(GroupOverview("groupId", "consumer", CompletingRebalance.toString), groups.head)
+  }
+
+  @Test
+  def testListGroupsWithStates(): Unit = {
+    val allStates = Set(PreparingRebalance, CompletingRebalance, Stable, Dead, Empty).map(s => s.toString)
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    // Member joins the group
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    // The group should be in CompletingRebalance
+    val (error, groups) = groupCoordinator.handleListGroups(Set(CompletingRebalance.toString))
+    assertEquals(Errors.NONE, error)
+    assertEquals(1, groups.size)
+    val (error2, groups2) = groupCoordinator.handleListGroups(allStates.filterNot(s => s == CompletingRebalance.toString))
+    assertEquals(Errors.NONE, error2)
+    assertEquals(0, groups2.size)
+
+    // Member syncs
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncGroupResult.error)
+
+    // The group is now stable
+    val (error3, groups3) = groupCoordinator.handleListGroups(Set(Stable.toString))
+    assertEquals(Errors.NONE, error3)
+    assertEquals(1, groups3.size)
+    val (error4, groups4) = groupCoordinator.handleListGroups(allStates.filterNot(s => s == Stable.toString))
+    assertEquals(Errors.NONE, error4)
+    assertEquals(0, groups4.size)
+
+    // Member leaves
+    EasyMock.reset(replicaManager)
+    val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+    verifyLeaveGroupResult(leaveGroupResults)
+
+    // The group is now empty
+    val (error5, groups5) = groupCoordinator.handleListGroups(Set(Empty.toString))
+    assertEquals(Errors.NONE, error5)
+    assertEquals(1, groups5.size)
+    val (error6, groups6) = groupCoordinator.handleListGroups(allStates.filterNot(s => s == Empty.toString))
+    assertEquals(Errors.NONE, error6)
+    assertEquals(0, groups6.size)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 82ca828..0b9c4a4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -28,6 +28,7 @@ import kafka.api.LeaderAndIsr
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
 import kafka.cluster.Partition
 import kafka.controller.KafkaController
+import kafka.coordinator.group.GroupOverview
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.SyncGroupCallback
 import kafka.coordinator.group.JoinGroupResult
@@ -1859,6 +1860,50 @@ class KafkaApisTest {
     EasyMock.verify(replicaManager)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable"),
+      GroupOverview("group2", "qwerty", "Empty")
+    )
+    val response = listGroupRequest(None, overviews)
+    assertEquals(2, response.data.groups.size)
+    assertEquals("Stable", response.data.groups.get(0).groupState)
+    assertEquals("Empty", response.data.groups.get(1).groupState)
+  }
+
+  @Test
+  def testListGroupsRequestWithState(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable")
+    )
+    val response = listGroupRequest(Some("Stable"), overviews)
+    assertEquals(1, response.data.groups.size)
+    assertEquals("Stable", response.data.groups.get(0).groupState)
+  }
+
+  private def listGroupRequest(state: Option[String], overviews: List[GroupOverview]): ListGroupsResponse = {
+    EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+    val data = new ListGroupsRequestData()
+    if (state.isDefined)
+      data.setStatesFilter(Collections.singletonList(state.get))
+    val listGroupsRequest = new ListGroupsRequest.Builder(data).build()
+    val requestChannelRequest = buildRequest(listGroupsRequest)
+
+    val capturedResponse = expectNoThrottling()
+    val expectedStates: Set[String] = if (state.isDefined) Set(state.get) else Set()
+    EasyMock.expect(groupCoordinator.handleListGroups(expectedStates))
+      .andReturn((Errors.NONE, overviews))
+    EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis().handleListGroupsRequest(requestChannelRequest)
+
+    val response = readResponse(ApiKeys.LIST_GROUPS, listGroupsRequest, capturedResponse).asInstanceOf[ListGroupsResponse]
+    assertEquals(Errors.NONE.code, response.data.errorCode)
+    response
+  }
+
   /**
    * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
    */