You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/03 23:40:26 UTC

[2/3] kafka git commit: KAFKA-2687: Add support for ListGroups and DescribeGroup APIs

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
new file mode 100644
index 0000000..ddd3114
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.admin
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import kafka.common.KafkaException
+import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.errors.DisconnectException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.Selector
+import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+import org.apache.kafka.common.{TopicPartition, Cluster, Node}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class AdminClient(val time: Time,
+                  val requestTimeoutMs: Int,
+                  val client: ConsumerNetworkClient,
+                  val bootstrapBrokers: List[Node]) extends Logging {
+
+  private def send(target: Node,
+                   api: ApiKeys,
+                   request: AbstractRequest): Struct = {
+    var now = time.milliseconds()
+    val deadline = now + requestTimeoutMs
+    var future: RequestFuture[ClientResponse] = null
+
+    do {
+      future = client.send(target, api, request)
+      client.poll(future)
+
+      if (future.succeeded())
+        return if (future.value().wasDisconnected()) {
+          throw new DisconnectException()
+        } else {
+          future.value().responseBody()
+        }
+
+      now = time.milliseconds()
+    } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
+
+    throw future.exception()
+  }
+
+  private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
+    bootstrapBrokers.foreach {
+      case broker =>
+        try {
+          return send(broker, api, request)
+        } catch {
+          case e: Exception =>
+            debug(s"Request ${api} failed against node ${broker}", e)
+        }
+    }
+    throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
+  }
+
+  private def findCoordinator(groupId: String): Node = {
+    val request = new GroupCoordinatorRequest(groupId)
+    val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request)
+    val response = new GroupCoordinatorResponse(responseBody)
+    Errors.forCode(response.errorCode()).maybeThrow()
+    response.node()
+  }
+
+  def listGroups(node: Node): List[GroupOverview] = {
+    val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
+    val response = new ListGroupsResponse(responseBody)
+    Errors.forCode(response.errorCode()).maybeThrow()
+    response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+  }
+
+  private def findAllBrokers(): List[Node] = {
+    val request = new MetadataRequest(List[String]())
+    val responseBody = sendAnyNode(ApiKeys.METADATA, request)
+    val response = new MetadataResponse(responseBody)
+    if (!response.errors().isEmpty)
+      debug(s"Metadata request contained errors: ${response.errors()}")
+    response.cluster().nodes().asScala.toList
+  }
+
+  def listAllGroups(): Map[Node, List[GroupOverview]] = {
+    findAllBrokers.map {
+      case broker =>
+        broker -> {
+          try {
+            listGroups(broker)
+          } catch {
+            case e: Exception =>
+              debug(s"Failed to find groups from broker ${broker}", e)
+              List[GroupOverview]()
+          }
+        }
+    }.toMap
+  }
+
+  def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = {
+    listAllGroups().mapValues { groups =>
+      groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+    }
+  }
+
+  def listAllGroupsFlattened(): List[GroupOverview] = {
+    listAllGroups.values.flatten.toList
+  }
+
+  def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
+    listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+  }
+
+  def describeGroup(groupId: String): GroupSummary = {
+    val coordinator = findCoordinator(groupId)
+    val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
+    val response = new DescribeGroupsResponse(responseBody)
+    val metadata = response.groups().get(groupId)
+    if (metadata == null)
+      throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
+
+    Errors.forCode(metadata.errorCode()).maybeThrow()
+    val members = metadata.members().map {
+      case member =>
+        val metadata = Utils.readBytes(member.memberMetadata())
+        val assignment = Utils.readBytes(member.memberAssignment())
+        MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
+    }.toList
+    GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
+  }
+
+  def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = {
+    val group = describeGroup(groupId)
+    if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
+      throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group")
+
+    group.members.map {
+      case member =>
+        val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
+        member.memberId -> assignment.partitions().asScala.toList
+    }.toMap
+  }
+
+}
+
+object AdminClient {
+  val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
+  val DefaultRequestTimeoutMs = 5000
+  val DefaultMaxInFlightRequestsPerConnection = 100
+  val DefaultReconnectBackoffMs = 50
+  val DefaultSendBufferBytes = 128 * 1024
+  val DefaultReceiveBufferBytes = 32 * 1024
+  val DefaultRetryBackoffMs = 100
+  val AdminClientIdSequence = new AtomicInteger(1)
+  val AdminConfigDef = {
+    val config = new ConfigDef()
+      .define(
+        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+        Type.LIST,
+        Importance.HIGH,
+        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+      .define(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+        ConfigDef.Type.STRING,
+        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+        ConfigDef.Importance.MEDIUM,
+        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+      .withClientSslSupport()
+      .withClientSaslSupport()
+    config
+  }
+
+  class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals, false)
+
+  def createSimplePlaintext(brokerUrl: String): AdminClient = {
+    val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)
+    create(new AdminConfig(config))
+  }
+
+  def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
+
+  def create(config: AdminConfig): AdminClient = {
+    val time = new SystemTime
+    val metrics = new Metrics(time)
+    val metadata = new Metadata
+    val channelBuilder = ClientUtils.createChannelBuilder(config.values())
+
+    val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+    val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
+    metadata.update(bootstrapCluster, 0)
+
+    val selector = new Selector(
+      DefaultConnectionMaxIdleMs,
+      metrics,
+      time,
+      "admin",
+      Map[String, String](),
+      channelBuilder)
+
+    val networkClient = new NetworkClient(
+      selector,
+      metadata,
+      "admin-" + AdminClientIdSequence.getAndIncrement(),
+      DefaultMaxInFlightRequestsPerConnection,
+      DefaultReconnectBackoffMs,
+      DefaultSendBufferBytes,
+      DefaultReceiveBufferBytes,
+      DefaultRequestTimeoutMs,
+      time)
+
+    val highLevelClient = new ConsumerNetworkClient(
+      networkClient,
+      metadata,
+      time,
+      DefaultRetryBackoffMs)
+
+    new AdminClient(
+      time,
+      DefaultRequestTimeoutMs,
+      highLevelClient,
+      bootstrapCluster.nodes().asScala.toList)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
new file mode 100644
index 0000000..43e78f5
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object GroupCoordinatorRequest {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer) = {
+    // envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = ApiUtils.readShortString(buffer)
+
+    // request
+    val group = ApiUtils.readShortString(buffer)
+    GroupCoordinatorRequest(group, versionId, correlationId, clientId)
+  }
+
+}
+
+case class GroupCoordinatorRequest(group: String,
+                                   versionId: Short = GroupCoordinatorRequest.CurrentVersion,
+                                   correlationId: Int = 0,
+                                   clientId: String = GroupCoordinatorRequest.DefaultClientId)
+  extends RequestOrResponse(Some(RequestKeys.GroupCoordinatorKey)) {
+
+  def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    ApiUtils.shortStringLength(clientId) +
+    ApiUtils.shortStringLength(group)
+
+  def writeTo(buffer: ByteBuffer) {
+    // envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    ApiUtils.writeShortString(buffer, clientId)
+
+    // consumer metadata request
+    ApiUtils.writeShortString(buffer, group)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    // return ConsumerCoordinatorNotAvailable for all uncaught errors
+    val errorResponse = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+  }
+
+  def describe(details: Boolean) = {
+    val consumerMetadataRequest = new StringBuilder
+    consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    consumerMetadataRequest.append("; Version: " + versionId)
+    consumerMetadataRequest.append("; CorrelationId: " + correlationId)
+    consumerMetadataRequest.append("; ClientId: " + clientId)
+    consumerMetadataRequest.append("; Group: " + group)
+    consumerMetadataRequest.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..4cd7db8
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+import kafka.common.ErrorMapping
+
+object GroupCoordinatorResponse {
+  val CurrentVersion = 0
+
+  private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
+  
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val broker = BrokerEndPoint.readFrom(buffer)
+    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+      Some(broker)
+    else
+      None
+
+    GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
+  }
+  
+}
+
+case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+  extends RequestOrResponse() {
+
+  def sizeInBytes =
+    4 + /* correlationId */
+    2 + /* error code */
+    coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).get.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
+  }
+
+  def describe(details: Boolean) = toString
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
deleted file mode 100644
index 075ddb5..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.common.ErrorMapping
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-
-object GroupMetadataRequest {
-  val CurrentVersion = 0.shortValue
-  val DefaultClientId = ""
-
-  def readFrom(buffer: ByteBuffer) = {
-    // envelope
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = ApiUtils.readShortString(buffer)
-
-    // request
-    val group = ApiUtils.readShortString(buffer)
-    GroupMetadataRequest(group, versionId, correlationId, clientId)
-  }
-
-}
-
-case class GroupMetadataRequest(group: String,
-                                versionId: Short = GroupMetadataRequest.CurrentVersion,
-                                correlationId: Int = 0,
-                                clientId: String = GroupMetadataRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) {
-
-  def sizeInBytes =
-    2 + /* versionId */
-    4 + /* correlationId */
-    ApiUtils.shortStringLength(clientId) +
-    ApiUtils.shortStringLength(group)
-
-  def writeTo(buffer: ByteBuffer) {
-    // envelope
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    ApiUtils.writeShortString(buffer, clientId)
-
-    // consumer metadata request
-    ApiUtils.writeShortString(buffer, group)
-  }
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
-  def describe(details: Boolean) = {
-    val consumerMetadataRequest = new StringBuilder
-    consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
-    consumerMetadataRequest.append("; Version: " + versionId)
-    consumerMetadataRequest.append("; CorrelationId: " + correlationId)
-    consumerMetadataRequest.append("; ClientId: " + clientId)
-    consumerMetadataRequest.append("; Group: " + group)
-    consumerMetadataRequest.toString()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
deleted file mode 100644
index 2d65917..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
-
-object GroupMetadataResponse {
-  val CurrentVersion = 0
-
-  private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-  
-  def readFrom(buffer: ByteBuffer) = {
-    val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val broker = BrokerEndPoint.readFrom(buffer)
-    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
-      Some(broker)
-    else
-      None
-
-    GroupMetadataResponse(coordinatorOpt, errorCode, correlationId)
-  }
-  
-}
-
-case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
-  extends RequestOrResponse() {
-
-  def sizeInBytes =
-    4 + /* correlationId */
-    2 + /* error code */
-    coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
-    coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
-  }
-
-  def describe(details: Boolean) = toString
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 669b63a..2363099 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -33,12 +33,17 @@ object RequestKeys {
   val ControlledShutdownKey: Short = 7
   val OffsetCommitKey: Short = 8
   val OffsetFetchKey: Short = 9
-  val GroupMetadataKey: Short = 10
+  val GroupCoordinatorKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
   val LeaveGroupKey: Short = 13
   val SyncGroupKey: Short = 14
+  val DescribeGroupsKey: Short = 15
+  val ListGroupsKey: Short = 16
 
+  // NOTE: this map only includes the server-side request/response handlers. Newer
+  // request types should only use the client-side versions which are parsed with
+  // o.a.k.common.requests.AbstractRequest.getRequest()
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
         FetchKey -> ("Fetch", FetchRequest.readFrom),
@@ -49,8 +54,7 @@ object RequestKeys {
         UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
         ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
         OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
-        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
-        GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom)
+        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 36b5b3b..2f836c0 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -151,9 +151,9 @@ object ClientUtils extends Logging{
            if (!queryChannel.isConnected)
              queryChannel = channelToAnyBroker(zkUtils)
            debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
-           queryChannel.send(GroupMetadataRequest(group))
+           queryChannel.send(GroupCoordinatorRequest(group))
            val response = queryChannel.receive()
-           val consumerMetadataResponse =  GroupMetadataResponse.readFrom(response.payload())
+           val consumerMetadataResponse =  GroupCoordinatorResponse.readFrom(response.payload())
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
            if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 81cb51b..6f53fac 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -60,6 +60,7 @@ object ErrorMapping {
   // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
   // 28: INVALID_COMMIT_OFFSET_SIZE
   val AuthorizationCode: Short = 29
+  // 30: REBALANCE_IN_PROGRESS
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5b1aead..e15aca4 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -112,9 +112,9 @@ class SimpleConsumer(val host: String,
     TopicMetadataResponse.readFrom(response.payload())
   }
 
-  def send(request: GroupMetadataRequest): GroupMetadataResponse = {
+  def send(request: GroupCoordinatorRequest): GroupCoordinatorResponse = {
     val response = sendRequest(request)
-    GroupMetadataResponse.readFrom(response.payload())
+    GroupCoordinatorResponse.readFrom(response.payload())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 97ce22b..2015371 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -107,6 +107,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleJoinGroup(groupId: String,
                       memberId: String,
                       clientId: String,
+                      clientHost: String,
                       sessionTimeoutMs: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
@@ -132,10 +133,10 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
         } else {
           group = groupManager.addGroup(groupId, protocolType)
-          doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+          doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
         }
       } else {
-        doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+        doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
       }
     }
   }
@@ -143,6 +144,7 @@ class GroupCoordinator(val brokerId: Int,
   private def doJoinGroup(group: GroupMetadata,
                           memberId: String,
                           clientId: String,
+                          clientHost: String,
                           sessionTimeoutMs: Int,
                           protocolType: String,
                           protocols: List[(String, Array[Byte])],
@@ -166,7 +168,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -174,7 +176,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (member.matches(protocols)) {
@@ -201,7 +203,7 @@ class GroupCoordinator(val brokerId: Int,
           case Stable =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -269,13 +271,30 @@ class GroupCoordinator(val brokerId: Int,
             group.get(memberId).awaitingSyncCallback = responseCallback
             completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
 
-            // if this is the leader, then we can transition to stable and
-            // propagate the assignment to any awaiting members
+            // if this is the leader, then we can attempt to persist state and transition to stable
             if (memberId == group.leaderId) {
-              group.transitionTo(Stable)
 
-              // persist the group metadata and upon finish propagate the assignment
-              groupManager.storeGroup(group, groupAssignment)
+              // fill any missing members with an empty assignment
+              val missing = group.allMembers -- groupAssignment.keySet
+              val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
+
+              // persist the group metadata and upon finish transition to stable and propagate the assignment
+              groupManager.storeGroup(group, assignment, (errorCode: Short) => {
+                group synchronized {
+                  // another member may have joined the group while we were awaiting this callback,
+                  // so we must ensure we are still in the AwaitingSync state when it gets invoked.
+                  // if we have transitioned to another state, then we shouldn't do anything
+                  if (group.is(AwaitingSync)) {
+                    if (errorCode != Errors.NONE.code) {
+                      resetAndPropagateAssignmentError(group, errorCode)
+                      maybePrepareRebalance(group)
+                    } else if (group.is(AwaitingSync)) {
+                      setAndPropagateAssignment(group, assignment)
+                      group.transitionTo(Stable)
+                    }
+                  }
+                }
+              })
             }
 
           case Stable =>
@@ -413,6 +432,34 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  def handleListGroups(): (Errors, List[GroupOverview]) = {
+    if (!isActive.get) {
+      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+    } else {
+      val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
+      (errorCode, groupManager.currentGroups.map(_.overview).toList)
+    }
+  }
+
+  def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
+    if (!isActive.get) {
+      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
+    } else if (!isCoordinatorForGroup(groupId)) {
+      (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
+    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
+    } else {
+      val group = groupManager.getGroup(groupId)
+      if (group == null) {
+        (Errors.NONE, GroupCoordinator.DeadGroup)
+      } else {
+        group synchronized {
+          (Errors.NONE, group.summary)
+        }
+      }
+    }
+  }
+
   def handleGroupImmigration(offsetTopicPartitionId: Int) = {
     groupManager.loadGroupsForPartition(offsetTopicPartitionId)
   }
@@ -421,6 +468,27 @@ class GroupCoordinator(val brokerId: Int,
     groupManager.removeGroupsForPartition(offsetTopicPartitionId)
   }
 
+  private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
+    assert(group.is(AwaitingSync))
+    group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
+    propagateAssignment(group, Errors.NONE.code)
+  }
+
+  private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
+    assert(group.is(AwaitingSync))
+    group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
+    propagateAssignment(group, errorCode)
+  }
+
+  private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
+    for (member <- group.allMemberMetadata) {
+      if (member.awaitingSyncCallback != null) {
+        member.awaitingSyncCallback(member.assignment, errorCode)
+        member.awaitingSyncCallback = null
+      }
+    }
+  }
+
   private def validGroupId(groupId: String): Boolean = {
     groupId != null && !groupId.isEmpty
   }
@@ -458,12 +526,13 @@ class GroupCoordinator(val brokerId: Int,
 
   private def addMemberAndRebalance(sessionTimeoutMs: Int,
                                     clientId: String,
+                                    clientHost: String,
                                     protocols: List[(String, Array[Byte])],
                                     group: GroupMetadata,
                                     callback: JoinCallback) = {
     // use the client-id with a random id suffix as the member-id
     val memberId = clientId + "-" + group.generateMemberIdSuffix
-    val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
     member.awaitingJoinCallback = callback
     group.add(member.memberId, member)
     maybePrepareRebalance(group)
@@ -488,11 +557,9 @@ class GroupCoordinator(val brokerId: Int,
 
   private def prepareRebalance(group: GroupMetadata) {
     // if any members are awaiting sync, cancel their request and have them rejoin
-    if (group.is(AwaitingSync)) {
-      groupManager.propagateAssignment(group, Errors.REBALANCE_IN_PROGRESS.code)
-    }
+    if (group.is(AwaitingSync))
+      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
 
-    group.allMembers.foreach(_.assignment = null)
     group.transitionTo(PreparingRebalance)
     info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
 
@@ -544,7 +611,7 @@ class GroupCoordinator(val brokerId: Int,
         info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
 
         // trigger the awaiting join group response callback for all the members after rebalancing
-        for (member <- group.allMembers) {
+        for (member <- group.allMemberMetadata) {
           assert(member.awaitingJoinCallback != null)
           val joinResult = JoinGroupResult(
             members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
@@ -595,6 +662,11 @@ class GroupCoordinator(val brokerId: Int,
 
 object GroupCoordinator {
 
+  val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
+  val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+  val NoMembers = List[MemberSummary]()
+  val NoState = ""
+  val NoProtocolType = ""
   val NoProtocol = ""
   val NoLeader = ""
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 652a3a4..ece9ce0 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -93,6 +93,20 @@ private object GroupMetadata {
 }
 
 /**
+ * Case class used to represent group metadata for the ListGroups API
+ */
+case class GroupOverview(groupId: String,
+                         protocolType: String)
+
+/**
+ * Case class used to represent group metadata for the DescribeGroup API
+ */
+case class GroupSummary(state: String,
+                        protocolType: String,
+                        protocol: String,
+                        members: List[MemberSummary])
+
+/**
  * Group contains the following metadata:
  *
  *  Membership metadata:
@@ -144,7 +158,9 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
 
   def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
 
-  def allMembers = members.values.toList
+  def allMembers = members.keySet
+
+  def allMemberMetadata = members.values.toList
 
   def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) =>
     timeout.max(member.sessionTimeoutMs)
@@ -168,7 +184,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
     val candidates = candidateProtocols
 
     // let each member vote for one of the protocols and choose the one with the most votes
-    val votes: List[(String, Int)] = allMembers
+    val votes: List[(String, Int)] = allMemberMetadata
       .map(_.vote(candidates))
       .groupBy(identity)
       .mapValues(_.size)
@@ -179,7 +195,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
 
   private def candidateProtocols = {
     // get the set of protocols that are commonly supported by all members
-    allMembers
+    allMemberMetadata
       .map(_.protocols)
       .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
   }
@@ -201,6 +217,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
     members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
   }
 
+  def summary: GroupSummary = {
+    if (is(Stable)) {
+      val members = this.members.values.map{ member => member.summary(protocol) }.toList
+      GroupSummary(state.toString, protocolType, protocol, members)
+    } else {
+      val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
+      GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members)
+    }
+  }
+
+  def overview: GroupOverview = {
+    GroupOverview(groupId, protocolType)
+  }
+
   private def assertValidTransition(targetState: GroupState) {
     if (!GroupMetadata.validPreviousStates(targetState).contains(state))
       throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 81ed548..0052b5d 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -95,12 +95,16 @@ class GroupMetadataManager(val brokerId: Int,
     }
   )
 
+  def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
+
   def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
 
   def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
 
   def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
 
+  def isLoading(): Boolean = loadingPartitions synchronized !loadingPartitions.isEmpty
+
   /**
    * Get the group associated with the given groupId, or null if not found
    */
@@ -158,7 +162,8 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   def storeGroup(group: GroupMetadata,
-                 groupAssignment: Map[String, Array[Byte]]) {
+                 groupAssignment: Map[String, Array[Byte]],
+                 responseCallback: Short => Unit) {
     // construct the message to append
     val message = new Message(
       key = GroupMetadataManager.groupMetadataKey(group.groupId),
@@ -208,12 +213,7 @@ class GroupMetadataManager(val brokerId: Int,
         }
       }
 
-      for (member <- group.allMembers) {
-        member.assignment = groupAssignment.getOrElse(member.memberId, Array.empty[Byte])
-      }
-
-      // propagate the assignments
-      propagateAssignment(group, responseCode)
+      responseCallback(responseCode)
     }
 
     // call replica manager to append the group message
@@ -225,16 +225,7 @@ class GroupMetadataManager(val brokerId: Int,
       putCacheCallback)
   }
 
-  def propagateAssignment(group: GroupMetadata,
-                          errorCode: Short) {
-    val hasError = errorCode != Errors.NONE.code
-    for (member <- group.allMembers) {
-      if (member.awaitingSyncCallback != null) {
-        member.awaitingSyncCallback(if (hasError) Array.empty else member.assignment, errorCode)
-        member.awaitingSyncCallback = null
-      }
-    }
-  }
+
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
@@ -657,10 +648,14 @@ object GroupMetadataManager {
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
 
   private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
+    new Field("client_id", STRING),
+    new Field("client_host", STRING),
     new Field("session_timeout", INT32),
     new Field("subscription", BYTES),
     new Field("assignment", BYTES))
   private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
+  private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id")
+  private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host")
   private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
   private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
   private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")
@@ -787,10 +782,12 @@ object GroupMetadataManager {
     value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
     value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
 
-    val memberArray = groupMetadata.allMembers.map {
+    val memberArray = groupMetadata.allMemberMetadata.map {
       case memberMetadata =>
         val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0)
         memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId)
+        memberStruct.set(MEMBER_METADATA_CLIENT_ID_V0, memberMetadata.clientId)
+        memberStruct.set(MEMBER_METADATA_CLIENT_HOST_V0, memberMetadata.clientHost)
         memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs)
 
         val metadata = memberMetadata.metadata(groupMetadata.protocol)
@@ -901,10 +898,13 @@ object GroupMetadataManager {
           case memberMetadataObj =>
             val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
             val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
+            val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V0).asInstanceOf[String]
+            val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V0).asInstanceOf[String]
             val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
             val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
 
-            val member = new MemberMetadata(memberId, groupId, sessionTimeout, List((group.protocol, subscription)))
+            val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+              List((group.protocol, subscription)))
 
             member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 6a76241..80782c8 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -23,6 +23,13 @@ import kafka.utils.nonthreadsafe
 
 import scala.collection.Map
 
+
+case class MemberSummary(memberId: String,
+                         clientId: String,
+                         clientHost: String,
+                         metadata: Array[Byte],
+                         assignment: Array[Byte])
+
 /**
  * Member metadata contains the following metadata:
  *
@@ -46,15 +53,14 @@ import scala.collection.Map
 @nonthreadsafe
 private[coordinator] class MemberMetadata(val memberId: String,
                                           val groupId: String,
+                                          val clientId: String,
+                                          val clientHost: String,
                                           val sessionTimeoutMs: Int,
                                           var supportedProtocols: List[(String, Array[Byte])]) {
 
-  // NOTE: we need to add memory barrier to assignment and awaitingSyncCallback
-  // since they can be accessed in the append callback thread that does not
-  // hold on the group object lock
-  @volatile var assignment: Array[Byte] = null
+  var assignment: Array[Byte] = Array.empty[Byte]
   var awaitingJoinCallback: JoinGroupResult => Unit = null
-  @volatile var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
 
@@ -87,6 +93,14 @@ private[coordinator] class MemberMetadata(val memberId: String,
     return true
   }
 
+  def summary(protocol: String): MemberSummary = {
+    MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+  }
+
+  def summaryNoMetadata(): MemberSummary = {
+    MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+  }
+
   /**
    * Vote for one of the potential group protocols. This takes into account the protocol preference as
    * indicated by the order of supported protocols and returns the first one also contained in the set

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..0e14758
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+
+class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinatorResponse) {
+
+  def errorCode = underlying.errorCode
+
+  def coordinator: BrokerEndPoint = {
+    import kafka.javaapi.Implicits._
+    underlying.coordinatorOpt
+  }
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+    this.underlying.equals(otherConsumerMetadataResponse.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+
+  override def hashCode = underlying.hashCode
+
+  override def toString = underlying.toString
+
+}
+
+object GroupCoordinatorResponse {
+  def readFrom(buffer: ByteBuffer) = new GroupCoordinatorResponse(kafka.api.GroupCoordinatorResponse.readFrom(buffer))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
deleted file mode 100644
index b94aa01..0000000
--- a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.javaapi
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-
-class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) {
-
-  def errorCode = underlying.errorCode
-
-  def coordinator: BrokerEndPoint = {
-    import kafka.javaapi.Implicits._
-    underlying.coordinatorOpt
-  }
-
-  override def equals(other: Any) = canEqual(other) && {
-    val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupMetadataResponse]
-    this.underlying.equals(otherConsumerMetadataResponse.underlying)
-  }
-
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse]
-
-  override def hashCode = underlying.hashCode
-
-  override def toString = underlying.toString
-
-}
-
-object GroupMetadataResponse {
-  def readFrom(buffer: ByteBuffer) = new GroupMetadataResponse(kafka.api.GroupMetadataResponse.readFrom(buffer))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9fce77e..9ea4079 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -65,7 +65,8 @@ object RequestChannel extends Logging {
         RequestKeys.deserializerForKey(requestId)(buffer)
       else
         null
-    // for client-side request / response format
+    // if we failed to find a server-side mapping, then try using the
+    // client-side request / response format
     val header: RequestHeader =
       if (requestObj == null) {
         buffer.rewind

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 35c5956..0a2e0b9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
+import java.util
 
 import kafka.admin.AdminUtils
 import kafka.api._
@@ -31,13 +32,12 @@ import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.{GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.Node
 
 import scala.collection._
-
-
 /**
  * Logic to handle the various Kafka requests
  */
@@ -74,11 +74,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
-        case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request)
+        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
         case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
         case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
         case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
         case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
+        case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
+        case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -676,34 +678,73 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
   }
 
-  /*
-   * Handle a consumer metadata request
-   */
-  def handleGroupMetadataRequest(request: RequestChannel.Request) {
-    val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest]
+  def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
+    val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
+    val responseHeader = new ResponseHeader(request.header.correlationId)
 
-    if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) {
-      val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId)
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+    if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
+      val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     } else {
-      val partition = coordinator.partitionFor(groupMetadataRequest.group)
+      val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
 
       // get metadata (and create the topic if necessary)
       val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
+      val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
+        partitionMetadata => partitionMetadata.leader
+      }
 
-      val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)
-
-      val response =
-        offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
-          partitionMetadata.leader.map { leader =>
-            GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId)
-          }.getOrElse(errorResponse)
-        }.getOrElse(errorResponse)
+      val responseBody = coordinatorEndpoint match {
+        case None =>
+          new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
+        case Some(endpoint) =>
+          new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
+      }
 
       trace("Sending consumer metadata %s for correlation id %d to client %s."
-        .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId))
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+        .format(responseBody, request.header.correlationId, request.header.clientId))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    }
+  }
+
+  def handleDescribeGroupRequest(request: RequestChannel.Request) {
+    import JavaConverters._
+
+    val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
+    val responseHeader = new ResponseHeader(request.header.correlationId)
+
+    val groups = describeRequest.groupIds().asScala.map {
+      case groupId =>
+        if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
+          groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED)
+        } else {
+          val (error, summary) = coordinator.handleDescribeGroup(groupId)
+          val members = summary.members.map { member =>
+            val metadata = ByteBuffer.wrap(member.metadata)
+            val assignment = ByteBuffer.wrap(member.assignment)
+            new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
+          }
+          groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType,
+            summary.protocol, members.asJava)
+        }
+    }.toMap
+
+    val responseBody = new DescribeGroupsResponse(groups.asJava)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+  }
+
+  def handleListGroupsRequest(request: RequestChannel.Request) {
+    import JavaConverters._
+
+    val responseHeader = new ResponseHeader(request.header.correlationId)
+    val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+      ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED)
+    } else {
+      val (error, groups) = coordinator.handleListGroups()
+      val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
+      new ListGroupsResponse(error.code, allGroups.asJava)
     }
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -740,6 +781,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         joinGroupRequest.groupId(),
         joinGroupRequest.memberId(),
         request.header.clientId(),
+        request.session.host,
         joinGroupRequest.sessionTimeout(),
         joinGroupRequest.protocolType(),
         protocols,

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
new file mode 100644
index 0000000..97b49dd
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.admin.AdminClient
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, Logging}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
+import org.junit.{Before, Test}
+import org.junit.Assert._
+import scala.collection.JavaConversions._
+
+class AdminClientTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+  val groupId = "my-test"
+  val clientId = "consumer-498"
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val part2 = 1
+  val tp2 = new TopicPartition(topic, part2)
+
+  var client: AdminClient = null
+
+  // configure the servers and clients
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+
+  @Before
+  override def setUp() {
+    super.setUp
+    client = AdminClient.createSimplePlaintext(this.brokerList)
+    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+  }
+
+  @Test
+  def testListGroups() {
+    consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumers(0).poll(0)
+      !consumers(0).assignment().isEmpty
+    }, "Expected non-empty assignment")
+
+    val groups = client.listAllGroupsFlattened
+    assertFalse(groups.isEmpty)
+    val group = groups(0)
+    assertEquals(groupId, group.groupId)
+    assertEquals("consumer", group.protocolType)
+  }
+
+  @Test
+  def testDescribeGroup() {
+    consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumers(0).poll(0)
+      !consumers(0).assignment().isEmpty
+    }, "Expected non-empty assignment")
+
+    val group= client.describeGroup(groupId)
+    assertEquals("consumer", group.protocolType)
+    assertEquals("range", group.protocol)
+    assertEquals("Stable", group.state)
+    assertFalse(group.members.isEmpty)
+
+    val member = group.members(0)
+    assertEquals(clientId, member.clientId)
+    assertFalse(member.clientHost.isEmpty)
+    assertFalse(member.memberId.isEmpty)
+  }
+
+  @Test
+  def testDescribeConsumerGroup() {
+    consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumers(0).poll(0)
+      !consumers(0).assignment().isEmpty
+    }, "Expected non-empty assignment")
+
+    val assignment = client.describeConsumerGroup(groupId)
+    assertEquals(1, assignment.size)
+    for (partitions <- assignment.values)
+      assertEquals(Set(tp, tp2), partitions.toSet)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3d484b8..e363e27 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -85,7 +85,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
       RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
       RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
-      RequestKeys.GroupMetadataKey -> classOf[GroupMetadataResponse],
+      RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse],
       RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
       RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
       RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
@@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
     RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
     RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.GroupMetadataKey -> ((resp: GroupMetadataResponse) => resp.errorCode()),
+    RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()),
     RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
     RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
     RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
@@ -121,7 +121,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     RequestKeys.OffsetsKey -> TopicDescribeAcl,
     RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl),
     RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl),
-    RequestKeys.GroupMetadataKey -> (TopicReadAcl ++ GroupReadAcl),
+    RequestKeys.GroupCoordinatorKey -> (TopicReadAcl ++ GroupReadAcl),
     RequestKeys.UpdateMetadataKey -> ClusterAcl,
     RequestKeys.JoinGroupKey -> GroupReadAcl,
     RequestKeys.SyncGroupKey -> GroupReadAcl,
@@ -174,7 +174,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
       RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
       RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
-      RequestKeys.GroupMetadataKey -> new GroupMetadataRequest(group),
+      RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group),
       RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
         Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
         Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index a77979a..86e6877 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -135,8 +135,8 @@ object TestOffsetManager {
       val id = random.nextInt().abs % numGroups
       val group = "group-" + id
       try {
-        metadataChannel.send(GroupMetadataRequest(group))
-        val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
+        metadataChannel.send(GroupCoordinatorRequest(group))
+        val coordinatorId = GroupCoordinatorResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
 
         val channel = if (channels.contains(coordinatorId))
           channels(coordinatorId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 09e9ce3..90f629a 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -231,12 +231,12 @@ object SerializationTestUtils {
     ))
   }
 
-  def createConsumerMetadataRequest: GroupMetadataRequest = {
-    GroupMetadataRequest("group 1", clientId = "client 1")
+  def createConsumerMetadataRequest: GroupCoordinatorRequest = {
+    GroupCoordinatorRequest("group 1", clientId = "client 1")
   }
 
-  def createConsumerMetadataResponse: GroupMetadataResponse = {
-    GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+  def createConsumerMetadataResponse: GroupCoordinatorResponse = {
+    GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
   }
 
   def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
@@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
-  private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
   private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
   private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
   private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 5e6bd03..c1278e4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -51,10 +51,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   type LeaveGroupCallbackParams = Short
   type LeaveGroupCallback = Short => Unit
 
+  val ClientId = "consumer-test"
+  val ClientHost = "localhost"
   val ConsumerMinSessionTimeout = 10
   val ConsumerMaxSessionTimeout = 1000
   val DefaultSessionTimeout = 500
-  var consumerCoordinator: GroupCoordinator = null
+  var groupCoordinator: GroupCoordinator = null
   var replicaManager: ReplicaManager = null
   var scheduler: KafkaScheduler = null
   var zkUtils: ZkUtils = null
@@ -85,26 +87,25 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
-    consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
-    consumerCoordinator.startup()
+    groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
+    groupCoordinator.startup()
 
     // add the partition into the owned partition list
-    groupPartitionId = consumerCoordinator.partitionFor(groupId)
-    consumerCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
   }
 
   @After
   def tearDown() {
     EasyMock.reset(replicaManager)
-    consumerCoordinator.shutdown()
+    groupCoordinator.shutdown()
   }
 
   @Test
   def testJoinGroupWrongCoordinator() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType,
-      protocols)
+    val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
   }
@@ -139,8 +140,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val groupId = ""
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
-      protocols)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode)
   }
 
@@ -164,8 +164,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat",
-      protocols)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", protocols)
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
   }
 
@@ -285,6 +284,27 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testSyncGroupEmptyAssignment() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map())
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    assertTrue(syncGroupResult._1.isEmpty)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE.code, heartbeatResult)
+  }
+
+  @Test
   def testSyncGroupNotCoordinator() {
     val generation = 1
 
@@ -668,6 +688,92 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, leaveGroupResult)
   }
 
+  @Test
+  def testListGroupsIncludesStableGroups() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    val (error, groups) = groupCoordinator.handleListGroups()
+    assertEquals(Errors.NONE, error)
+    assertEquals(1, groups.size)
+    assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+  }
+
+  @Test
+  def testListGroupsIncludesRebalancingGroups() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    val (error, groups) = groupCoordinator.handleListGroups()
+    assertEquals(Errors.NONE, error)
+    assertEquals(1, groups.size)
+    assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+  }
+
+  @Test
+  def testDescribeGroupWrongCoordinator() {
+    EasyMock.reset(replicaManager)
+    val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+  }
+
+  @Test
+  def testDescribeGroupInactiveGroup() {
+    EasyMock.reset(replicaManager)
+    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, error)
+    assertEquals(GroupCoordinator.DeadGroup, summary)
+  }
+
+  @Test
+  def testDescribeGroupStable() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId,  Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, error)
+    assertEquals(protocolType, summary.protocolType)
+    assertEquals("range", summary.protocol)
+    assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
+  }
+
+  @Test
+  def testDescribeGroupRebalancing() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, error)
+    assertEquals(protocolType, summary.protocolType)
+    assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
+    assertEquals(AwaitingSync.toString, summary.state)
+    assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId))
+    assertTrue(summary.members.forall(_.metadata.isEmpty))
+    assertTrue(summary.members.forall(_.assignment.isEmpty))
+  }
+
   private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
     val responsePromise = Promise[JoinGroupResult]
     val responseFuture = responsePromise.future
@@ -706,7 +812,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleJoinGroup(groupId, memberId, "clientId", sessionTimeout, protocolType, protocols, responseCallback)
+    groupCoordinator.handleJoinGroup(groupId, memberId, "clientId", "clientHost", sessionTimeout,
+      protocolType, protocols, responseCallback)
     responseFuture
   }
 
@@ -731,7 +838,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       )})
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
     responseFuture
   }
 
@@ -742,7 +849,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
     responseFuture
   }
 
@@ -779,7 +886,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
+    groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
@@ -807,7 +914,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       )})
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+    groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
@@ -817,7 +924,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
     EasyMock.replay(replicaManager)
 
-    consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+    groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 021aea6..2846622 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -146,18 +146,19 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSelectProtocol() {
     val groupId = "groupId"
-
+    val clientId = "clientId"
+    val clientHost = "clientHost"
     val sessionTimeoutMs = 10000
 
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(memberId, member)
     assertEquals("range", group.selectProtocol)
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(otherMemberId, otherMember)
@@ -165,7 +166,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertTrue(Set("range", "roundrobin")(group.selectProtocol))
 
     val lastMemberId = "lastMemberId"
-    val lastMember = new MemberMetadata(lastMemberId, groupId, sessionTimeoutMs,
+    val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(lastMemberId, lastMember)
@@ -182,15 +183,16 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSelectProtocolChoosesCompatibleProtocol() {
     val groupId = "groupId"
-
+    val clientId = "clientId"
+    val clientHost = "clientHost"
     val sessionTimeoutMs = 10000
 
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(memberId, member)
@@ -201,14 +203,15 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSupportsProtocols() {
     val groupId = "groupId"
-
+    val clientId = "clientId"
+    val clientHost = "clientHost"
     val sessionTimeoutMs = 10000
 
     // by default, the group supports everything
     assertTrue(group.supportsProtocols(Set("roundrobin", "range")))
 
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(memberId, member)
@@ -217,7 +220,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertFalse(group.supportsProtocols(Set("foo", "bar")))
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
       List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(otherMemberId, otherMember)