You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:48 UTC

[4/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
deleted file mode 100644
index ea1c0d0..0000000
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
-
-object ConsumerMetadataResponse {
-  val CurrentVersion = 0
-
-  private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-  
-  def readFrom(buffer: ByteBuffer) = {
-    val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val broker = BrokerEndPoint.readFrom(buffer)
-    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
-      Some(broker)
-    else
-      None
-
-    ConsumerMetadataResponse(coordinatorOpt, errorCode, correlationId)
-  }
-  
-}
-
-case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
-  extends RequestOrResponse() {
-
-  def sizeInBytes =
-    4 + /* correlationId */
-    2 + /* error code */
-    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
-    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
-  }
-
-  def describe(details: Boolean) = toString
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
new file mode 100644
index 0000000..075ddb5
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object GroupMetadataRequest {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer) = {
+    // envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = ApiUtils.readShortString(buffer)
+
+    // request
+    val group = ApiUtils.readShortString(buffer)
+    GroupMetadataRequest(group, versionId, correlationId, clientId)
+  }
+
+}
+
+case class GroupMetadataRequest(group: String,
+                                versionId: Short = GroupMetadataRequest.CurrentVersion,
+                                correlationId: Int = 0,
+                                clientId: String = GroupMetadataRequest.DefaultClientId)
+  extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) {
+
+  def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    ApiUtils.shortStringLength(clientId) +
+    ApiUtils.shortStringLength(group)
+
+  def writeTo(buffer: ByteBuffer) {
+    // envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    ApiUtils.writeShortString(buffer, clientId)
+
+    // consumer metadata request
+    ApiUtils.writeShortString(buffer, group)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    // return ConsumerCoordinatorNotAvailable for all uncaught errors
+    val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+  }
+
+  def describe(details: Boolean) = {
+    val consumerMetadataRequest = new StringBuilder
+    consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    consumerMetadataRequest.append("; Version: " + versionId)
+    consumerMetadataRequest.append("; CorrelationId: " + correlationId)
+    consumerMetadataRequest.append("; ClientId: " + clientId)
+    consumerMetadataRequest.append("; Group: " + group)
+    consumerMetadataRequest.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
new file mode 100644
index 0000000..2d65917
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+import kafka.common.ErrorMapping
+
+object GroupMetadataResponse {
+  val CurrentVersion = 0
+
+  private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
+  
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val broker = BrokerEndPoint.readFrom(buffer)
+    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+      Some(broker)
+    else
+      None
+
+    GroupMetadataResponse(coordinatorOpt, errorCode, correlationId)
+  }
+  
+}
+
+case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+  extends RequestOrResponse() {
+
+  def sizeInBytes =
+    4 + /* correlationId */
+    2 + /* error code */
+    coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
+  }
+
+  def describe(details: Boolean) = toString
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 5b362ef..75067cf 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -41,7 +41,7 @@ object OffsetCommitRequest extends Logging {
     val clientId = readShortString(buffer)
 
     // Read the OffsetRequest 
-    val consumerGroupId = readShortString(buffer)
+    val groupId = readShortString(buffer)
 
     // version 1 and 2 specific fields
     val groupGenerationId: Int =
@@ -50,11 +50,11 @@ object OffsetCommitRequest extends Logging {
       else
         org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
 
-    val consumerId: String =
+    val memberId: String =
       if (versionId >= 1)
         readShortString(buffer)
       else
-        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_MEMBER_ID
 
     // version 2 specific fields
     val retentionMs: Long =
@@ -83,7 +83,7 @@ object OffsetCommitRequest extends Logging {
       })
     })
 
-    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId, retentionMs)
+    OffsetCommitRequest(groupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, memberId, retentionMs)
   }
 }
 
@@ -93,7 +93,7 @@ case class OffsetCommitRequest(groupId: String,
                                correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId,
                                groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                               consumerId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID,
+                               memberId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_MEMBER_ID,
                                retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
     extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
 
@@ -114,7 +114,7 @@ case class OffsetCommitRequest(groupId: String,
     // version 1 and 2 specific data
     if (versionId >= 1) {
       buffer.putInt(groupGenerationId)
-      writeShortString(buffer, consumerId)
+      writeShortString(buffer, memberId)
     }
 
     // version 2 or above specific data
@@ -142,7 +142,7 @@ case class OffsetCommitRequest(groupId: String,
     4 + /* correlationId */
     shortStringLength(clientId) +
     shortStringLength(groupId) +
-    (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) +
+    (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(memberId) else 0) +
     (if (versionId >= 2) 8 /* retention time */ else 0) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
@@ -175,7 +175,7 @@ case class OffsetCommitRequest(groupId: String,
     offsetCommitRequest.append("; ClientId: " + clientId)
     offsetCommitRequest.append("; GroupId: " + groupId)
     offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId)
-    offsetCommitRequest.append("; ConsumerId: " + consumerId)
+    offsetCommitRequest.append("; MemberId: " + memberId)
     offsetCommitRequest.append("; RetentionMs: " + retentionMs)
     if(details)
       offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(","))

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 8a22c1a..669b63a 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -33,10 +33,11 @@ object RequestKeys {
   val ControlledShutdownKey: Short = 7
   val OffsetCommitKey: Short = 8
   val OffsetFetchKey: Short = 9
-  val ConsumerMetadataKey: Short = 10
+  val GroupMetadataKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
   val LeaveGroupKey: Short = 13
+  val SyncGroupKey: Short = 14
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -49,7 +50,7 @@ object RequestKeys {
         ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
         OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
-        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom)
+        GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 6ae0347..36b5b3b 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -151,9 +151,9 @@ object ClientUtils extends Logging{
            if (!queryChannel.isConnected)
              queryChannel = channelToAnyBroker(zkUtils)
            debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
-           queryChannel.send(ConsumerMetadataRequest(group))
+           queryChannel.send(GroupMetadataRequest(group))
            val response = queryChannel.receive()
-           val consumerMetadataResponse =  ConsumerMetadataResponse.readFrom(response.payload())
+           val consumerMetadataResponse =  GroupMetadataResponse.readFrom(response.payload())
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
            if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index deb48b1..bbee894 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -64,8 +64,9 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short =
 object OffsetMetadataAndError {
   val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code)
   val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code)
-  val UnknownConsumer = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_CONSUMER_ID.code)
-  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
+  val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID.code)
+  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
   val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
   val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index db75d4b..ca41eba 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,7 +18,7 @@
 package kafka.common
 
 import util.matching.Regex
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
 
 
 object Topic {
@@ -26,7 +26,7 @@ object Topic {
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
-  val InternalTopics = Set(ConsumerCoordinator.OffsetsTopicName)
+  val InternalTopics = Set(GroupCoordinator.OffsetsTopicName)
 
   def validate(topic: String) {
     if (topic.length <= 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index b7af6d6..5b1aead 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -112,9 +112,9 @@ class SimpleConsumer(val host: String,
     TopicMetadataResponse.readFrom(response.payload())
   }
 
-  def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = {
+  def send(request: GroupMetadataRequest): GroupMetadataResponse = {
     val response = sendRequest(request)
-    ConsumerMetadataResponse.readFrom(response.payload())
+    GroupMetadataResponse.readFrom(response.payload())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
deleted file mode 100644
index bf23e9b..0000000
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ /dev/null
@@ -1,535 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator
-
-import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
-import kafka.message.UncompressedCodec
-import kafka.log.LogConfig
-import kafka.server._
-import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
-
-import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.Properties
-import scala.collection.{Map, Seq, immutable}
-
-case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int,
-                              consumerMaxSessionTimeoutMs: Int)
-
-/**
- * ConsumerCoordinator handles consumer group and consumer offset management.
- *
- * Each Kafka server instantiates a coordinator which is responsible for a set of
- * consumer groups. Consumer groups are assigned to coordinators based on their
- * group names.
- */
-class ConsumerCoordinator(val brokerId: Int,
-                          val groupConfig: GroupManagerConfig,
-                          val offsetConfig: OffsetManagerConfig,
-                          private val offsetManager: OffsetManager,
-                          zkUtils: ZkUtils) extends Logging {
-
-  this.logIdent = "[ConsumerCoordinator " + brokerId + "]: "
-
-  private val isActive = new AtomicBoolean(false)
-
-  private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
-  private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
-  private var coordinatorMetadata: CoordinatorMetadata = null
-
-  def this(brokerId: Int,
-           groupConfig: GroupManagerConfig,
-           offsetConfig: OffsetManagerConfig,
-           replicaManager: ReplicaManager,
-           zkUtils: ZkUtils,
-           scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
-    new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler), zkUtils)
-
-  def offsetsTopicConfigs: Properties = {
-    val props = new Properties
-    props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
-    props
-  }
-
-  /**
-   * NOTE: If a group lock and metadataLock are simultaneously needed,
-   * be sure to acquire the group lock before metadataLock to prevent deadlock
-   */
-
-  /**
-   * Startup logic executed at the same time when the server starts up.
-   */
-  def startup() {
-    info("Starting up.")
-    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
-    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId)
-    coordinatorMetadata = new CoordinatorMetadata(brokerId, zkUtils, maybePrepareRebalance)
-    isActive.set(true)
-    info("Startup complete.")
-  }
-
-  /**
-   * Shutdown logic executed at the same time when server shuts down.
-   * Ordering of actions should be reversed from the startup process.
-   */
-  def shutdown() {
-    info("Shutting down.")
-    isActive.set(false)
-    offsetManager.shutdown()
-    coordinatorMetadata.shutdown()
-    heartbeatPurgatory.shutdown()
-    rebalancePurgatory.shutdown()
-    info("Shutdown complete.")
-  }
-
-  def handleJoinGroup(groupId: String,
-                      consumerId: String,
-                      topics: Set[String],
-                      sessionTimeoutMs: Int,
-                      partitionAssignmentStrategy: String,
-                      responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
-    if (!isActive.get) {
-      responseCallback(Set.empty, consumerId, 0, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
-    } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) {
-      responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code)
-    } else if (sessionTimeoutMs < groupConfig.consumerMinSessionTimeoutMs ||
-               sessionTimeoutMs > groupConfig.consumerMaxSessionTimeoutMs) {
-      responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
-    } else {
-      // only try to create the group if the group is not unknown AND
-      // the consumer id is UNKNOWN, if consumer is specified but group does not
-      // exist we should reject the request
-      var group = coordinatorMetadata.getGroup(groupId)
-      if (group == null) {
-        if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
-          responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
-        } else {
-          group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
-          doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
-        }
-      } else {
-        doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
-      }
-    }
-  }
-
-  private def doJoinGroup(group: ConsumerGroupMetadata,
-                          consumerId: String,
-                          topics: Set[String],
-                          sessionTimeoutMs: Int,
-                          partitionAssignmentStrategy: String,
-                          responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
-    group synchronized {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; this is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
-        // joining without specified consumer id,
-        responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
-      } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) {
-        responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code)
-      } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) {
-        // if the consumer trying to register with a un-recognized id, send the response to let
-        // it reset its consumer id and retry
-        responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
-      } else if (group.has(consumerId) && group.is(Stable) && topics == group.get(consumerId).topics) {
-        /*
-         * if an existing consumer sends a JoinGroupRequest with no changes while the group is stable,
-         * just treat it like a heartbeat and return their currently assigned partitions.
-         */
-        val consumer = group.get(consumerId)
-        completeAndScheduleNextHeartbeatExpiration(group, consumer)
-        responseCallback(consumer.assignedTopicPartitions, consumerId, group.generationId, Errors.NONE.code)
-      } else {
-        val consumer = if (consumerId == JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
-          // if the consumer id is unknown, register this consumer to the group
-          val generatedConsumerId = group.generateNextConsumerId
-          val consumer = addConsumer(generatedConsumerId, topics, sessionTimeoutMs, group)
-          maybePrepareRebalance(group)
-          consumer
-        } else {
-          val consumer = group.get(consumerId)
-          if (topics != consumer.topics) {
-            // existing consumer changed its subscribed topics
-            updateConsumer(group, consumer, topics)
-            maybePrepareRebalance(group)
-            consumer
-          } else {
-            // existing consumer rejoining a group due to rebalance
-            consumer
-          }
-        }
-
-        consumer.awaitingRebalanceCallback = responseCallback
-
-        if (group.is(PreparingRebalance))
-          rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
-      }
-    }
-  }
-
-  def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
-    if (!isActive.get) {
-      responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
-    } else {
-      val group = coordinatorMetadata.getGroup(groupId)
-      if (group == null) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; this is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
-        // joining without specified consumer id,
-        responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-      } else {
-        group synchronized {
-          if (group.is(Dead)) {
-            responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-          } else if (!group.has(consumerId)) {
-            responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-          } else {
-            val consumer = group.get(consumerId)
-            removeHeartbeatForLeavingConsumer(group, consumer)
-            onConsumerFailure(group, consumer)
-            responseCallback(Errors.NONE.code)
-            if (group.is(PreparingRebalance))
-              rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
-          }
-        }
-      }
-    }
-  }
-
-  def handleHeartbeat(groupId: String,
-                      consumerId: String,
-                      generationId: Int,
-                      responseCallback: Short => Unit) {
-    if (!isActive.get) {
-      responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
-    } else {
-      val group = coordinatorMetadata.getGroup(groupId)
-      if (group == null) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; this is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
-        // joining without specified consumer id,
-        responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-      } else {
-        group synchronized {
-          if (group.is(Dead)) {
-            responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-          } else if (!group.has(consumerId)) {
-            responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-          } else if (generationId != group.generationId) {
-            responseCallback(Errors.ILLEGAL_GENERATION.code)
-          } else if (!group.is(Stable)) {
-            responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
-          } else {
-            val consumer = group.get(consumerId)
-            completeAndScheduleNextHeartbeatExpiration(group, consumer)
-            responseCallback(Errors.NONE.code)
-          }
-        }
-      }
-    }
-  }
-
-  def handleCommitOffsets(groupId: String,
-                          consumerId: String,
-                          generationId: Int,
-                          offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
-    if (!isActive.get) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code))
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_CONSUMER.code))
-    } else {
-      val group = coordinatorMetadata.getGroup(groupId)
-      if (group == null) {
-        if (generationId < 0)
-          // the group is not relying on Kafka for partition management, so allow the commit
-          offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
-        else
-          // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
-          // or this is a request coming from an older generation. either way, reject the commit
-          responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
-      } else {
-        group synchronized {
-          if (group.is(Dead)) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
-          } else if (!group.has(consumerId)) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
-          } else if (generationId != group.generationId) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
-          } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code))
-          } else {
-            offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
-          }
-        }
-      }
-    }
-  }
-
-  def handleFetchOffsets(groupId: String,
-                         partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
-    if (!isActive.get) {
-      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
-    } else if (!isCoordinatorForGroup(groupId)) {
-      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
-    } else {
-      val group = coordinatorMetadata.getGroup(groupId)
-      if (group == null) {
-        // if the group does not exist, it means this group is not relying
-        // on Kafka for partition management, and hence never send join-group
-        // request to the coordinator before; in this case blindly fetch the offsets
-        offsetManager.getOffsets(groupId, partitions)
-      } else {
-        group synchronized {
-          if (group.is(Dead)) {
-            partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)}.toMap
-          } else {
-            offsetManager.getOffsets(groupId, partitions)
-          }
-        }
-      }
-    }
-  }
-
-  def handleGroupImmigration(offsetTopicPartitionId: Int) = {
-    // TODO we may need to add more logic in KAFKA-2017
-    offsetManager.loadOffsetsFromLog(offsetTopicPartitionId)
-  }
-
-  def handleGroupEmigration(offsetTopicPartitionId: Int) = {
-    // TODO we may need to add more logic in KAFKA-2017
-    offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
-  }
-
-  /**
-   * Complete existing DelayedHeartbeats for the given consumer and schedule the next one
-   */
-  private def completeAndScheduleNextHeartbeatExpiration(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
-    // complete current heartbeat expectation
-    consumer.latestHeartbeat = SystemTime.milliseconds
-    val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
-    heartbeatPurgatory.checkAndComplete(consumerKey)
-
-    // reschedule the next heartbeat expiration deadline
-    val newHeartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, newHeartbeatDeadline, consumer.sessionTimeoutMs)
-    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey))
-  }
-
-  private def removeHeartbeatForLeavingConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
-    consumer.isLeaving = true
-    val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
-    heartbeatPurgatory.checkAndComplete(consumerKey)
-  }
-
-  private def addConsumer(consumerId: String,
-                          topics: Set[String],
-                          sessionTimeoutMs: Int,
-                          group: ConsumerGroupMetadata) = {
-    val consumer = new ConsumerMetadata(consumerId, group.groupId, topics, sessionTimeoutMs)
-    val topicsToBind = topics -- group.topics
-    group.add(consumer.consumerId, consumer)
-    coordinatorMetadata.bindGroupToTopics(group.groupId, topicsToBind)
-    consumer
-  }
-
-  private def removeConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
-    group.remove(consumer.consumerId)
-    val topicsToUnbind = consumer.topics -- group.topics
-    coordinatorMetadata.unbindGroupFromTopics(group.groupId, topicsToUnbind)
-  }
-
-  private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
-    val topicsToBind = topics -- group.topics
-    group.remove(consumer.consumerId)
-    val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
-    group.add(consumer.consumerId, consumer)
-    consumer.topics = topics
-    coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
-  }
-
-  private def maybePrepareRebalance(group: ConsumerGroupMetadata) {
-    group synchronized {
-      if (group.canRebalance)
-        prepareRebalance(group)
-    }
-  }
-
-  private def prepareRebalance(group: ConsumerGroupMetadata) {
-    group.transitionTo(PreparingRebalance)
-    info("Preparing to rebalance group %s with old generation %s".format(group.groupId, group.generationId))
-
-    val rebalanceTimeout = group.rebalanceTimeout
-    val delayedRebalance = new DelayedRebalance(this, group, rebalanceTimeout)
-    val consumerGroupKey = ConsumerGroupKey(group.groupId)
-    rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey))
-  }
-
-  private def rebalance(group: ConsumerGroupMetadata) {
-    assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
-
-    group.transitionTo(Rebalancing)
-    group.generationId += 1
-
-    info("Rebalancing group %s with new generation %s".format(group.groupId, group.generationId))
-
-    val assignedPartitionsPerConsumer = reassignPartitions(group)
-    trace("Rebalance for group %s generation %s has assigned partitions: %s"
-          .format(group.groupId, group.generationId, assignedPartitionsPerConsumer))
-
-    group.transitionTo(Stable)
-    info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
-  }
-
-  private def onConsumerFailure(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
-    trace("Consumer %s in group %s has failed".format(consumer.consumerId, group.groupId))
-    removeConsumer(group, consumer)
-    maybePrepareRebalance(group)
-  }
-
-  private def reassignPartitions(group: ConsumerGroupMetadata) = {
-    val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy)
-    val topicsPerConsumer = group.topicsPerConsumer
-    val partitionsPerTopic = coordinatorMetadata.partitionsPerTopic
-    val assignedPartitionsPerConsumer = assignor.assign(topicsPerConsumer, partitionsPerTopic)
-    assignedPartitionsPerConsumer.foreach { case (consumerId, partitions) =>
-      group.get(consumerId).assignedTopicPartitions = partitions
-    }
-    assignedPartitionsPerConsumer
-  }
-
-  def tryCompleteRebalance(group: ConsumerGroupMetadata, forceComplete: () => Boolean) = {
-    group synchronized {
-      if (group.notYetRejoinedConsumers.isEmpty)
-        forceComplete()
-      else false
-    }
-  }
-
-  def onExpirationRebalance() {
-    // TODO: add metrics for rebalance timeouts
-  }
-
-  def onCompleteRebalance(group: ConsumerGroupMetadata) {
-    group synchronized {
-      val failedConsumers = group.notYetRejoinedConsumers
-      if (group.isEmpty || !failedConsumers.isEmpty) {
-        failedConsumers.foreach { failedConsumer =>
-          removeConsumer(group, failedConsumer)
-          // TODO: cut the socket connection to the consumer
-        }
-
-        if (group.isEmpty) {
-          group.transitionTo(Dead)
-          info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
-          coordinatorMetadata.removeGroup(group.groupId, group.topics)
-        }
-      }
-      if (!group.is(Dead)) {
-        // assign partitions to existing consumers of the group according to the partitioning strategy
-        rebalance(group)
-
-        // trigger the awaiting join group response callback for all the consumers after rebalancing
-        for (consumer <- group.allConsumers) {
-          assert(consumer.awaitingRebalanceCallback != null)
-          consumer.awaitingRebalanceCallback(consumer.assignedTopicPartitions, consumer.consumerId, group.generationId, Errors.NONE.code)
-          consumer.awaitingRebalanceCallback = null
-          completeAndScheduleNextHeartbeatExpiration(group, consumer)
-        }
-      }
-    }
-  }
-
-  def tryCompleteHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
-    group synchronized {
-      if (shouldKeepConsumerAlive(consumer, heartbeatDeadline) || consumer.isLeaving)
-        forceComplete()
-      else false
-    }
-  }
-
-  def onExpirationHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long) {
-    group synchronized {
-      if (!shouldKeepConsumerAlive(consumer, heartbeatDeadline))
-        onConsumerFailure(group, consumer)
-    }
-  }
-
-  def onCompleteHeartbeat() {
-    // TODO: add metrics for complete heartbeats
-  }
-
-  def partitionFor(group: String): Int = offsetManager.partitionFor(group)
-
-  private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) =
-    consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline
-
-  private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
-}
-
-object ConsumerCoordinator {
-
-  val OffsetsTopicName = "__consumer_offsets"
-
-  def create(config: KafkaConfig,
-             zkUtils: ZkUtils,
-             replicaManager: ReplicaManager,
-             kafkaScheduler: KafkaScheduler): ConsumerCoordinator = {
-    val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
-      loadBufferSize = config.offsetsLoadBufferSize,
-      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
-      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
-      consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
-
-    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler)
-  }
-
-  def create(config: KafkaConfig,
-             zkUtils: ZkUtils,
-             offsetManager: OffsetManager): ConsumerCoordinator = {
-    val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
-      loadBufferSize = config.offsetsLoadBufferSize,
-      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
-      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
-      consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
-
-    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkUtils)
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
deleted file mode 100644
index 0e3657f..0000000
--- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.utils.nonthreadsafe
-
-import java.util.UUID
-
-import collection.mutable
-
-private[coordinator] sealed trait GroupState { def state: Byte }
-
-/**
- * Consumer group is preparing to rebalance
- *
- * action: respond to heartbeats with an ILLEGAL GENERATION error code
- * transition: some consumers have joined by the timeout => Rebalancing
- *             all consumers have left the group => Dead
- */
-private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
-
-/**
- * Consumer group is rebalancing
- *
- * action: compute the group's partition assignment
- *         send the join-group response with new partition assignment when rebalance is complete
- * transition: partition assignment has been computed => Stable
- */
-private[coordinator] case object Rebalancing extends GroupState { val state: Byte = 2 }
-
-/**
- * Consumer group is stable
- *
- * action: respond to consumer heartbeats normally
- * transition: consumer failure detected via heartbeat => PreparingRebalance
- *             consumer join-group received => PreparingRebalance
- *             zookeeper topic watcher fired => PreparingRebalance
- */
-private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
-
-/**
- * Consumer group has no more members
- *
- * action: none
- * transition: none
- */
-private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
-
-
-private object ConsumerGroupMetadata {
-  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(PreparingRebalance),
-      Stable -> Set(Rebalancing),
-      PreparingRebalance -> Set(Stable),
-      Rebalancing -> Set(PreparingRebalance))
-}
-
-/**
- * Group contains the following metadata:
- *
- *  Membership metadata:
- *  1. Consumers registered in this group
- *  2. Partition assignment strategy for this group
- *
- *  State metadata:
- *  1. group state
- *  2. generation id
- */
-@nonthreadsafe
-private[coordinator] class ConsumerGroupMetadata(val groupId: String,
-                                                 val partitionAssignmentStrategy: String) {
-
-  private val consumers = new mutable.HashMap[String, ConsumerMetadata]
-  private var state: GroupState = Stable
-  var generationId = 0
-
-  def is(groupState: GroupState) = state == groupState
-  def has(consumerId: String) = consumers.contains(consumerId)
-  def get(consumerId: String) = consumers(consumerId)
-
-  def add(consumerId: String, consumer: ConsumerMetadata) {
-    consumers.put(consumerId, consumer)
-  }
-
-  def remove(consumerId: String) {
-    consumers.remove(consumerId)
-  }
-
-  def isEmpty = consumers.isEmpty
-
-  def topicsPerConsumer = consumers.mapValues(_.topics).toMap
-
-  def topics = consumers.values.flatMap(_.topics).toSet
-
-  def notYetRejoinedConsumers = consumers.values.filter(_.awaitingRebalanceCallback == null).toList
-
-  def allConsumers = consumers.values.toList
-
-  def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) =>
-    timeout.max(consumer.sessionTimeoutMs)
-  }
-
-  // TODO: decide if ids should be predictable or random
-  def generateNextConsumerId = UUID.randomUUID().toString
-
-  def canRebalance = state == Stable
-
-  def transitionTo(groupState: GroupState) {
-    assertValidTransition(groupState)
-    state = groupState
-  }
-
-  private def assertValidTransition(targetState: GroupState) {
-    if (!ConsumerGroupMetadata.validPreviousStates(targetState).contains(state))
-      throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
-        .format(groupId, ConsumerGroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
deleted file mode 100644
index 64ed4a5..0000000
--- a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.common.TopicAndPartition
-import kafka.utils.nonthreadsafe
-
-/**
- * Consumer metadata contains the following metadata:
- *
- * Heartbeat metadata:
- * 1. negotiated heartbeat session timeout
- * 2. timestamp of the latest heartbeat
- *
- * Subscription metadata:
- * 1. subscribed topics
- * 2. assigned partitions for the subscribed topics
- *
- * In addition, it also contains the following state information:
- *
- * 1. Awaiting rebalance callback: when the consumer group is in the prepare-rebalance state,
- *                                 its rebalance callback will be kept in the metadata if the
- *                                 consumer has sent the join group request
- */
-@nonthreadsafe
-private[coordinator] class ConsumerMetadata(val consumerId: String,
-                                            val groupId: String,
-                                            var topics: Set[String],
-                                            val sessionTimeoutMs: Int) {
-
-  var awaitingRebalanceCallback: (Set[TopicAndPartition], String, Int, Short) => Unit = null
-  var assignedTopicPartitions = Set.empty[TopicAndPartition]
-  var latestHeartbeat: Long = -1
-  var isLeaving: Boolean = false
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index a33231a..2279924 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -17,11 +17,8 @@
 
 package kafka.coordinator
 
-import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.utils.{threadsafe, ZkUtils, Logging}
-import kafka.utils.ZkUtils._
-import org.I0Itec.zkclient.{ZkClient, IZkDataListener}
+import kafka.utils.threadsafe
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -32,9 +29,7 @@ import scala.collection.mutable
  * It delegates all group logic to the callers.
  */
 @threadsafe
-private[coordinator] class CoordinatorMetadata(brokerId: Int,
-                                               zkUtils: ZkUtils,
-                                               maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
+private[coordinator] class CoordinatorMetadata(brokerId: Int) {
 
   /**
    * NOTE: If a group lock and metadataLock are simultaneously needed,
@@ -45,24 +40,11 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int,
   /**
    * These should be guarded by metadataLock
    */
-  private val groups = new mutable.HashMap[String, ConsumerGroupMetadata]
-  private val groupsPerTopic = new mutable.HashMap[String, Set[String]]
-  private val topicPartitionCounts = new mutable.HashMap[String, Int]
-  private val topicPartitionChangeListeners = new mutable.HashMap[String, TopicPartitionChangeListener]
+  private val groups = new mutable.HashMap[String, GroupMetadata]
 
   def shutdown() {
     inWriteLock(metadataLock) {
-      topicPartitionChangeListeners.keys.foreach(deregisterTopicPartitionChangeListener)
-      topicPartitionChangeListeners.clear()
       groups.clear()
-      groupsPerTopic.clear()
-      topicPartitionCounts.clear()
-    }
-  }
-
-  def partitionsPerTopic = {
-    inReadLock(metadataLock) {
-      topicPartitionCounts.toMap
     }
   }
 
@@ -78,148 +60,22 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int,
   /**
    * Add a group or get the group associated with the given groupId if it already exists
    */
-  def addGroup(groupId: String, partitionAssignmentStrategy: String) = {
+  def addGroup(groupId: String, protocolType: String) = {
     inWriteLock(metadataLock) {
-      groups.getOrElseUpdate(groupId, new ConsumerGroupMetadata(groupId, partitionAssignmentStrategy))
+      groups.getOrElseUpdate(groupId, new GroupMetadata(groupId, protocolType))
     }
   }
 
   /**
    * Remove all metadata associated with the group, including its topics
    * @param groupId the groupId of the group we are removing
-   * @param topicsForGroup topics that consumers in the group were subscribed to
    */
-  def removeGroup(groupId: String, topicsForGroup: Set[String]) {
+  def removeGroup(groupId: String) {
     inWriteLock(metadataLock) {
-      topicsForGroup.foreach(topic => unbindGroupFromTopics(groupId, topicsForGroup))
+      if (!groups.contains(groupId))
+        throw new IllegalArgumentException("Cannot remove non-existing group")
       groups.remove(groupId)
     }
   }
 
-  /**
-   * Add the given group to the set of groups interested in
-   * topic partition changes for the given topics
-   */
-  def bindGroupToTopics(groupId: String, topics: Set[String]) {
-    inWriteLock(metadataLock) {
-      require(groups.contains(groupId), "CoordinatorMetadata can only bind existing groups")
-      topics.foreach(topic => bindGroupToTopic(groupId, topic))
-    }
-  }
-
-  /**
-   * Remove the given group from the set of groups interested in
-   * topic partition changes for the given topics
-   */
-  def unbindGroupFromTopics(groupId: String, topics: Set[String]) {
-    inWriteLock(metadataLock) {
-      require(groups.contains(groupId), "CoordinatorMetadata can only unbind existing groups")
-      topics.foreach(topic => unbindGroupFromTopic(groupId, topic))
-    }
-  }
-
-  /**
-   * Add the given group to the set of groups interested in the topicsToBind and
-   * remove the given group from the set of groups interested in the topicsToUnbind
-   */
-  def bindAndUnbindGroupFromTopics(groupId: String, topicsToBind: Set[String], topicsToUnbind: Set[String]) {
-    inWriteLock(metadataLock) {
-      require(groups.contains(groupId), "CoordinatorMetadata can only update topic bindings for existing groups")
-      topicsToBind.foreach(topic => bindGroupToTopic(groupId, topic))
-      topicsToUnbind.foreach(topic => unbindGroupFromTopic(groupId, topic))
-    }
-  }
-
-  private def isListeningToTopic(topic: String) = topicPartitionChangeListeners.contains(topic)
-
-  private def bindGroupToTopic(groupId: String, topic: String) {
-    if (isListeningToTopic(topic)) {
-      val currentGroupsForTopic = groupsPerTopic(topic)
-      groupsPerTopic.put(topic, currentGroupsForTopic + groupId)
-    }
-    else {
-      groupsPerTopic.put(topic, Set(groupId))
-      topicPartitionCounts.put(topic, getTopicPartitionCountFromZK(topic))
-      registerTopicPartitionChangeListener(topic)
-    }
-  }
-
-  private def unbindGroupFromTopic(groupId: String, topic: String) {
-    if (isListeningToTopic(topic)) {
-      val remainingGroupsForTopic = groupsPerTopic(topic) - groupId
-      if (remainingGroupsForTopic.isEmpty) {
-        // no other group cares about the topic, so erase all metadata associated with the topic
-        groupsPerTopic.remove(topic)
-        topicPartitionCounts.remove(topic)
-        deregisterTopicPartitionChangeListener(topic)
-      } else {
-        groupsPerTopic.put(topic, remainingGroupsForTopic)
-      }
-    }
-  }
-
-  private def getTopicPartitionCountFromZK(topic: String) = {
-    val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
-    topicData(topic).size
-  }
-
-  private def registerTopicPartitionChangeListener(topic: String) {
-    val listener = new TopicPartitionChangeListener
-    topicPartitionChangeListeners.put(topic, listener)
-    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), listener)
-  }
-
-  private def deregisterTopicPartitionChangeListener(topic: String) {
-    val listener = topicPartitionChangeListeners(topic)
-    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
-    topicPartitionChangeListeners.remove(topic)
-  }
-
-  /**
-   * Zookeeper listener to handle topic partition changes
-   */
-  class TopicPartitionChangeListener extends IZkDataListener with Logging {
-    this.logIdent = "[TopicPartitionChangeListener on Coordinator " + brokerId + "]: "
-
-    override def handleDataChange(dataPath: String, data: Object) {
-      info("Handling data change for path: %s data: %s".format(dataPath, data))
-      val topic = topicFromDataPath(dataPath)
-      val numPartitions = getTopicPartitionCountFromZK(topic)
-
-      val groupsToRebalance = inWriteLock(metadataLock) {
-        /*
-         * This condition exists because a consumer can leave and modify CoordinatorMetadata state
-         * while ZkClient begins handling the data change but before we acquire the metadataLock.
-         */
-        if (isListeningToTopic(topic)) {
-          topicPartitionCounts.put(topic, numPartitions)
-          groupsPerTopic(topic).map(groupId => groups(groupId))
-        }
-        else Set.empty[ConsumerGroupMetadata]
-      }
-      groupsToRebalance.foreach(maybePrepareRebalance)
-    }
-
-    override def handleDataDeleted(dataPath: String) {
-      info("Handling data delete for path: %s".format(dataPath))
-      val topic = topicFromDataPath(dataPath)
-      val groupsToRebalance = inWriteLock(metadataLock) {
-        /*
-         * This condition exists because a consumer can leave and modify CoordinatorMetadata state
-         * while ZkClient begins handling the data delete but before we acquire the metadataLock.
-         */
-        if (isListeningToTopic(topic)) {
-          topicPartitionCounts.put(topic, 0)
-          groupsPerTopic(topic).map(groupId => groups(groupId))
-        }
-        else Set.empty[ConsumerGroupMetadata]
-      }
-      groupsToRebalance.foreach(maybePrepareRebalance)
-    }
-
-    private def topicFromDataPath(dataPath: String) = {
-      val nodes = dataPath.split("/")
-      nodes.last
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index 70a710c..8e250c3 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -23,13 +23,13 @@ import kafka.server.DelayedOperation
  * Delayed heartbeat operations that are added to the purgatory for session timeout checking.
  * Heartbeats are paused during rebalance.
  */
-private[coordinator] class DelayedHeartbeat(consumerCoordinator: ConsumerCoordinator,
-                                            group: ConsumerGroupMetadata,
-                                            consumer: ConsumerMetadata,
+private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
+                                            group: GroupMetadata,
+                                            member: MemberMetadata,
                                             heartbeatDeadline: Long,
                                             sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {
-  override def tryComplete(): Boolean = consumerCoordinator.tryCompleteHeartbeat(group, consumer, heartbeatDeadline, forceComplete)
-  override def onExpiration() = consumerCoordinator.onExpirationHeartbeat(group, consumer, heartbeatDeadline)
-  override def onComplete() = consumerCoordinator.onCompleteHeartbeat()
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
+  override def onComplete() = coordinator.onCompleteHeartbeat()
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
new file mode 100644
index 0000000..ae96e15
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
+ *
+ * Whenever a join-group request is received, check if all known group members have requested
+ * to re-join the group; if yes, complete this operation to proceed rebalance.
+ *
+ * When the operation has expired, any known members that have not requested to re-join
+ * the group are marked as failed, and complete this operation to proceed rebalance with
+ * the rest of the group.
+ */
+private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
+                                            group: GroupMetadata,
+                                            sessionTimeout: Long)
+  extends DelayedOperation(sessionTimeout) {
+
+  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
+  override def onExpiration() = coordinator.onExpireJoin()
+  override def onComplete() = coordinator.onCompleteJoin(group)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
deleted file mode 100644
index 8247d33..0000000
--- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.server.DelayedOperation
-
-/**
- * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
- *
- * Whenever a join-group request is received, check if all known consumers have requested
- * to re-join the group; if yes, complete this operation to proceed rebalance.
- *
- * When the operation has expired, any known consumers that have not requested to re-join
- * the group are marked as failed, and complete this operation to proceed rebalance with
- * the rest of the group.
- */
-private[coordinator] class DelayedRebalance(consumerCoordinator: ConsumerCoordinator,
-                                            group: ConsumerGroupMetadata,
-                                            sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout) {
-
-  override def tryComplete(): Boolean = consumerCoordinator.tryCompleteRebalance(group, forceComplete)
-  override def onExpiration() = consumerCoordinator.onExpirationRebalance()
-  override def onComplete() = consumerCoordinator.onCompleteRebalance(group)
-}