You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/03 23:40:25 UTC

[1/3] kafka git commit: KAFKA-2687: Add support for ListGroups and DescribeGroup APIs

Repository: kafka
Updated Branches:
  refs/heads/trunk f413143ed -> 596c203af


http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
index 0a5bb3c..88eb9ae 100644
--- a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
@@ -23,15 +23,18 @@ import org.junit.Test
 import org.scalatest.junit.JUnitSuite
 
 class MemberMetadataTest extends JUnitSuite {
+  val groupId = "groupId"
+  val clientId = "clientId"
+  val clientHost = "clientHost"
+  val memberId = "memberId"
+  val sessionTimeoutMs = 10000
+
 
   @Test
   def testMatchesSupportedProtocols {
-    val groupId = "groupId"
-    val memberId = "memberId"
-    val sessionTimeoutMs = 10000
     val protocols = List(("range", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
     assertTrue(member.matches(protocols))
     assertFalse(member.matches(List(("range", Array[Byte](0)))))
     assertFalse(member.matches(List(("roundrobin", Array.empty[Byte]))))
@@ -40,48 +43,36 @@ class MemberMetadataTest extends JUnitSuite {
 
   @Test
   def testVoteForPreferredProtocol {
-    val groupId = "groupId"
-    val memberId = "memberId"
-    val sessionTimeoutMs = 10000
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
     assertEquals("range", member.vote(Set("range", "roundrobin")))
     assertEquals("roundrobin", member.vote(Set("blah", "roundrobin")))
   }
 
   @Test
   def testMetadata {
-    val groupId = "groupId"
-    val memberId = "memberId"
-    val sessionTimeoutMs = 10000
     val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
 
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
     assertTrue(util.Arrays.equals(Array[Byte](0), member.metadata("range")))
     assertTrue(util.Arrays.equals(Array[Byte](1), member.metadata("roundrobin")))
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testMetadataRaisesOnUnsupportedProtocol {
-    val groupId = "groupId"
-    val memberId = "memberId"
-    val sessionTimeoutMs = 10000
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
     member.metadata("blah")
     fail()
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testVoteRaisesOnNoSupportedProtocols {
-    val groupId = "groupId"
-    val memberId = "memberId"
-    val sessionTimeoutMs = 10000
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
     member.vote(Set("blah"))
     fail()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 4e5e776..31f743b 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.api.{GroupMetadataRequest, OffsetCommitRequest, OffsetFetchRequest}
+import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
 import kafka.consumer.SimpleConsumer
 import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
 import kafka.utils._
@@ -56,7 +56,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     time = new MockTime()
     server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
     simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client")
-    val consumerMetadataRequest = GroupMetadataRequest(group)
+    val consumerMetadataRequest = GroupCoordinatorRequest(group)
     Stream.continually {
       val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)
       consumerMetadataResponse.coordinatorOpt.isDefined


[3/3] kafka git commit: KAFKA-2687: Add support for ListGroups and DescribeGroup APIs

Posted by gu...@apache.org.
KAFKA-2687: Add support for ListGroups and DescribeGroup APIs

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang, Jun Rao

Closes #388 from hachikuji/K2687


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/596c203a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/596c203a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/596c203a

Branch: refs/heads/trunk
Commit: 596c203af1f33360c04f4be7c466310d11343f78
Parents: f413143
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Nov 3 14:46:04 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 3 14:46:04 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  34 +--
 .../consumer/internals/AbstractCoordinator.java |  20 +-
 .../consumer/internals/ConsumerCoordinator.java |   2 +-
 .../consumer/internals/ConsumerProtocol.java    |   2 +
 .../kafka/clients/producer/ProducerConfig.java  |  70 +++---
 .../apache/kafka/common/config/ConfigDef.java   |  17 ++
 .../apache/kafka/common/config/SaslConfigs.java |   8 +
 .../apache/kafka/common/config/SslConfigs.java  |  17 ++
 .../apache/kafka/common/protocol/ApiKeys.java   |   6 +-
 .../apache/kafka/common/protocol/Errors.java    |   3 +-
 .../apache/kafka/common/protocol/Protocol.java  |  82 ++++++-
 .../kafka/common/protocol/types/Struct.java     |   8 +
 .../kafka/common/requests/AbstractRequest.java  |   8 +-
 .../common/requests/DescribeGroupsRequest.java  |  68 ++++++
 .../common/requests/DescribeGroupsResponse.java | 224 +++++++++++++++++
 .../requests/GroupCoordinatorRequest.java       |  65 +++++
 .../requests/GroupCoordinatorResponse.java      |  70 ++++++
 .../common/requests/GroupMetadataRequest.java   |  65 -----
 .../common/requests/GroupMetadataResponse.java  |  70 ------
 .../common/requests/ListGroupsRequest.java      |  57 +++++
 .../common/requests/ListGroupsResponse.java     | 107 ++++++++
 .../internals/ConsumerCoordinatorTest.java      |   4 +-
 .../common/requests/RequestResponseTest.java    |  45 +++-
 .../distributed/WorkerCoordinatorTest.java      |   4 +-
 .../main/scala/kafka/admin/AdminClient.scala    | 242 +++++++++++++++++++
 .../kafka/api/GroupCoordinatorRequest.scala     |  80 ++++++
 .../kafka/api/GroupCoordinatorResponse.scala    |  58 +++++
 .../scala/kafka/api/GroupMetadataRequest.scala  |  80 ------
 .../scala/kafka/api/GroupMetadataResponse.scala |  58 -----
 core/src/main/scala/kafka/api/RequestKeys.scala |  10 +-
 .../main/scala/kafka/client/ClientUtils.scala   |   4 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |   1 +
 .../scala/kafka/consumer/SimpleConsumer.scala   |   4 +-
 .../kafka/coordinator/GroupCoordinator.scala    | 104 ++++++--
 .../scala/kafka/coordinator/GroupMetadata.scala |  36 ++-
 .../coordinator/GroupMetadataManager.scala      |  38 +--
 .../kafka/coordinator/MemberMetadata.scala      |  24 +-
 .../javaapi/GroupCoordinatorResponse.scala      |  47 ++++
 .../kafka/javaapi/GroupMetadataResponse.scala   |  47 ----
 .../scala/kafka/network/RequestChannel.scala    |   3 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  90 +++++--
 .../integration/kafka/api/AdminClientTest.scala | 114 +++++++++
 .../kafka/api/AuthorizerIntegrationTest.scala   |   8 +-
 .../scala/other/kafka/TestOffsetManager.scala   |   4 +-
 .../api/RequestResponseSerializationTest.scala  |  10 +-
 .../GroupCoordinatorResponseTest.scala          | 144 +++++++++--
 .../kafka/coordinator/GroupMetadataTest.scala   |  23 +-
 .../kafka/coordinator/MemberMetadataTest.scala  |  31 +--
 .../unit/kafka/server/OffsetCommitTest.scala    |   4 +-
 49 files changed, 1765 insertions(+), 555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b366efd..4131352 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,8 +17,6 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.HashMap;
@@ -286,27 +284,6 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
-                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                                .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
-                                .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
-                                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
-                                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         40 * 1000,
@@ -318,7 +295,16 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.LONG,
                                         9 * 60 * 1000,
                                         Importance.MEDIUM,
-                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
+                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+
+                                // security support
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                                        Type.STRING,
+                                        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .withClientSslSupport()
+                                .withClientSaslSupport();
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d8f3c25..9cf825c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,8 +30,8 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.GroupMetadataRequest;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorRequest;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -450,8 +450,8 @@ public abstract class AbstractCoordinator {
         } else {
             // create a group  metadata request
             log.debug("Issuing group metadata request to broker {}", node.id());
-            GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId);
-            return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest)
+            GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
+            return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                     .compose(new RequestFutureAdapter<ClientResponse, Void>() {
                         @Override
                         public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
@@ -472,14 +472,14 @@ public abstract class AbstractCoordinator {
             // We already found the coordinator, so ignore the request
             future.complete(null);
         } else {
-            GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody());
+            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            if (groupMetadataResponse.errorCode() == Errors.NONE.code()) {
-                this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(),
-                        groupMetadataResponse.node().host(),
-                        groupMetadataResponse.node().port());
+            if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) {
+                this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
+                        groupCoordinatorResponse.node().host(),
+                        groupCoordinatorResponse.node().port());
 
                 client.tryConnect(coordinator);
 
@@ -488,7 +488,7 @@ public abstract class AbstractCoordinator {
                     heartbeatTask.reset();
                 future.complete(null);
             } else {
-                future.raise(Errors.forCode(groupMetadataResponse.errorCode()));
+                future.raise(Errors.forCode(groupCoordinatorResponse.errorCode()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 97d25c3..5b5d6ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -119,7 +119,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
 
     @Override
     public String protocolType() {
-        return "consumer";
+        return ConsumerProtocol.PROTOCOL_TYPE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 0020993..4728a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -54,6 +54,8 @@ import java.util.Map;
  */
 public class ConsumerProtocol {
 
+    public static final String PROTOCOL_TYPE = "consumer";
+
     public static final String VERSION_KEY_NAME = "version";
     public static final String TOPICS_KEY_NAME = "topics";
     public static final String TOPIC_KEY_NAME = "topic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index e9d9fad..3eaea09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -12,23 +12,22 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
 /**
  * Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
  * href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka documentation</a>
@@ -262,32 +261,33 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.LOW,
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
-                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
-                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                                .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
-                                .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
-                                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
-                                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                                .define(KEY_SERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        VALUE_SERIALIZER_CLASS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
-                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
-                                .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
+                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                                        Type.LONG,
+                                        9 * 60 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                                .define(PARTITIONER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        DefaultPartitioner.class.getName(),
+                                        Importance.MEDIUM, PARTITIONER_CLASS_DOC)
+
+                                // security support
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                                        Type.STRING,
+                                        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .withClientSslSupport()
+                                .withClientSaslSupport();
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 2e820dd..f01ed28 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -151,6 +151,23 @@ public class ConfigDef {
         return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
     }
 
+    /**
+     * Add standard SSL client configuration options.
+     * @return this
+     */
+    public ConfigDef withClientSslSupport() {
+        SslConfigs.addClientSslSupport(this);
+        return this;
+    }
+
+    /**
+     * Add standard SASL client configuration options.
+     * @return this
+     */
+    public ConfigDef withClientSaslSupport() {
+        SaslConfigs.addClientSaslSupport(this);
+        return this;
+    }
 
     /**
      * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 657c6d3..0046868 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -49,4 +49,12 @@ public class SaslConfigs {
             "By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
     public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
 
+    public static void addClientSaslSupport(ConfigDef config) {
+        config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 60e1eb3..8f93301 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -96,4 +96,21 @@ public class SslConfigs {
                                            + " unlike requested , if this option is set client can choose not to provide authentication information about itself"
                                            + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
 
+    public static void addClientSslSupport(ConfigDef config) {
+        config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index af7b266..5bd3c96 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,11 +30,13 @@ public enum ApiKeys {
     CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
     OFFSET_COMMIT(8, "OffsetCommit"),
     OFFSET_FETCH(9, "OffsetFetch"),
-    GROUP_METADATA(10, "GroupMetadata"),
+    GROUP_COORDINATOR(10, "GroupCoordinator"),
     JOIN_GROUP(11, "JoinGroup"),
     HEARTBEAT(12, "Heartbeat"),
     LEAVE_GROUP(13, "LeaveGroup"),
-    SYNC_GROUP(14, "SyncGroup");
+    SYNC_GROUP(14, "SyncGroup"),
+    DESCRIBE_GROUPS(15, "DescribeGroups"),
+    LIST_GROUPS(16, "ListGroups");
 
     private static ApiKeys[] codeToType;
     public static final int MAX_API_KEY;

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 2c9cb20..d4eb1f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -86,7 +86,8 @@ public enum Errors {
             new ApiException("The session timeout is not within an acceptable range.")),
     INVALID_COMMIT_OFFSET_SIZE(28,
             new ApiException("The committing offset data size is not valid")),
-    AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
+    AUTHORIZATION_FAILED(29,
+            new ApiException("Request is not authorized.")),
     REBALANCE_IN_PROGRESS(30,
             new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed."));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 36094b0..00560db 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -388,18 +388,71 @@ public class Protocol {
     public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
     public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
 
-    /* Consumer metadata api */
-    public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                                STRING,
-                                                                                "The unique group id."));
+    /* List groups api */
+    public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
 
-    public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                       new Field("coordinator",
-                                                                                 BROKER,
-                                                                                 "Host and port information for the coordinator for a consumer group."));
+    public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
+                                                                          new Field("protocol_type", STRING));
+    public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                    new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
 
-    public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0};
-    public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0};
+    public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0};
+    public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0};
+
+    /* Describe group api */
+    public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
+                                                                                 new ArrayOf(STRING),
+                                                                                 "List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
+
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
+                                                                                         STRING,
+                                                                                         "The memberId assigned by the coordinator"),
+                                                                               new Field("client_id",
+                                                                                         STRING,
+                                                                                         "The client id used in the member's latest join group request"),
+                                                                               new Field("client_host",
+                                                                                         STRING,
+                                                                                         "The client host used in the request session corresponding to the member's join group."),
+                                                                               new Field("member_metadata",
+                                                                                         BYTES,
+                                                                                         "The metadata corresponding to the current group protocol in use (will only be present if the group is stable)."),
+                                                                               new Field("member_assignment",
+                                                                                         BYTES,
+                                                                                         "The current assignment provided by the group leader (will only be present if the group is stable)."));
+
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(new Field("error_code", INT16),
+                                                                                       new Field("group_id",
+                                                                                                 STRING),
+                                                                                       new Field("state",
+                                                                                                 STRING,
+                                                                                                 "The current state of the group (one of: Dead, Stable, AwaitingSync, or PreparingRebalance, or empty if there is no active group)"),
+                                                                                       new Field("protocol_type",
+                                                                                                 STRING,
+                                                                                                 "The current group protocol type (will be empty if the there is no active group)"),
+                                                                                       new Field("protocol",
+                                                                                                 STRING,
+                                                                                                 "The current group protocol (only provided if the group is Stable)"),
+                                                                                       new Field("members",
+                                                                                                 new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0),
+                                                                                                 "Current group members (only provided if the group is not Dead)"));
+
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+
+    public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
+    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
+
+    /* Group coordinator api */
+    public static final Schema GROUP_COORDINATOR_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                                   STRING,
+                                                                                   "The unique group id."));
+
+    public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                          new Field("coordinator",
+                                                                                    BROKER,
+                                                                                    "Host and port information for the coordinator for a consumer group."));
+
+    public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0};
+    public static final Schema[] GROUP_COORDINATOR_RESPONSE = new Schema[] {GROUP_COORDINATOR_RESPONSE_V0};
 
     /* Controlled shutdown api */
     public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -616,12 +669,13 @@ public class Protocol {
         REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
-        REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST;
+        REQUESTS[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
         REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
         REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
-
+        REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
+        REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -633,11 +687,13 @@ public class Protocol {
         RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
-        RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE;
+        RESPONSES[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
         RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;
         RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
+        RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
+        RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ef2525e..54c3deb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -98,6 +98,14 @@ public class Struct {
         return (Struct) get(name);
     }
 
+    public Byte getByte(Field field) {
+        return (Byte) get(field);
+    }
+
+    public byte getByte(String name) {
+        return (Byte) get(name);
+    }
+
     public Short getShort(Field field) {
         return (Short) get(field);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 03e77a5..8dfa3f6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -49,8 +49,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return OffsetCommitRequest.parse(buffer, versionId);
             case OFFSET_FETCH:
                 return OffsetFetchRequest.parse(buffer, versionId);
-            case GROUP_METADATA:
-                return GroupMetadataRequest.parse(buffer, versionId);
+            case GROUP_COORDINATOR:
+                return GroupCoordinatorRequest.parse(buffer, versionId);
             case JOIN_GROUP:
                 return JoinGroupRequest.parse(buffer, versionId);
             case HEARTBEAT:
@@ -67,6 +67,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return UpdateMetadataRequest.parse(buffer, versionId);
             case LEADER_AND_ISR:
                 return LeaderAndIsrRequest.parse(buffer, versionId);
+            case DESCRIBE_GROUPS:
+                return DescribeGroupsRequest.parse(buffer, versionId);
+            case LIST_GROUPS:
+                return ListGroupsRequest.parse(buffer, versionId);
             default:
                 return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
new file mode 100644
index 0000000..a545cca
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DescribeGroupsRequest extends AbstractRequest {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DESCRIBE_GROUPS.id);
+    private static final String GROUP_IDS_KEY_NAME = "group_ids";
+
+    private final List<String> groupIds;
+
+    public DescribeGroupsRequest(List<String> groupIds) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
+        this.groupIds = groupIds;
+    }
+
+    public DescribeGroupsRequest(Struct struct) {
+        super(struct);
+        this.groupIds = new ArrayList<>();
+        for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
+            this.groupIds.add((String) groupId);
+    }
+
+    public List<String> groupIds() {
+        return groupIds;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
+
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DESCRIBE_GROUPS.id)));
+        }
+    }
+
+    public static DescribeGroupsRequest parse(ByteBuffer buffer, int versionId) {
+        return new DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, versionId, buffer));
+    }
+
+    public static DescribeGroupsRequest parse(ByteBuffer buffer) {
+        return new DescribeGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
new file mode 100644
index 0000000..c71e7d2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeGroupsResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id);
+
+    private static final String GROUPS_KEY_NAME = "groups";
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GROUP_STATE_KEY_NAME = "state";
+    private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+    private static final String PROTOCOL_KEY_NAME = "protocol";
+
+    private static final String MEMBERS_KEY_NAME = "members";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
+    private static final String CLIENT_ID_KEY_NAME = "client_id";
+    private static final String CLIENT_HOST_KEY_NAME = "client_host";
+    private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
+    private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+
+    public static final String UNKNOWN_STATE = "";
+    public static final String UNKNOWN_PROTOCOL_TYPE = "";
+    public static final String UNKNOWN_PROTOCOL = "";
+
+    /**
+     * Possible per-group error codes:
+     *
+     * GROUP_LOAD_IN_PROGRESS (14)
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_GROUP (16)
+     * AUTHORIZATION_FAILED (29)
+     */
+
+    private final Map<String, GroupMetadata> groups;
+
+    public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> groupStructs = new ArrayList<>();
+        for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
+            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+            GroupMetadata group = groupEntry.getValue();
+            groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
+            groupStruct.set(ERROR_CODE_KEY_NAME, group.errorCode);
+            groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
+            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+            groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
+            List<Struct> membersList = new ArrayList<>();
+            for (GroupMember member : group.members) {
+                Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
+                memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId);
+                memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
+                memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
+                memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata);
+                memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment);
+                membersList.add(memberStruct);
+            }
+            groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
+            groupStructs.add(groupStruct);
+        }
+        struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
+        this.groups = groups;
+    }
+
+    public DescribeGroupsResponse(Struct struct) {
+        super(struct);
+        this.groups = new HashMap<>();
+        for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
+            Struct groupStruct = (Struct) groupObj;
+
+            String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+            short errorCode = groupStruct.getShort(ERROR_CODE_KEY_NAME);
+            String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
+            String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
+            String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
+
+            List<GroupMember> members = new ArrayList<>();
+            for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) {
+                Struct memberStruct = (Struct) memberObj;
+                String memberId = memberStruct.getString(MEMBER_ID_KEY_NAME);
+                String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME);
+                String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME);
+                ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME);
+                ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+                members.add(new GroupMember(memberId, clientId, clientHost,
+                        memberMetadata, memberAssignment));
+            }
+            this.groups.put(groupId, new GroupMetadata(errorCode, state, protocolType, protocol, members));
+        }
+    }
+
+    public Map<String, GroupMetadata> groups() {
+        return groups;
+    }
+
+
+    public static class GroupMetadata {
+        private final short errorCode;
+        private final String state;
+        private final String protocolType;
+        private final String protocol;
+        private final List<GroupMember> members;
+
+        public GroupMetadata(short errorCode,
+                             String state,
+                             String protocolType,
+                             String protocol,
+                             List<GroupMember> members) {
+            this.errorCode = errorCode;
+            this.state = state;
+            this.protocolType = protocolType;
+            this.protocol = protocol;
+            this.members = members;
+        }
+
+        public short errorCode() {
+            return errorCode;
+        }
+
+        public String state() {
+            return state;
+        }
+
+        public String protocolType() {
+            return protocolType;
+        }
+
+        public String protocol() {
+            return protocol;
+        }
+
+        public List<GroupMember> members() {
+            return members;
+        }
+
+        public static GroupMetadata forError(Errors error) {
+            return new DescribeGroupsResponse.GroupMetadata(
+                    error.code(),
+                    DescribeGroupsResponse.UNKNOWN_STATE,
+                    DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+                    DescribeGroupsResponse.UNKNOWN_PROTOCOL,
+                    Collections.<DescribeGroupsResponse.GroupMember>emptyList());
+        }
+    }
+
+    public static class GroupMember {
+        private final String memberId;
+        private final String clientId;
+        private final String clientHost;
+        private final ByteBuffer memberMetadata;
+        private final ByteBuffer memberAssignment;
+
+        public GroupMember(String memberId,
+                           String clientId,
+                           String clientHost,
+                           ByteBuffer memberMetadata,
+                           ByteBuffer memberAssignment) {
+            this.memberId = memberId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.memberMetadata = memberMetadata;
+            this.memberAssignment = memberAssignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public ByteBuffer memberMetadata() {
+            return memberMetadata;
+        }
+
+        public ByteBuffer memberAssignment() {
+            return memberAssignment;
+        }
+    }
+
+    public static DescribeGroupsResponse parse(ByteBuffer buffer) {
+        return new DescribeGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
+        GroupMetadata errorMetadata = GroupMetadata.forError(error);
+        Map<String, GroupMetadata> groups = new HashMap<>();
+        for (String groupId : groupIds)
+            groups.put(groupId, errorMetadata);
+        return new DescribeGroupsResponse(groups);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
new file mode 100644
index 0000000..8c56e7f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupCoordinatorRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_COORDINATOR.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+
+    private final String groupId;
+
+    public GroupCoordinatorRequest(String groupId) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        this.groupId = groupId;
+    }
+
+    public GroupCoordinatorRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public static GroupCoordinatorRequest parse(ByteBuffer buffer, int versionId) {
+        return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer));
+    }
+
+    public static GroupCoordinatorRequest parse(ByteBuffer buffer) {
+        return new GroupCoordinatorRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
new file mode 100644
index 0000000..c28de70
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupCoordinatorResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+    // coordinator level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private final short errorCode;
+    private final Node node;
+
+    public GroupCoordinatorResponse(short errorCode, Node node) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        this.errorCode = errorCode;
+        this.node = node;
+    }
+
+    public GroupCoordinatorResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        node = new Node(nodeId, host, port);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    public static GroupCoordinatorResponse parse(ByteBuffer buffer) {
+        return new GroupCoordinatorResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
deleted file mode 100644
index fd54c5a..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupMetadataRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-
-    private final String groupId;
-
-    public GroupMetadataRequest(String groupId) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        this.groupId = groupId;
-    }
-
-    public GroupMetadataRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer));
-    }
-
-    public static GroupMetadataRequest parse(ByteBuffer buffer) {
-        return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
deleted file mode 100644
index a5ef478..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupMetadataResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String COORDINATOR_KEY_NAME = "coordinator";
-
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    private final short errorCode;
-    private final Node node;
-
-    public GroupMetadataResponse(short errorCode, Node node) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
-        this.errorCode = errorCode;
-        this.node = node;
-    }
-
-    public GroupMetadataResponse(Struct struct) {
-        super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
-        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-        String host = broker.getString(HOST_KEY_NAME);
-        int port = broker.getInt(PORT_KEY_NAME);
-        node = new Node(nodeId, host, port);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public Node node() {
-        return node;
-    }
-
-    public static GroupMetadataResponse parse(ByteBuffer buffer) {
-        return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
new file mode 100644
index 0000000..439720f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class ListGroupsRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_GROUPS.id);
+
+    public ListGroupsRequest() {
+        super(new Struct(CURRENT_SCHEMA));
+    }
+
+    public ListGroupsRequest(Struct struct) {
+        super(struct);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                short errorCode = Errors.forException(e).code();
+                return new ListGroupsResponse(errorCode, Collections.<ListGroupsResponse.Group>emptyList());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id)));
+        }
+    }
+
+    public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) {
+        return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer));
+    }
+
+    public static ListGroupsRequest parse(ByteBuffer buffer) {
+        return new ListGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
new file mode 100644
index 0000000..d07f0d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ListGroupsResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id);
+
+    public static final String ERROR_CODE_KEY_NAME = "error_code";
+    public static final String GROUPS_KEY_NAME = "groups";
+    public static final String GROUP_ID_KEY_NAME = "group_id";
+    public static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+
+    /**
+     * Possible error codes:
+     *
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * AUTHORIZATION_FAILED (29)
+     */
+
+    private final short errorCode;
+    private final List<Group> groups;
+
+    public ListGroupsResponse(short errorCode, List<Group> groups) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        List<Struct> groupList = new ArrayList<>();
+        for (Group group : groups) {
+            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+            groupStruct.set(GROUP_ID_KEY_NAME, group.groupId);
+            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+            groupList.add(groupStruct);
+        }
+        struct.set(GROUPS_KEY_NAME, groupList.toArray());
+        this.errorCode = errorCode;
+        this.groups = groups;
+    }
+
+    public ListGroupsResponse(Struct struct) {
+        super(struct);
+        this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        this.groups = new ArrayList<>();
+        for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
+            Struct groupStruct = (Struct) groupObj;
+            String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+            String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
+            this.groups.add(new Group(groupId, protocolType));
+        }
+    }
+
+    public List<Group> groups() {
+        return groups;
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static class Group {
+        private final String groupId;
+        private final String protocolType;
+
+        public Group(String groupId, String protocolType) {
+            this.groupId = groupId;
+            this.protocolType = protocolType;
+        }
+
+        public String groupId() {
+            return groupId;
+        }
+
+        public String protocolType() {
+            return protocolType;
+        }
+
+    }
+
+    public static ListGroupsResponse parse(ByteBuffer buffer) {
+        return new ListGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static ListGroupsResponse fromError(Errors error) {
+        return new ListGroupsResponse(error.code(), Collections.<Group>emptyList());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2029e92..8667f22 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -729,7 +729,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {
-        GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
         return response.toStruct();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index fb21802..761b9db 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -28,6 +28,7 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,9 +44,9 @@ public class RequestResponseTest {
         List<AbstractRequestResponse> requestResponseList = Arrays.asList(
                 createRequestHeader(),
                 createResponseHeader(),
-                createConsumerMetadataRequest(),
-                createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()),
-                createConsumerMetadataResponse(),
+                createGroupCoordinatorRequest(),
+                createGroupCoordinatorRequest().getErrorResponse(0, new UnknownServerException()),
+                createGroupCoordinatorResponse(),
                 createControlledShutdownRequest(),
                 createControlledShutdownResponse(),
                 createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()),
@@ -61,6 +62,12 @@ public class RequestResponseTest {
                 createLeaveGroupRequest(),
                 createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()),
                 createLeaveGroupResponse(),
+                createListGroupsRequest(),
+                createListGroupsRequest().getErrorResponse(0, new UnknownServerException()),
+                createListGroupsResponse(),
+                createDescribeGroupRequest(),
+                createDescribeGroupRequest().getErrorResponse(0, new UnknownServerException()),
+                createDescribeGroupResponse(),
                 createListOffsetRequest(),
                 createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
                 createListOffsetResponse(),
@@ -150,12 +157,12 @@ public class RequestResponseTest {
         return new ResponseHeader(10);
     }
 
-    private AbstractRequest createConsumerMetadataRequest() {
-        return new GroupMetadataRequest("test-group");
+    private AbstractRequest createGroupCoordinatorRequest() {
+        return new GroupCoordinatorRequest("test-group");
     }
 
-    private AbstractRequestResponse createConsumerMetadataResponse() {
-        return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
+    private AbstractRequestResponse createGroupCoordinatorResponse() {
+        return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
     }
 
     private AbstractRequest createFetchRequest() {
@@ -193,6 +200,30 @@ public class RequestResponseTest {
         return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
     }
 
+    private AbstractRequest createListGroupsRequest() {
+        return new ListGroupsRequest();
+    }
+
+    private AbstractRequestResponse createListGroupsResponse() {
+        List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
+        return new ListGroupsResponse(Errors.NONE.code(), groups);
+    }
+
+    private AbstractRequest createDescribeGroupRequest() {
+        return new DescribeGroupsRequest(Collections.singletonList("test-group"));
+    }
+
+    private AbstractRequestResponse createDescribeGroupResponse() {
+        String clientId = "consumer-1";
+        String clientHost = "localhost";
+        ByteBuffer empty = ByteBuffer.allocate(0);
+        DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
+                clientId, clientHost, empty, empty);
+        DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE.code(),
+                "STABLE", "consumer", "roundrobin", Arrays.asList(member));
+        return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
+    }
+
     private AbstractRequest createLeaveGroupRequest() {
         return new LeaveGroupRequest("group1", "consumer1");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
index ca53674..ac9df44 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -386,7 +386,7 @@ public class WorkerCoordinatorTest {
 
 
     private Struct groupMetadataResponse(Node node, short error) {
-        GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
         return response.toStruct();
     }
 


[2/3] kafka git commit: KAFKA-2687: Add support for ListGroups and DescribeGroup APIs

Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
new file mode 100644
index 0000000..ddd3114
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.admin
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import kafka.common.KafkaException
+import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.errors.DisconnectException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.Selector
+import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+import org.apache.kafka.common.{TopicPartition, Cluster, Node}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class AdminClient(val time: Time,
+                  val requestTimeoutMs: Int,
+                  val client: ConsumerNetworkClient,
+                  val bootstrapBrokers: List[Node]) extends Logging {
+
+  private def send(target: Node,
+                   api: ApiKeys,
+                   request: AbstractRequest): Struct = {
+    var now = time.milliseconds()
+    val deadline = now + requestTimeoutMs
+    var future: RequestFuture[ClientResponse] = null
+
+    do {
+      future = client.send(target, api, request)
+      client.poll(future)
+
+      if (future.succeeded())
+        return if (future.value().wasDisconnected()) {
+          throw new DisconnectException()
+        } else {
+          future.value().responseBody()
+        }
+
+      now = time.milliseconds()
+    } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
+
+    throw future.exception()
+  }
+
+  private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
+    bootstrapBrokers.foreach {
+      case broker =>
+        try {
+          return send(broker, api, request)
+        } catch {
+          case e: Exception =>
+            debug(s"Request ${api} failed against node ${broker}", e)
+        }
+    }
+    throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
+  }
+
+  private def findCoordinator(groupId: String): Node = {
+    val request = new GroupCoordinatorRequest(groupId)
+    val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request)
+    val response = new GroupCoordinatorResponse(responseBody)
+    Errors.forCode(response.errorCode()).maybeThrow()
+    response.node()
+  }
+
+  def listGroups(node: Node): List[GroupOverview] = {
+    val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
+    val response = new ListGroupsResponse(responseBody)
+    Errors.forCode(response.errorCode()).maybeThrow()
+    response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+  }
+
+  private def findAllBrokers(): List[Node] = {
+    val request = new MetadataRequest(List[String]())
+    val responseBody = sendAnyNode(ApiKeys.METADATA, request)
+    val response = new MetadataResponse(responseBody)
+    if (!response.errors().isEmpty)
+      debug(s"Metadata request contained errors: ${response.errors()}")
+    response.cluster().nodes().asScala.toList
+  }
+
+  def listAllGroups(): Map[Node, List[GroupOverview]] = {
+    findAllBrokers.map {
+      case broker =>
+        broker -> {
+          try {
+            listGroups(broker)
+          } catch {
+            case e: Exception =>
+              debug(s"Failed to find groups from broker ${broker}", e)
+              List[GroupOverview]()
+          }
+        }
+    }.toMap
+  }
+
+  def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = {
+    listAllGroups().mapValues { groups =>
+      groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+    }
+  }
+
+  def listAllGroupsFlattened(): List[GroupOverview] = {
+    listAllGroups.values.flatten.toList
+  }
+
+  def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
+    listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+  }
+
+  def describeGroup(groupId: String): GroupSummary = {
+    val coordinator = findCoordinator(groupId)
+    val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
+    val response = new DescribeGroupsResponse(responseBody)
+    val metadata = response.groups().get(groupId)
+    if (metadata == null)
+      throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
+
+    Errors.forCode(metadata.errorCode()).maybeThrow()
+    val members = metadata.members().map {
+      case member =>
+        val metadata = Utils.readBytes(member.memberMetadata())
+        val assignment = Utils.readBytes(member.memberAssignment())
+        MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
+    }.toList
+    GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
+  }
+
+  def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = {
+    val group = describeGroup(groupId)
+    if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
+      throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group")
+
+    group.members.map {
+      case member =>
+        val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
+        member.memberId -> assignment.partitions().asScala.toList
+    }.toMap
+  }
+
+}
+
+object AdminClient {
+  val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
+  val DefaultRequestTimeoutMs = 5000
+  val DefaultMaxInFlightRequestsPerConnection = 100
+  val DefaultReconnectBackoffMs = 50
+  val DefaultSendBufferBytes = 128 * 1024
+  val DefaultReceiveBufferBytes = 32 * 1024
+  val DefaultRetryBackoffMs = 100
+  val AdminClientIdSequence = new AtomicInteger(1)
+  val AdminConfigDef = {
+    val config = new ConfigDef()
+      .define(
+        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+        Type.LIST,
+        Importance.HIGH,
+        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+      .define(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+        ConfigDef.Type.STRING,
+        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+        ConfigDef.Importance.MEDIUM,
+        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+      .withClientSslSupport()
+      .withClientSaslSupport()
+    config
+  }
+
+  class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals, false)
+
+  def createSimplePlaintext(brokerUrl: String): AdminClient = {
+    val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)
+    create(new AdminConfig(config))
+  }
+
+  def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
+
+  def create(config: AdminConfig): AdminClient = {
+    val time = new SystemTime
+    val metrics = new Metrics(time)
+    val metadata = new Metadata
+    val channelBuilder = ClientUtils.createChannelBuilder(config.values())
+
+    val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+    val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
+    metadata.update(bootstrapCluster, 0)
+
+    val selector = new Selector(
+      DefaultConnectionMaxIdleMs,
+      metrics,
+      time,
+      "admin",
+      Map[String, String](),
+      channelBuilder)
+
+    val networkClient = new NetworkClient(
+      selector,
+      metadata,
+      "admin-" + AdminClientIdSequence.getAndIncrement(),
+      DefaultMaxInFlightRequestsPerConnection,
+      DefaultReconnectBackoffMs,
+      DefaultSendBufferBytes,
+      DefaultReceiveBufferBytes,
+      DefaultRequestTimeoutMs,
+      time)
+
+    val highLevelClient = new ConsumerNetworkClient(
+      networkClient,
+      metadata,
+      time,
+      DefaultRetryBackoffMs)
+
+    new AdminClient(
+      time,
+      DefaultRequestTimeoutMs,
+      highLevelClient,
+      bootstrapCluster.nodes().asScala.toList)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
new file mode 100644
index 0000000..43e78f5
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object GroupCoordinatorRequest {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer) = {
+    // envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = ApiUtils.readShortString(buffer)
+
+    // request
+    val group = ApiUtils.readShortString(buffer)
+    GroupCoordinatorRequest(group, versionId, correlationId, clientId)
+  }
+
+}
+
+case class GroupCoordinatorRequest(group: String,
+                                   versionId: Short = GroupCoordinatorRequest.CurrentVersion,
+                                   correlationId: Int = 0,
+                                   clientId: String = GroupCoordinatorRequest.DefaultClientId)
+  extends RequestOrResponse(Some(RequestKeys.GroupCoordinatorKey)) {
+
+  def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    ApiUtils.shortStringLength(clientId) +
+    ApiUtils.shortStringLength(group)
+
+  def writeTo(buffer: ByteBuffer) {
+    // envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    ApiUtils.writeShortString(buffer, clientId)
+
+    // consumer metadata request
+    ApiUtils.writeShortString(buffer, group)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    // return ConsumerCoordinatorNotAvailable for all uncaught errors
+    val errorResponse = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+  }
+
+  def describe(details: Boolean) = {
+    val consumerMetadataRequest = new StringBuilder
+    consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    consumerMetadataRequest.append("; Version: " + versionId)
+    consumerMetadataRequest.append("; CorrelationId: " + correlationId)
+    consumerMetadataRequest.append("; ClientId: " + clientId)
+    consumerMetadataRequest.append("; Group: " + group)
+    consumerMetadataRequest.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..4cd7db8
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+import kafka.common.ErrorMapping
+
+object GroupCoordinatorResponse {
+  val CurrentVersion = 0
+
+  private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
+  
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val broker = BrokerEndPoint.readFrom(buffer)
+    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+      Some(broker)
+    else
+      None
+
+    GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
+  }
+  
+}
+
+case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+  extends RequestOrResponse() {
+
+  def sizeInBytes =
+    4 + /* correlationId */
+    2 + /* error code */
+    coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).get.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
+  }
+
+  def describe(details: Boolean) = toString
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
deleted file mode 100644
index 075ddb5..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.common.ErrorMapping
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-
-object GroupMetadataRequest {
-  val CurrentVersion = 0.shortValue
-  val DefaultClientId = ""
-
-  def readFrom(buffer: ByteBuffer) = {
-    // envelope
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = ApiUtils.readShortString(buffer)
-
-    // request
-    val group = ApiUtils.readShortString(buffer)
-    GroupMetadataRequest(group, versionId, correlationId, clientId)
-  }
-
-}
-
-case class GroupMetadataRequest(group: String,
-                                versionId: Short = GroupMetadataRequest.CurrentVersion,
-                                correlationId: Int = 0,
-                                clientId: String = GroupMetadataRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) {
-
-  def sizeInBytes =
-    2 + /* versionId */
-    4 + /* correlationId */
-    ApiUtils.shortStringLength(clientId) +
-    ApiUtils.shortStringLength(group)
-
-  def writeTo(buffer: ByteBuffer) {
-    // envelope
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    ApiUtils.writeShortString(buffer, clientId)
-
-    // consumer metadata request
-    ApiUtils.writeShortString(buffer, group)
-  }
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
-  def describe(details: Boolean) = {
-    val consumerMetadataRequest = new StringBuilder
-    consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
-    consumerMetadataRequest.append("; Version: " + versionId)
-    consumerMetadataRequest.append("; CorrelationId: " + correlationId)
-    consumerMetadataRequest.append("; ClientId: " + clientId)
-    consumerMetadataRequest.append("; Group: " + group)
-    consumerMetadataRequest.toString()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
deleted file mode 100644
index 2d65917..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
-
-object GroupMetadataResponse {
-  val CurrentVersion = 0
-
-  private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-  
-  def readFrom(buffer: ByteBuffer) = {
-    val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val broker = BrokerEndPoint.readFrom(buffer)
-    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
-      Some(broker)
-    else
-      None
-
-    GroupMetadataResponse(coordinatorOpt, errorCode, correlationId)
-  }
-  
-}
-
-case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
-  extends RequestOrResponse() {
-
-  def sizeInBytes =
-    4 + /* correlationId */
-    2 + /* error code */
-    coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
-    coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
-  }
-
-  def describe(details: Boolean) = toString
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 669b63a..2363099 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -33,12 +33,17 @@ object RequestKeys {
   val ControlledShutdownKey: Short = 7
   val OffsetCommitKey: Short = 8
   val OffsetFetchKey: Short = 9
-  val GroupMetadataKey: Short = 10
+  val GroupCoordinatorKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
   val LeaveGroupKey: Short = 13
   val SyncGroupKey: Short = 14
+  val DescribeGroupsKey: Short = 15
+  val ListGroupsKey: Short = 16
 
+  // NOTE: this map only includes the server-side request/response handlers. Newer
+  // request types should only use the client-side versions which are parsed with
+  // o.a.k.common.requests.AbstractRequest.getRequest()
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
         FetchKey -> ("Fetch", FetchRequest.readFrom),
@@ -49,8 +54,7 @@ object RequestKeys {
         UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
         ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
         OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
-        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
-        GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom)
+        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 36b5b3b..2f836c0 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -151,9 +151,9 @@ object ClientUtils extends Logging{
            if (!queryChannel.isConnected)
              queryChannel = channelToAnyBroker(zkUtils)
            debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
-           queryChannel.send(GroupMetadataRequest(group))
+           queryChannel.send(GroupCoordinatorRequest(group))
            val response = queryChannel.receive()
-           val consumerMetadataResponse =  GroupMetadataResponse.readFrom(response.payload())
+           val consumerMetadataResponse =  GroupCoordinatorResponse.readFrom(response.payload())
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
            if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 81cb51b..6f53fac 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -60,6 +60,7 @@ object ErrorMapping {
   // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
   // 28: INVALID_COMMIT_OFFSET_SIZE
   val AuthorizationCode: Short = 29
+  // 30: REBALANCE_IN_PROGRESS
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5b1aead..e15aca4 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -112,9 +112,9 @@ class SimpleConsumer(val host: String,
     TopicMetadataResponse.readFrom(response.payload())
   }
 
-  def send(request: GroupMetadataRequest): GroupMetadataResponse = {
+  def send(request: GroupCoordinatorRequest): GroupCoordinatorResponse = {
     val response = sendRequest(request)
-    GroupMetadataResponse.readFrom(response.payload())
+    GroupCoordinatorResponse.readFrom(response.payload())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 97ce22b..2015371 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -107,6 +107,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleJoinGroup(groupId: String,
                       memberId: String,
                       clientId: String,
+                      clientHost: String,
                       sessionTimeoutMs: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
@@ -132,10 +133,10 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
         } else {
           group = groupManager.addGroup(groupId, protocolType)
-          doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+          doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
         }
       } else {
-        doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+        doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
       }
     }
   }
@@ -143,6 +144,7 @@ class GroupCoordinator(val brokerId: Int,
   private def doJoinGroup(group: GroupMetadata,
                           memberId: String,
                           clientId: String,
+                          clientHost: String,
                           sessionTimeoutMs: Int,
                           protocolType: String,
                           protocols: List[(String, Array[Byte])],
@@ -166,7 +168,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -174,7 +176,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (member.matches(protocols)) {
@@ -201,7 +203,7 @@ class GroupCoordinator(val brokerId: Int,
           case Stable =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -269,13 +271,30 @@ class GroupCoordinator(val brokerId: Int,
             group.get(memberId).awaitingSyncCallback = responseCallback
             completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
 
-            // if this is the leader, then we can transition to stable and
-            // propagate the assignment to any awaiting members
+            // if this is the leader, then we can attempt to persist state and transition to stable
             if (memberId == group.leaderId) {
-              group.transitionTo(Stable)
 
-              // persist the group metadata and upon finish propagate the assignment
-              groupManager.storeGroup(group, groupAssignment)
+              // fill any missing members with an empty assignment
+              val missing = group.allMembers -- groupAssignment.keySet
+              val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
+
+              // persist the group metadata and upon finish transition to stable and propagate the assignment
+              groupManager.storeGroup(group, assignment, (errorCode: Short) => {
+                group synchronized {
+                  // another member may have joined the group while we were awaiting this callback,
+                  // so we must ensure we are still in the AwaitingSync state when it gets invoked.
+                  // if we have transitioned to another state, then we shouldn't do anything
+                  if (group.is(AwaitingSync)) {
+                    if (errorCode != Errors.NONE.code) {
+                      resetAndPropagateAssignmentError(group, errorCode)
+                      maybePrepareRebalance(group)
+                    } else if (group.is(AwaitingSync)) {
+                      setAndPropagateAssignment(group, assignment)
+                      group.transitionTo(Stable)
+                    }
+                  }
+                }
+              })
             }
 
           case Stable =>
@@ -413,6 +432,34 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  def handleListGroups(): (Errors, List[GroupOverview]) = {
+    if (!isActive.get) {
+      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+    } else {
+      val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
+      (errorCode, groupManager.currentGroups.map(_.overview).toList)
+    }
+  }
+
+  def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
+    if (!isActive.get) {
+      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
+    } else if (!isCoordinatorForGroup(groupId)) {
+      (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
+    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
+    } else {
+      val group = groupManager.getGroup(groupId)
+      if (group == null) {
+        (Errors.NONE, GroupCoordinator.DeadGroup)
+      } else {
+        group synchronized {
+          (Errors.NONE, group.summary)
+        }
+      }
+    }
+  }
+
   def handleGroupImmigration(offsetTopicPartitionId: Int) = {
     groupManager.loadGroupsForPartition(offsetTopicPartitionId)
   }
@@ -421,6 +468,27 @@ class GroupCoordinator(val brokerId: Int,
     groupManager.removeGroupsForPartition(offsetTopicPartitionId)
   }
 
+  private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
+    assert(group.is(AwaitingSync))
+    group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
+    propagateAssignment(group, Errors.NONE.code)
+  }
+
+  private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
+    assert(group.is(AwaitingSync))
+    group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
+    propagateAssignment(group, errorCode)
+  }
+
+  private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
+    for (member <- group.allMemberMetadata) {
+      if (member.awaitingSyncCallback != null) {
+        member.awaitingSyncCallback(member.assignment, errorCode)
+        member.awaitingSyncCallback = null
+      }
+    }
+  }
+
   private def validGroupId(groupId: String): Boolean = {
     groupId != null && !groupId.isEmpty
   }
@@ -458,12 +526,13 @@ class GroupCoordinator(val brokerId: Int,
 
   private def addMemberAndRebalance(sessionTimeoutMs: Int,
                                     clientId: String,
+                                    clientHost: String,
                                     protocols: List[(String, Array[Byte])],
                                     group: GroupMetadata,
                                     callback: JoinCallback) = {
     // use the client-id with a random id suffix as the member-id
     val memberId = clientId + "-" + group.generateMemberIdSuffix
-    val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
     member.awaitingJoinCallback = callback
     group.add(member.memberId, member)
     maybePrepareRebalance(group)
@@ -488,11 +557,9 @@ class GroupCoordinator(val brokerId: Int,
 
   private def prepareRebalance(group: GroupMetadata) {
     // if any members are awaiting sync, cancel their request and have them rejoin
-    if (group.is(AwaitingSync)) {
-      groupManager.propagateAssignment(group, Errors.REBALANCE_IN_PROGRESS.code)
-    }
+    if (group.is(AwaitingSync))
+      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
 
-    group.allMembers.foreach(_.assignment = null)
     group.transitionTo(PreparingRebalance)
     info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
 
@@ -544,7 +611,7 @@ class GroupCoordinator(val brokerId: Int,
         info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
 
         // trigger the awaiting join group response callback for all the members after rebalancing
-        for (member <- group.allMembers) {
+        for (member <- group.allMemberMetadata) {
           assert(member.awaitingJoinCallback != null)
           val joinResult = JoinGroupResult(
             members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
@@ -595,6 +662,11 @@ class GroupCoordinator(val brokerId: Int,
 
 object GroupCoordinator {
 
+  val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
+  val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+  val NoMembers = List[MemberSummary]()
+  val NoState = ""
+  val NoProtocolType = ""
   val NoProtocol = ""
   val NoLeader = ""
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 652a3a4..ece9ce0 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -93,6 +93,20 @@ private object GroupMetadata {
 }
 
 /**
+ * Case class used to represent group metadata for the ListGroups API
+ */
+case class GroupOverview(groupId: String,
+                         protocolType: String)
+
+/**
+ * Case class used to represent group metadata for the DescribeGroup API
+ */
+case class GroupSummary(state: String,
+                        protocolType: String,
+                        protocol: String,
+                        members: List[MemberSummary])
+
+/**
  * Group contains the following metadata:
  *
  *  Membership metadata:
@@ -144,7 +158,9 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
 
   def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
 
-  def allMembers = members.values.toList
+  def allMembers = members.keySet
+
+  def allMemberMetadata = members.values.toList
 
   def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) =>
     timeout.max(member.sessionTimeoutMs)
@@ -168,7 +184,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
     val candidates = candidateProtocols
 
     // let each member vote for one of the protocols and choose the one with the most votes
-    val votes: List[(String, Int)] = allMembers
+    val votes: List[(String, Int)] = allMemberMetadata
       .map(_.vote(candidates))
       .groupBy(identity)
       .mapValues(_.size)
@@ -179,7 +195,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
 
   private def candidateProtocols = {
     // get the set of protocols that are commonly supported by all members
-    allMembers
+    allMemberMetadata
       .map(_.protocols)
       .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
   }
@@ -201,6 +217,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
     members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
   }
 
+  def summary: GroupSummary = {
+    if (is(Stable)) {
+      val members = this.members.values.map{ member => member.summary(protocol) }.toList
+      GroupSummary(state.toString, protocolType, protocol, members)
+    } else {
+      val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
+      GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members)
+    }
+  }
+
+  def overview: GroupOverview = {
+    GroupOverview(groupId, protocolType)
+  }
+
   private def assertValidTransition(targetState: GroupState) {
     if (!GroupMetadata.validPreviousStates(targetState).contains(state))
       throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 81ed548..0052b5d 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -95,12 +95,16 @@ class GroupMetadataManager(val brokerId: Int,
     }
   )
 
+  def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
+
   def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
 
   def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
 
   def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
 
+  def isLoading(): Boolean = loadingPartitions synchronized !loadingPartitions.isEmpty
+
   /**
    * Get the group associated with the given groupId, or null if not found
    */
@@ -158,7 +162,8 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   def storeGroup(group: GroupMetadata,
-                 groupAssignment: Map[String, Array[Byte]]) {
+                 groupAssignment: Map[String, Array[Byte]],
+                 responseCallback: Short => Unit) {
     // construct the message to append
     val message = new Message(
       key = GroupMetadataManager.groupMetadataKey(group.groupId),
@@ -208,12 +213,7 @@ class GroupMetadataManager(val brokerId: Int,
         }
       }
 
-      for (member <- group.allMembers) {
-        member.assignment = groupAssignment.getOrElse(member.memberId, Array.empty[Byte])
-      }
-
-      // propagate the assignments
-      propagateAssignment(group, responseCode)
+      responseCallback(responseCode)
     }
 
     // call replica manager to append the group message
@@ -225,16 +225,7 @@ class GroupMetadataManager(val brokerId: Int,
       putCacheCallback)
   }
 
-  def propagateAssignment(group: GroupMetadata,
-                          errorCode: Short) {
-    val hasError = errorCode != Errors.NONE.code
-    for (member <- group.allMembers) {
-      if (member.awaitingSyncCallback != null) {
-        member.awaitingSyncCallback(if (hasError) Array.empty else member.assignment, errorCode)
-        member.awaitingSyncCallback = null
-      }
-    }
-  }
+
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
@@ -657,10 +648,14 @@ object GroupMetadataManager {
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
 
   private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
+    new Field("client_id", STRING),
+    new Field("client_host", STRING),
     new Field("session_timeout", INT32),
     new Field("subscription", BYTES),
     new Field("assignment", BYTES))
   private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
+  private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id")
+  private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host")
   private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
   private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
   private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")
@@ -787,10 +782,12 @@ object GroupMetadataManager {
     value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
     value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
 
-    val memberArray = groupMetadata.allMembers.map {
+    val memberArray = groupMetadata.allMemberMetadata.map {
       case memberMetadata =>
         val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0)
         memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId)
+        memberStruct.set(MEMBER_METADATA_CLIENT_ID_V0, memberMetadata.clientId)
+        memberStruct.set(MEMBER_METADATA_CLIENT_HOST_V0, memberMetadata.clientHost)
         memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs)
 
         val metadata = memberMetadata.metadata(groupMetadata.protocol)
@@ -901,10 +898,13 @@ object GroupMetadataManager {
           case memberMetadataObj =>
             val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
             val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
+            val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V0).asInstanceOf[String]
+            val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V0).asInstanceOf[String]
             val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
             val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
 
-            val member = new MemberMetadata(memberId, groupId, sessionTimeout, List((group.protocol, subscription)))
+            val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+              List((group.protocol, subscription)))
 
             member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 6a76241..80782c8 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -23,6 +23,13 @@ import kafka.utils.nonthreadsafe
 
 import scala.collection.Map
 
+
+case class MemberSummary(memberId: String,
+                         clientId: String,
+                         clientHost: String,
+                         metadata: Array[Byte],
+                         assignment: Array[Byte])
+
 /**
  * Member metadata contains the following metadata:
  *
@@ -46,15 +53,14 @@ import scala.collection.Map
 @nonthreadsafe
 private[coordinator] class MemberMetadata(val memberId: String,
                                           val groupId: String,
+                                          val clientId: String,
+                                          val clientHost: String,
                                           val sessionTimeoutMs: Int,
                                           var supportedProtocols: List[(String, Array[Byte])]) {
 
-  // NOTE: we need to add memory barrier to assignment and awaitingSyncCallback
-  // since they can be accessed in the append callback thread that does not
-  // hold on the group object lock
-  @volatile var assignment: Array[Byte] = null
+  var assignment: Array[Byte] = Array.empty[Byte]
   var awaitingJoinCallback: JoinGroupResult => Unit = null
-  @volatile var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
 
@@ -87,6 +93,14 @@ private[coordinator] class MemberMetadata(val memberId: String,
     return true
   }
 
+  def summary(protocol: String): MemberSummary = {
+    MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+  }
+
+  def summaryNoMetadata(): MemberSummary = {
+    MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+  }
+
   /**
    * Vote for one of the potential group protocols. This takes into account the protocol preference as
    * indicated by the order of supported protocols and returns the first one also contained in the set

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..0e14758
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+
+class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinatorResponse) {
+
+  def errorCode = underlying.errorCode
+
+  def coordinator: BrokerEndPoint = {
+    import kafka.javaapi.Implicits._
+    underlying.coordinatorOpt
+  }
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+    this.underlying.equals(otherConsumerMetadataResponse.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+
+  override def hashCode = underlying.hashCode
+
+  override def toString = underlying.toString
+
+}
+
+object GroupCoordinatorResponse {
+  def readFrom(buffer: ByteBuffer) = new GroupCoordinatorResponse(kafka.api.GroupCoordinatorResponse.readFrom(buffer))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
deleted file mode 100644
index b94aa01..0000000
--- a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.javaapi
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-
-class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) {
-
-  def errorCode = underlying.errorCode
-
-  def coordinator: BrokerEndPoint = {
-    import kafka.javaapi.Implicits._
-    underlying.coordinatorOpt
-  }
-
-  override def equals(other: Any) = canEqual(other) && {
-    val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupMetadataResponse]
-    this.underlying.equals(otherConsumerMetadataResponse.underlying)
-  }
-
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse]
-
-  override def hashCode = underlying.hashCode
-
-  override def toString = underlying.toString
-
-}
-
-object GroupMetadataResponse {
-  def readFrom(buffer: ByteBuffer) = new GroupMetadataResponse(kafka.api.GroupMetadataResponse.readFrom(buffer))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9fce77e..9ea4079 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -65,7 +65,8 @@ object RequestChannel extends Logging {
         RequestKeys.deserializerForKey(requestId)(buffer)
       else
         null
-    // for client-side request / response format
+    // if we failed to find a server-side mapping, then try using the
+    // client-side request / response format
     val header: RequestHeader =
       if (requestObj == null) {
         buffer.rewind

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 35c5956..0a2e0b9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
+import java.util
 
 import kafka.admin.AdminUtils
 import kafka.api._
@@ -31,13 +32,12 @@ import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.{GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.Node
 
 import scala.collection._
-
-
 /**
  * Logic to handle the various Kafka requests
  */
@@ -74,11 +74,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
-        case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request)
+        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
         case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
         case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
         case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
         case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
+        case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
+        case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -676,34 +678,73 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
   }
 
-  /*
-   * Handle a consumer metadata request
-   */
-  def handleGroupMetadataRequest(request: RequestChannel.Request) {
-    val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest]
+  def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
+    val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
+    val responseHeader = new ResponseHeader(request.header.correlationId)
 
-    if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) {
-      val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId)
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+    if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
+      val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     } else {
-      val partition = coordinator.partitionFor(groupMetadataRequest.group)
+      val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
 
       // get metadata (and create the topic if necessary)
       val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
+      val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
+        partitionMetadata => partitionMetadata.leader
+      }
 
-      val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)
-
-      val response =
-        offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
-          partitionMetadata.leader.map { leader =>
-            GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId)
-          }.getOrElse(errorResponse)
-        }.getOrElse(errorResponse)
+      val responseBody = coordinatorEndpoint match {
+        case None =>
+          new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
+        case Some(endpoint) =>
+          new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
+      }
 
       trace("Sending consumer metadata %s for correlation id %d to client %s."
-        .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId))
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+        .format(responseBody, request.header.correlationId, request.header.clientId))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    }
+  }
+
+  def handleDescribeGroupRequest(request: RequestChannel.Request) {
+    import JavaConverters._
+
+    val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
+    val responseHeader = new ResponseHeader(request.header.correlationId)
+
+    val groups = describeRequest.groupIds().asScala.map {
+      case groupId =>
+        if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
+          groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED)
+        } else {
+          val (error, summary) = coordinator.handleDescribeGroup(groupId)
+          val members = summary.members.map { member =>
+            val metadata = ByteBuffer.wrap(member.metadata)
+            val assignment = ByteBuffer.wrap(member.assignment)
+            new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
+          }
+          groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType,
+            summary.protocol, members.asJava)
+        }
+    }.toMap
+
+    val responseBody = new DescribeGroupsResponse(groups.asJava)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+  }
+
+  def handleListGroupsRequest(request: RequestChannel.Request) {
+    import JavaConverters._
+
+    val responseHeader = new ResponseHeader(request.header.correlationId)
+    val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+      ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED)
+    } else {
+      val (error, groups) = coordinator.handleListGroups()
+      val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
+      new ListGroupsResponse(error.code, allGroups.asJava)
     }
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -740,6 +781,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         joinGroupRequest.groupId(),
         joinGroupRequest.memberId(),
         request.header.clientId(),
+        request.session.host,
         joinGroupRequest.sessionTimeout(),
         joinGroupRequest.protocolType(),
         protocols,

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
new file mode 100644
index 0000000..97b49dd
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.admin.AdminClient
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, Logging}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
+import org.junit.{Before, Test}
+import org.junit.Assert._
+import scala.collection.JavaConversions._
+
+class AdminClientTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+  val groupId = "my-test"
+  val clientId = "consumer-498"
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val part2 = 1
+  val tp2 = new TopicPartition(topic, part2)
+
+  var client: AdminClient = null
+
+  // configure the servers and clients
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+
+  @Before
+  override def setUp() {
+    super.setUp
+    client = AdminClient.createSimplePlaintext(this.brokerList)
+    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+  }
+
+  @Test
+  def testListGroups() {
+    consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumers(0).poll(0)
+      !consumers(0).assignment().isEmpty
+    }, "Expected non-empty assignment")
+
+    val groups = client.listAllGroupsFlattened
+    assertFalse(groups.isEmpty)
+    val group = groups(0)
+    assertEquals(groupId, group.groupId)
+    assertEquals("consumer", group.protocolType)
+  }
+
+  @Test
+  def testDescribeGroup() {
+    consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumers(0).poll(0)
+      !consumers(0).assignment().isEmpty
+    }, "Expected non-empty assignment")
+
+    val group= client.describeGroup(groupId)
+    assertEquals("consumer", group.protocolType)
+    assertEquals("range", group.protocol)
+    assertEquals("Stable", group.state)
+    assertFalse(group.members.isEmpty)
+
+    val member = group.members(0)
+    assertEquals(clientId, member.clientId)
+    assertFalse(member.clientHost.isEmpty)
+    assertFalse(member.memberId.isEmpty)
+  }
+
+  @Test
+  def testDescribeConsumerGroup() {
+    consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumers(0).poll(0)
+      !consumers(0).assignment().isEmpty
+    }, "Expected non-empty assignment")
+
+    val assignment = client.describeConsumerGroup(groupId)
+    assertEquals(1, assignment.size)
+    for (partitions <- assignment.values)
+      assertEquals(Set(tp, tp2), partitions.toSet)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3d484b8..e363e27 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -85,7 +85,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
       RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
       RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
-      RequestKeys.GroupMetadataKey -> classOf[GroupMetadataResponse],
+      RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse],
       RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
       RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
       RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
@@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
     RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
     RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.GroupMetadataKey -> ((resp: GroupMetadataResponse) => resp.errorCode()),
+    RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()),
     RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
     RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
     RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
@@ -121,7 +121,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     RequestKeys.OffsetsKey -> TopicDescribeAcl,
     RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl),
     RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl),
-    RequestKeys.GroupMetadataKey -> (TopicReadAcl ++ GroupReadAcl),
+    RequestKeys.GroupCoordinatorKey -> (TopicReadAcl ++ GroupReadAcl),
     RequestKeys.UpdateMetadataKey -> ClusterAcl,
     RequestKeys.JoinGroupKey -> GroupReadAcl,
     RequestKeys.SyncGroupKey -> GroupReadAcl,
@@ -174,7 +174,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
       RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
       RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
-      RequestKeys.GroupMetadataKey -> new GroupMetadataRequest(group),
+      RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group),
       RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
         Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
         Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index a77979a..86e6877 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -135,8 +135,8 @@ object TestOffsetManager {
       val id = random.nextInt().abs % numGroups
       val group = "group-" + id
       try {
-        metadataChannel.send(GroupMetadataRequest(group))
-        val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
+        metadataChannel.send(GroupCoordinatorRequest(group))
+        val coordinatorId = GroupCoordinatorResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
 
         val channel = if (channels.contains(coordinatorId))
           channels(coordinatorId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 09e9ce3..90f629a 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -231,12 +231,12 @@ object SerializationTestUtils {
     ))
   }
 
-  def createConsumerMetadataRequest: GroupMetadataRequest = {
-    GroupMetadataRequest("group 1", clientId = "client 1")
+  def createConsumerMetadataRequest: GroupCoordinatorRequest = {
+    GroupCoordinatorRequest("group 1", clientId = "client 1")
   }
 
-  def createConsumerMetadataResponse: GroupMetadataResponse = {
-    GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+  def createConsumerMetadataResponse: GroupCoordinatorResponse = {
+    GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
   }
 
   def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
@@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
-  private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
   private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
   private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
   private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 5e6bd03..c1278e4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -51,10 +51,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   type LeaveGroupCallbackParams = Short
   type LeaveGroupCallback = Short => Unit
 
+  val ClientId = "consumer-test"
+  val ClientHost = "localhost"
   val ConsumerMinSessionTimeout = 10
   val ConsumerMaxSessionTimeout = 1000
   val DefaultSessionTimeout = 500
-  var consumerCoordinator: GroupCoordinator = null
+  var groupCoordinator: GroupCoordinator = null
   var replicaManager: ReplicaManager = null
   var scheduler: KafkaScheduler = null
   var zkUtils: ZkUtils = null
@@ -85,26 +87,25 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
-    consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
-    consumerCoordinator.startup()
+    groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
+    groupCoordinator.startup()
 
     // add the partition into the owned partition list
-    groupPartitionId = consumerCoordinator.partitionFor(groupId)
-    consumerCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
   }
 
   @After
   def tearDown() {
     EasyMock.reset(replicaManager)
-    consumerCoordinator.shutdown()
+    groupCoordinator.shutdown()
   }
 
   @Test
   def testJoinGroupWrongCoordinator() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType,
-      protocols)
+    val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
   }
@@ -139,8 +140,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val groupId = ""
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
-      protocols)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode)
   }
 
@@ -164,8 +164,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat",
-      protocols)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", protocols)
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
   }
 
@@ -285,6 +284,27 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testSyncGroupEmptyAssignment() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map())
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    assertTrue(syncGroupResult._1.isEmpty)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE.code, heartbeatResult)
+  }
+
+  @Test
   def testSyncGroupNotCoordinator() {
     val generation = 1
 
@@ -668,6 +688,92 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, leaveGroupResult)
   }
 
+  @Test
+  def testListGroupsIncludesStableGroups() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    val (error, groups) = groupCoordinator.handleListGroups()
+    assertEquals(Errors.NONE, error)
+    assertEquals(1, groups.size)
+    assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+  }
+
+  @Test
+  def testListGroupsIncludesRebalancingGroups() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    val (error, groups) = groupCoordinator.handleListGroups()
+    assertEquals(Errors.NONE, error)
+    assertEquals(1, groups.size)
+    assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+  }
+
+  @Test
+  def testDescribeGroupWrongCoordinator() {
+    EasyMock.reset(replicaManager)
+    val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+  }
+
+  @Test
+  def testDescribeGroupInactiveGroup() {
+    EasyMock.reset(replicaManager)
+    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, error)
+    assertEquals(GroupCoordinator.DeadGroup, summary)
+  }
+
+  @Test
+  def testDescribeGroupStable() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId,  Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, error)
+    assertEquals(protocolType, summary.protocolType)
+    assertEquals("range", summary.protocol)
+    assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
+  }
+
+  @Test
+  def testDescribeGroupRebalancing() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, error)
+    assertEquals(protocolType, summary.protocolType)
+    assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
+    assertEquals(AwaitingSync.toString, summary.state)
+    assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId))
+    assertTrue(summary.members.forall(_.metadata.isEmpty))
+    assertTrue(summary.members.forall(_.assignment.isEmpty))
+  }
+
   private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
     val responsePromise = Promise[JoinGroupResult]
     val responseFuture = responsePromise.future
@@ -706,7 +812,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleJoinGroup(groupId, memberId, "clientId", sessionTimeout, protocolType, protocols, responseCallback)
+    groupCoordinator.handleJoinGroup(groupId, memberId, "clientId", "clientHost", sessionTimeout,
+      protocolType, protocols, responseCallback)
     responseFuture
   }
 
@@ -731,7 +838,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       )})
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
     responseFuture
   }
 
@@ -742,7 +849,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
     responseFuture
   }
 
@@ -779,7 +886,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
+    groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
@@ -807,7 +914,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       )})
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+    groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
@@ -817,7 +924,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+    groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 021aea6..2846622 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -146,18 +146,19 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSelectProtocol() {
     val groupId = "groupId"
-
+    val clientId = "clientId"
+    val clientHost = "clientHost"
     val sessionTimeoutMs = 10000
 
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(memberId, member)
     assertEquals("range", group.selectProtocol)
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(otherMemberId, otherMember)
@@ -165,7 +166,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertTrue(Set("range", "roundrobin")(group.selectProtocol))
 
     val lastMemberId = "lastMemberId"
-    val lastMember = new MemberMetadata(lastMemberId, groupId, sessionTimeoutMs,
+    val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(lastMemberId, lastMember)
@@ -182,15 +183,16 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSelectProtocolChoosesCompatibleProtocol() {
     val groupId = "groupId"
-
+    val clientId = "clientId"
+    val clientHost = "clientHost"
     val sessionTimeoutMs = 10000
 
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(memberId, member)
@@ -201,14 +203,15 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSupportsProtocols() {
     val groupId = "groupId"
-
+    val clientId = "clientId"
+    val clientHost = "clientHost"
     val sessionTimeoutMs = 10000
 
     // by default, the group supports everything
     assertTrue(group.supportsProtocols(Set("roundrobin", "range")))
 
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(memberId, member)
@@ -217,7 +220,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertFalse(group.supportsProtocols(Set("foo", "bar")))
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(otherMemberId, otherMember)