You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:47 UTC

[3/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
new file mode 100644
index 0000000..ef94289
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -0,0 +1,632 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
+import kafka.log.LogConfig
+import kafka.message.UncompressedCodec
+import kafka.server._
+import kafka.utils._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.JoinGroupRequest
+
+import scala.collection.{Map, Seq, immutable}
+
+case class GroupManagerConfig(groupMinSessionTimeoutMs: Int, // smallest session timeout handleJoinGroup accepts
+                              groupMaxSessionTimeoutMs: Int) // largest session timeout handleJoinGroup accepts
+
+case class JoinGroupResult(members: Map[String, Array[Byte]], // member metadata; left empty unless the receiver is the group leader
+                           memberId: String,
+                           generationId: Int,
+                           subProtocol: String,
+                           leaderId: String,
+                           errorCode: Short) // an Errors code; Errors.NONE.code on success
+
+/**
+ * GroupCoordinator handles general group membership and offset management.
+ *
+ * Each Kafka server instantiates a coordinator which is responsible for a set of
+ * groups. Groups are assigned to coordinators based on their group names.
+ */
+class GroupCoordinator(val brokerId: Int,
+                       val groupConfig: GroupManagerConfig,
+                       val offsetConfig: OffsetManagerConfig,
+                       private val offsetManager: OffsetManager) extends Logging {
+  type JoinCallback = JoinGroupResult => Unit // receives the outcome of a JoinGroup request
+  type SyncCallback = (Array[Byte], Short) => Unit // receives (member assignment, error code)
+
+  this.logIdent = "[GroupCoordinator " + brokerId + "]: "
+
+  private val isActive = new AtomicBoolean(false) // set by startup(), cleared by shutdown()
+
+  private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null // created in startup()
+  private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null // created in startup()
+  private var coordinatorMetadata: CoordinatorMetadata = null // created in startup()
+
+  def this(brokerId: Int,
+           groupConfig: GroupManagerConfig,
+           offsetConfig: OffsetManagerConfig,
+           replicaManager: ReplicaManager,
+           zkUtils: ZkUtils,
+           scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
+    new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler)) // convenience form: builds the OffsetManager itself
+
+  def offsetsTopicConfigs: Properties = { // compacted, uncompressed, with the configured segment size
+    val topicOverrides = new Properties
+    Seq(LogConfig.CleanupPolicyProp -> LogConfig.Compact,
+      LogConfig.SegmentBytesProp -> offsetConfig.offsetsTopicSegmentBytes.toString,
+      LogConfig.CompressionTypeProp -> UncompressedCodec.name).foreach { case (key, value) => topicOverrides.put(key, value) }
+    topicOverrides
+  }
+
+  /**
+   * NOTE: If a group lock and metadataLock are simultaneously needed,
+   * be sure to acquire the group lock before metadataLock to prevent deadlock
+   */
+
+  /**
+   * Startup logic executed when the server starts up: creates both purgatories and the group metadata, then activates.
+   */
+  def startup() {
+    info("Starting up.")
+    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
+    joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
+    coordinatorMetadata = new CoordinatorMetadata(brokerId)
+    isActive.set(true) // the handle* methods reject requests until this flag is set
+    info("Startup complete.")
+  }
+
+  /**
+   * Shutdown logic executed when the server shuts down. The coordinator is deactivated first so that new
+   * requests are rejected; components that startup() never created are skipped instead of throwing an NPE.
+   */
+  def shutdown() {
+    info("Shutting down.")
+    isActive.set(false)
+    offsetManager.shutdown()
+    Option(coordinatorMetadata).foreach(_.shutdown())
+    Option(heartbeatPurgatory).foreach(_.shutdown())
+    Option(joinPurgatory).foreach(_.shutdown())
+    info("Shutdown complete.")
+  }
+
+  def handleJoinGroup(groupId: String,
+                      memberId: String,
+                      sessionTimeoutMs: Int,
+                      protocolType: String,
+                      protocols: List[(String, Array[Byte])],
+                      responseCallback: JoinCallback) {
+    // answers the request with an error-only join result
+    def reject(error: Errors): Unit = responseCallback(joinError(memberId, error.code))
+
+    if (!isActive.get)
+      reject(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    else if (!isCoordinatorForGroup(groupId))
+      reject(Errors.NOT_COORDINATOR_FOR_GROUP)
+    else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
+             sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs)
+      reject(Errors.INVALID_SESSION_TIMEOUT)
+    else {
+      Option(coordinatorMetadata.getGroup(groupId)) match {
+        case Some(existingGroup) =>
+          doJoinGroup(existingGroup, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+        case None if memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID =>
+          // a member that already carries an id cannot join a group this coordinator does not know
+          reject(Errors.UNKNOWN_MEMBER_ID)
+        case None =>
+          // the group is unknown AND the member id is UNKNOWN: create the group, then join it
+          val createdGroup = coordinatorMetadata.addGroup(groupId, protocolType)
+          doJoinGroup(createdGroup, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+      }
+    }
+  }
+
+  private def doJoinGroup(group: GroupMetadata,
+                          memberId: String,
+                          sessionTimeoutMs: Int,
+                          protocolType: String,
+                          protocols: List[(String, Array[Byte])],
+                          responseCallback: JoinCallback) {
+    group synchronized {
+      if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) {
+        // if the protocol type differs or the group does not support the member's protocols, reject it
+        responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
+      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
+        // if the member is trying to register with an unrecognized id, send the response to let
+        // it reset its member id and retry
+        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+      } else {
+        group.currentState match {
+          case Dead =>
+            // if the group is marked as dead, it means some other thread has just removed the group
+            // from the coordinator metadata; it is likely that the group has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let the member retry
+            // joining without the specified member id.
+            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+
+          case PreparingRebalance =>
+            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+              addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+            } else {
+              val member = group.get(memberId)
+              updateMemberAndRebalance(group, member, protocols, responseCallback)
+            }
+
+          case AwaitingSync =>
+            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+              addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+            } else {
+              val member = group.get(memberId)
+              if (member.matches(protocols)) {
+                // member is joining with the same metadata (which could be because it failed to
+                // receive the initial JoinGroup response), so just return current group information
+                // for the current generation.
+                responseCallback(JoinGroupResult(
+                  members = if (memberId == group.leaderId) {
+                    group.currentMemberMetadata
+                  } else {
+                    Map.empty
+                  },
+                  memberId = memberId,
+                  generationId = group.generationId,
+                  subProtocol = group.protocol,
+                  leaderId = group.leaderId,
+                  errorCode = Errors.NONE.code))
+              } else {
+                // member has changed metadata, so force a rebalance
+                updateMemberAndRebalance(group, member, protocols, responseCallback)
+              }
+            }
+
+          case Stable =>
+            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+              // if the member id is unknown, register the member to the group
+              addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+            } else {
+              val member = group.get(memberId)
+              if (memberId == group.leaderId || !member.matches(protocols)) {
+                // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
+                // The latter allows the leader to trigger rebalances for changes affecting assignment
+                // which do not affect the member metadata (such as topic metadata changes for the consumer)
+                updateMemberAndRebalance(group, member, protocols, responseCallback)
+              } else {
+                // for followers with no actual change to their metadata, just return group information
+                // for the current generation which will allow them to issue SyncGroup
+                responseCallback(JoinGroupResult(
+                  members = Map.empty,
+                  memberId = memberId,
+                  generationId = group.generationId,
+                  subProtocol = group.protocol,
+                  leaderId = group.leaderId,
+                  errorCode = Errors.NONE.code))
+              }
+            }
+        }
+
+        if (group.is(PreparingRebalance)) // give the pending DelayedJoin a chance to complete right away
+          joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+      }
+    }
+  }
+
+  def handleSyncGroup(groupId: String,
+                      generation: Int,
+                      memberId: String,
+                      groupAssignment: Map[String, Array[Byte]],
+                      responseCallback: SyncCallback) {
+    // replies with an empty assignment and the given error
+    def fail(error: Errors): Unit = responseCallback(Array.empty[Byte], error.code)
+    if (!isActive.get)
+      fail(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    else if (!isCoordinatorForGroup(groupId))
+      fail(Errors.NOT_COORDINATOR_FOR_GROUP)
+    else
+      Option(coordinatorMetadata.getGroup(groupId)) match {
+        case None => fail(Errors.UNKNOWN_MEMBER_ID)
+        case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+      }
+  }
+
+  private def doSyncGroup(group: GroupMetadata,
+                          generationId: Int,
+                          memberId: String,
+                          groupAssignment: Map[String, Array[Byte]],
+                          responseCallback: SyncCallback) {
+    group synchronized {
+      if (!group.has(memberId)) {
+        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+      } else if (generationId != group.generationId) {
+        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
+      } else {
+        group.currentState match {
+          case Dead =>
+            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+
+          case PreparingRebalance =>
+            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)
+
+          case AwaitingSync =>
+            group.get(memberId).awaitingSyncCallback = responseCallback // answered by propagateAssignment or cancelled by prepareRebalance
+            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+
+            // if this is the leader, then we can transition to stable and
+            // propagate the assignment to any awaiting members
+            if (memberId == group.leaderId) {
+              group.transitionTo(Stable)
+              propagateAssignment(group, groupAssignment)
+            }
+
+          case Stable =>
+            // if the group is stable, we just return the member's current assignment
+            val memberMetadata = group.get(memberId)
+            responseCallback(memberMetadata.assignment, Errors.NONE.code)
+            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+        }
+      }
+    }
+  }
+
+  def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
+    if (!isActive.get) {
+      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+    } else if (!isCoordinatorForGroup(groupId)) {
+      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+    } else {
+      Option(coordinatorMetadata.getGroup(groupId)) match {
+        case None =>
+          // the coordinator metadata has no such group: it may have been removed by another thread,
+          // e.g. because it migrated to some other coordinator OR is in a transient unstable phase.
+          // Let the consumer retry joining without the specified consumer id.
+          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+        case Some(group) =>
+          group synchronized {
+            // a dead group and an unrecognized consumer id are answered the same way
+            val leavingMemberKnown = !group.is(Dead) && group.has(consumerId)
+            if (leavingMemberKnown) {
+              val leavingMember = group.get(consumerId)
+              removeHeartbeatForLeavingMember(group, leavingMember)
+              onMemberFailure(group, leavingMember)
+              responseCallback(Errors.NONE.code)
+            } else {
+              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+            }
+          }
+      }
+    }
+  }
+
+  def handleHeartbeat(groupId: String,
+                      memberId: String,
+                      generationId: Int,
+                      responseCallback: Short => Unit) {
+    if (!isActive.get) {
+      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+    } else if (!isCoordinatorForGroup(groupId)) {
+      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+    } else {
+      Option(coordinatorMetadata.getGroup(groupId)) match {
+        case None =>
+          // the coordinator metadata has no such group (it may have been removed or have migrated to
+          // another coordinator); let the member retry joining without the specified member id
+          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+        case Some(group) =>
+          group synchronized {
+            // the first failing check decides the error; None means the heartbeat is accepted
+            val rejection: Option[Errors] =
+              if (group.is(Dead)) Some(Errors.UNKNOWN_MEMBER_ID)
+              else if (!group.is(Stable)) Some(Errors.REBALANCE_IN_PROGRESS)
+              else if (!group.has(memberId)) Some(Errors.UNKNOWN_MEMBER_ID)
+              else if (generationId != group.generationId) Some(Errors.ILLEGAL_GENERATION)
+              else None
+            rejection match {
+              case Some(error) =>
+                responseCallback(error.code)
+              case None =>
+                completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+                responseCallback(Errors.NONE.code)
+            }
+          }
+      }
+    }
+  }
+
+  def handleCommitOffsets(groupId: String,
+                          memberId: String,
+                          generationId: Int,
+                          offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+    // fails the commit of every partition in the request with the same error
+    def rejectAll(error: Errors): Unit = responseCallback(offsetMetadata.mapValues(_ => error.code))
+    // hands the offsets and the callback over to the offset manager
+    def commit(): Unit = offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
+
+    if (!isActive.get) {
+      rejectAll(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    } else if (!isCoordinatorForGroup(groupId)) {
+      rejectAll(Errors.NOT_COORDINATOR_FOR_GROUP)
+    } else {
+      // null when the coordinator metadata holds no group with this id
+      val group = coordinatorMetadata.getGroup(groupId)
+      if (group != null) {
+        // validate the commit against the group state while holding the group lock
+        group synchronized {
+          if (group.is(Dead)) rejectAll(Errors.UNKNOWN_MEMBER_ID)
+          else if (group.is(AwaitingSync)) rejectAll(Errors.REBALANCE_IN_PROGRESS)
+          else if (!group.has(memberId)) rejectAll(Errors.UNKNOWN_MEMBER_ID)
+          else if (generationId != group.generationId) rejectAll(Errors.ILLEGAL_GENERATION)
+          else commit()
+        }
+      } else if (generationId < 0) {
+        // the group is not relying on Kafka for partition management, so allow the commit
+        commit()
+      } else {
+        // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
+        // or this is a request coming from an older generation. either way, reject the commit
+        rejectAll(Errors.ILLEGAL_GENERATION)
+      }
+    }
+  }
+
+  def handleFetchOffsets(groupId: String,
+                         partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+    // answers every requested partition with the same error entry
+    def allFailed(error: OffsetMetadataAndError) = partitions.map(_ -> error).toMap
+    if (!isActive.get)
+      allFailed(OffsetMetadataAndError.GroupCoordinatorNotAvailable)
+    else if (!isCoordinatorForGroup(groupId))
+      allFailed(OffsetMetadataAndError.NotCoordinatorForGroup)
+    else
+      // return offsets regardless of the group state: the group may use Kafka commit storage without group management
+      offsetManager.getOffsets(groupId, partitions)
+  }
+
+  def handleGroupImmigration(offsetTopicPartitionId: Int) = {
+    // for now this only asks the offset manager to load the partition; TODO we may need to add more logic in KAFKA-2017
+    offsetManager.loadOffsetsFromLog(offsetTopicPartitionId)
+  }
+
+  def handleGroupEmigration(offsetTopicPartitionId: Int) = {
+    // for now this only asks the offset manager to drop the partition's cached offsets; TODO we may need to add more logic in KAFKA-2017
+    offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
+  }
+
+  private def joinError(memberId: String, errorCode: Short): JoinGroupResult = {
+    // error-only result: no members, generation 0 and the NoProtocol / NoLeader placeholders
+    val noMembers = Map.empty[String, Array[Byte]]
+    JoinGroupResult(noMembers, memberId,
+      0, // generation id
+      GroupCoordinator.NoProtocol,
+      GroupCoordinator.NoLeader,
+      errorCode)
+  }
+
+  private def propagateAssignment(group: GroupMetadata,
+                                  assignment: Map[String, Array[Byte]]) { // members absent from the map get an empty assignment
+    group.allMembers.foreach { member =>
+      member.assignment = assignment.getOrElse(member.memberId, Array.empty[Byte])
+      Option(member.awaitingSyncCallback).foreach { pendingSync =>
+        pendingSync(member.assignment, Errors.NONE.code)
+        member.awaitingSyncCallback = null
+      }
+    }
+  }
+
+  /**
+   * Record a heartbeat for the member now, complete its existing DelayedHeartbeat and schedule the next one
+   */
+  private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+    // update the heartbeat timestamp first, then complete the current heartbeat expectation
+    member.latestHeartbeat = SystemTime.milliseconds
+    val memberKey = ConsumerKey(member.groupId, member.memberId)
+    heartbeatPurgatory.checkAndComplete(memberKey)
+
+    // reschedule the next heartbeat expiration deadline: one session timeout after this heartbeat
+    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
+  }
+
+  private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
+    // flag the member first: tryCompleteHeartbeat completes the delayed heartbeat of a leaving member
+    member.isLeaving = true
+    heartbeatPurgatory.checkAndComplete(ConsumerKey(member.groupId, member.memberId))
+  }
+
+  private def addMemberAndRebalance(sessionTimeoutMs: Int,
+                                    protocols: List[(String, Array[Byte])],
+                                    group: GroupMetadata,
+                                    callback: JoinCallback) = {
+    // register a brand-new member under a generated id and park its join callback
+    val newMember = new MemberMetadata(group.generateNextMemberId, group.groupId, sessionTimeoutMs, protocols)
+    newMember.awaitingJoinCallback = callback
+    group.add(newMember.memberId, newMember)
+    maybePrepareRebalance(group)
+    newMember
+  }
+
+  private def updateMemberAndRebalance(group: GroupMetadata,
+                                       member: MemberMetadata,
+                                       protocols: List[(String, Array[Byte])],
+                                       callback: JoinCallback) {
+    member.supportedProtocols = protocols // record the protocols sent with this JoinGroup
+    member.awaitingJoinCallback = callback // invoked later by onCompleteJoin
+    maybePrepareRebalance(group)
+  }
+
+  private def maybePrepareRebalance(group: GroupMetadata) {
+    // start a rebalance only when group.canRebalance allows it, checked under the group lock
+    group synchronized {
+      if (group.canRebalance) prepareRebalance(group)
+    }
+  }
+
+  private def prepareRebalance(group: GroupMetadata) {
+    // members still waiting on a SyncGroup response are told to rejoin
+    if (group.is(AwaitingSync)) {
+      group.allMembers.foreach { member =>
+        Option(member.awaitingSyncCallback).foreach { pendingSync =>
+          pendingSync(Array.empty[Byte], Errors.REBALANCE_IN_PROGRESS.code)
+          member.awaitingSyncCallback = null
+        }
+      }
+    }
+
+    // clear every member's assignment before the group starts rebalancing
+    for (member <- group.allMembers) member.assignment = null
+    group.transitionTo(PreparingRebalance)
+    info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
+
+    // watch a DelayedJoin keyed by the group; tryCompleteJoin completes it once every member has rejoined
+    val delayedJoin = new DelayedJoin(this, group, group.rebalanceTimeout)
+    joinPurgatory.tryCompleteElseWatch(delayedJoin, Seq(ConsumerGroupKey(group.groupId)))
+  }
+
+  private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
+    trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
+    group.remove(member.memberId)
+    group.currentState match {
+      case Dead => // nothing to do
+      case Stable | AwaitingSync => maybePrepareRebalance(group) // membership changed, so try to rebalance
+      case PreparingRebalance => joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId)) // re-check the pending join now that the member is gone
+    }
+  }
+
+  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
+    // the delayed join may only be completed once no member is left that has not rejoined
+    group synchronized {
+      val everyoneRejoined = group.notYetRejoinedMembers.isEmpty
+      everyoneRejoined && forceComplete()
+    }
+  }
+
+  def onExpireJoin() {
+    // intentionally a no-op for now. TODO: add metrics for restabilize timeouts
+  }
+
+  def onCompleteJoin(group: GroupMetadata) {
+    group synchronized {
+      val failedMembers = group.notYetRejoinedMembers // members that did not rejoin are dropped below
+      if (group.isEmpty || !failedMembers.isEmpty) {
+        failedMembers.foreach { failedMember =>
+          group.remove(failedMember.memberId)
+          // TODO: cut the socket connection to the client
+        }
+
+        if (group.isEmpty) {
+          group.transitionTo(Dead)
+          info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+          coordinatorMetadata.removeGroup(group.groupId)
+        }
+      }
+      if (!group.is(Dead)) { // at least one member is left, so start the next generation
+        group.initNextGeneration
+        info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
+
+        // trigger the awaiting join group response callback for all the members after rebalancing
+        for (member <- group.allMembers) {
+          assert(member.awaitingJoinCallback != null)
+          val joinResult = JoinGroupResult(
+            members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, // only the leader receives the member metadata
+            memberId=member.memberId,
+            generationId=group.generationId,
+            subProtocol=group.protocol,
+            leaderId=group.leaderId,
+            errorCode=Errors.NONE.code)
+
+          member.awaitingJoinCallback(joinResult)
+          member.awaitingJoinCallback = null
+          completeAndScheduleNextHeartbeatExpiration(group, member)
+        }
+      }
+    }
+  }
+
+  def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
+    // complete the delayed heartbeat when the member should be kept alive or is leaving the group
+    group synchronized {
+      val canComplete = shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving
+      canComplete && forceComplete()
+    }
+  }
+
+  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
+    group synchronized {
+      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) // re-checked under the group lock before failing the member
+        onMemberFailure(group, member)
+    }
+  }
+
+  def onCompleteHeartbeat() {
+    // intentionally a no-op for now. TODO: add metrics for complete heartbeats
+  }
+
+  def partitionFor(group: String): Int = offsetManager.partitionFor(group) // delegates to the offset manager
+
+  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
+    member.awaitingJoinCallback != null || // a JoinGroup response is still pending
+      member.awaitingSyncCallback != null || // a SyncGroup response is still pending
+      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline // a newer heartbeat extends past this deadline
+
+  private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) // asks the offset manager about the group's partition
+}
+
+object GroupCoordinator {
+
+  val NoProtocol = ""
+  val NoLeader = ""
+  val OffsetsTopicName = "__consumer_offsets"
+
+  /**
+   * Creates a coordinator that builds its own OffsetManager from the replica manager, zkUtils and scheduler.
+   */
+  def create(config: KafkaConfig,
+             zkUtils: ZkUtils,
+             replicaManager: ReplicaManager,
+             kafkaScheduler: KafkaScheduler): GroupCoordinator =
+    new GroupCoordinator(config.brokerId, groupManagerConfig(config), offsetManagerConfig(config),
+      replicaManager, zkUtils, kafkaScheduler)
+
+  /**
+   * Creates a coordinator around an existing OffsetManager; zkUtils is accepted but not used by this overload.
+   */
+  def create(config: KafkaConfig,
+             zkUtils: ZkUtils,
+             offsetManager: OffsetManager): GroupCoordinator =
+    new GroupCoordinator(config.brokerId, groupManagerConfig(config), offsetManagerConfig(config), offsetManager)
+
+  // Offset manager settings taken from the broker config; shared by both create() overloads.
+  // The retention is multiplied with Long factors so that large minute values cannot overflow Int arithmetic.
+  private def offsetManagerConfig(config: KafkaConfig): OffsetManagerConfig =
+    OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+      loadBufferSize = config.offsetsLoadBufferSize,
+      offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
+      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+
+  // Session timeout bounds taken from the broker config; shared by both create() overloads.
+  private def groupManagerConfig(config: KafkaConfig): GroupManagerConfig =
+    GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
+      groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
new file mode 100644
index 0000000..60ee987
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.utils.nonthreadsafe
+
+import java.util.UUID
+
+import org.apache.kafka.common.protocol.Errors
+
+import collection.mutable
+
+/**
+ * A state of a group. Each concrete state exposes a byte code through `state`.
+ */
+private[coordinator] sealed trait GroupState {
+  def state: Byte
+}
+
+/**
+ * Group is preparing to rebalance
+ *
+ * action: respond to heartbeats with REBALANCE_IN_PROGRESS
+ *         respond to sync group with REBALANCE_IN_PROGRESS
+ *         remove member on leave group request
+ *         park join group requests from new or existing members until all expected members have joined
+ *         allow offset commits from previous generation
+ *         allow offset fetch requests
+ * transition: some members have joined by the timeout => AwaitingSync
+ *             all members have left the group => Dead
+ */
+private[coordinator] case object PreparingRebalance extends GroupState {
+  val state: Byte = 1
+}
+
+/**
+ * Group is awaiting state assignment from the leader
+ *
+ * action: respond to heartbeats with REBALANCE_IN_PROGRESS
+ *         respond to offset commits with REBALANCE_IN_PROGRESS
+ *         park sync group requests from followers until transition to Stable
+ *         allow offset fetch requests
+ * transition: sync group with state assignment received from leader => Stable
+ *             join group from new member or existing member with updated metadata => PreparingRebalance
+ *             leave group from existing member => PreparingRebalance
+ *             member failure detected => PreparingRebalance
+ */
+private[coordinator] case object AwaitingSync extends GroupState {
+  val state: Byte = 5
+}
+
+/**
+ * Group is stable
+ *
+ * action: respond to member heartbeats normally
+ *         respond to sync group from any member with current assignment
+ *         respond to join group from followers with matching metadata with current group metadata
+ *         allow offset commits from member of current generation
+ *         allow offset fetch requests
+ * transition: member failure detected via heartbeat => PreparingRebalance
+ *             leave group from existing member => PreparingRebalance
+ *             leader join-group received => PreparingRebalance
+ *             follower join-group with new metadata => PreparingRebalance
+ */
+private[coordinator] case object Stable extends GroupState {
+  val state: Byte = 3
+}
+
+/**
+ * Group has no more members
+ *
+ * action: respond to join group with UNKNOWN_MEMBER_ID
+ *         respond to sync group with UNKNOWN_MEMBER_ID
+ *         respond to heartbeat with UNKNOWN_MEMBER_ID
+ *         respond to leave group with UNKNOWN_MEMBER_ID
+ *         respond to offset commit with UNKNOWN_MEMBER_ID
+ *         allow offset fetch requests
+ * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions
+ */
+private[coordinator] case object Dead extends GroupState {
+  val state: Byte = 4
+}
+
+
+private object GroupMetadata {
+  /**
+   * Legal state transitions, keyed by the state being entered: each target state maps to
+   * the set of states the group is allowed to be in immediately before moving to it.
+   */
+  private val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(
+    PreparingRebalance -> Set(Stable, AwaitingSync),
+    AwaitingSync -> Set(PreparingRebalance),
+    Stable -> Set(AwaitingSync),
+    Dead -> Set(PreparingRebalance))
+}
+
+/**
+ * Group contains the following metadata:
+ *
+ *  Membership metadata:
+ *  1. Members registered in this group
+ *  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
+ *  3. Protocol metadata associated with group members
+ *
+ *  State metadata:
+ *  1. group state
+ *  2. generation id
+ *  3. leader id
+ */
+@nonthreadsafe
+private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {
+
+  // registered members, keyed by member id
+  private val members = new mutable.HashMap[String, MemberMetadata]
+  // a freshly constructed group starts out in the Stable state
+  private var state: GroupState = Stable
+  var generationId = 0
+  var leaderId: String = null
+  var protocol: String = null
+
+  /** True when the group is currently in `groupState`. */
+  def is(groupState: GroupState): Boolean = state == groupState
+
+  /** True when the group is currently in any state other than `groupState`. */
+  def not(groupState: GroupState): Boolean = state != groupState
+
+  /** True when a member with this id is registered. */
+  def has(memberId: String): Boolean = members.contains(memberId)
+
+  /** Look up a registered member; fails if the id is not registered. */
+  def get(memberId: String): MemberMetadata = members(memberId)
+
+  /**
+   * Register (or replace) a member. If the group has no leader yet, this member becomes the leader.
+   */
+  def add(memberId: String, member: MemberMetadata): Unit = {
+    assert(supportsProtocols(member.protocols))
+
+    if (leaderId == null)
+      leaderId = memberId
+    members(memberId) = member
+  }
+
+  /**
+   * Unregister a member. If it was the leader, leadership moves to one of the remaining
+   * members, or to nobody (null) when the group is now empty.
+   */
+  def remove(memberId: String): Unit = {
+    members -= memberId
+    if (leaderId == memberId)
+      leaderId = members.keys.headOption.orNull
+  }
+
+  def currentState: GroupState = state
+
+  def isEmpty: Boolean = members.isEmpty
+
+  /** Members that currently have no join callback stored. */
+  def notYetRejoinedMembers: List[MemberMetadata] =
+    allMembers.filter(_.awaitingJoinCallback == null)
+
+  def allMembers: List[MemberMetadata] = members.values.toList
+
+  /** The largest session timeout among the current members; 0 when there are none. */
+  def rebalanceTimeout: Int =
+    members.values.map(_.sessionTimeoutMs).foldLeft(0)(_ max _)
+
+  // TODO: decide if ids should be predictable or random
+  def generateNextMemberId: String = UUID.randomUUID().toString
+
+  /** A rebalance may start only from the Stable or AwaitingSync state. */
+  def canRebalance: Boolean = is(Stable) || is(AwaitingSync)
+
+  /** Move to `groupState`, failing if that transition is not allowed from the current state. */
+  def transitionTo(groupState: GroupState): Unit = {
+    assertValidTransition(groupState)
+    state = groupState
+  }
+
+  /**
+   * Choose the group protocol by vote: every member votes for one of the protocols that all
+   * members support, and the protocol with the most votes is returned.
+   */
+  def selectProtocol: String = {
+    if (isEmpty)
+      throw new IllegalStateException("Cannot select protocol for empty group")
+
+    // only protocols supported by every member are eligible
+    val eligible = candidateProtocols
+
+    // count the votes per protocol
+    val tally: List[(String, Int)] = allMembers
+      .map(_.vote(eligible))
+      .groupBy(identity)
+      .toList
+      .map { case (name, ballots) => (name, ballots.size) }
+
+    val (winner, _) = tally.maxBy { case (_, count) => count }
+    winner
+  }
+
+  // intersection of the protocol sets of all members; must not be called on an empty group
+  private def candidateProtocols: Set[String] = {
+    allMembers.map(_.protocols).reduceLeft(_ & _)
+  }
+
+  /**
+   * True when the group is empty, or when at least one of `memberProtocols` is supported by
+   * every current member.
+   */
+  def supportsProtocols(memberProtocols: Set[String]): Boolean = {
+    if (isEmpty) true
+    else (memberProtocols & candidateProtocols).nonEmpty
+  }
+
+  /**
+   * Start the next generation: bump the generation id, pick the protocol and move to
+   * AwaitingSync. Asserts that no member is still missing its join callback.
+   */
+  def initNextGeneration: Unit = {
+    assert(notYetRejoinedMembers.isEmpty)
+    generationId += 1
+    protocol = selectProtocol
+    transitionTo(AwaitingSync)
+  }
+
+  /**
+   * Metadata of every member for the currently selected protocol, keyed by member id.
+   * Not available while the group is Dead or PreparingRebalance.
+   */
+  def currentMemberMetadata: Map[String, Array[Byte]] = state match {
+    case Dead | PreparingRebalance =>
+      throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
+    case _ =>
+      members.map { case (id, member) => (id, member.metadata(protocol)) }.toMap
+  }
+
+  // reject transitions whose source state is not listed for the target in validPreviousStates
+  private def assertValidTransition(targetState: GroupState): Unit = {
+    val allowedSources = GroupMetadata.validPreviousStates(targetState)
+    if (!allowedSources.contains(state))
+      throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
+        .format(groupId, allowedSources.mkString(","), targetState, state))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
new file mode 100644
index 0000000..7f7df9a
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util
+
+import kafka.utils.nonthreadsafe
+
+import scala.collection.Map
+
+/**
+ * Member metadata contains the following metadata:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+@nonthreadsafe
+private[coordinator] class MemberMetadata(val memberId: String,
+                                          val groupId: String,
+                                          val sessionTimeoutMs: Int,
+                                          var supportedProtocols: List[(String, Array[Byte])]) {
+
+  var assignment: Array[Byte] = null
+  var awaitingJoinCallback: JoinGroupResult => Unit = null
+  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+  var latestHeartbeat: Long = -1
+  var isLeaving: Boolean = false
+
+  /**
+   * Names of the supported protocols as a set (the preference order is not retained).
+   */
+  def protocols: Set[String] = supportedProtocols.map(_._1).toSet
+
+  /**
+   * Get metadata corresponding to the provided protocol.
+   *
+   * @throws IllegalArgumentException if the member does not list `protocol`
+   */
+  def metadata(protocol: String): Array[Byte] =
+    supportedProtocols
+      .collectFirst { case (name, data) if name == protocol => data }
+      .getOrElse(throw new IllegalArgumentException("Member does not support protocol"))
+
+  /**
+   * Check if the provided protocol metadata matches the currently stored metadata.
+   *
+   * Both lists must have the same length and agree position by position on the protocol
+   * name and on the metadata bytes.
+   */
+  def matches(protocols: List[(String, Array[Byte])]): Boolean =
+    // single pass over both lists; no indexed List access and no `return` from inside a closure
+    protocols.corresponds(supportedProtocols) { (offered, stored) =>
+      offered._1 == stored._1 && util.Arrays.equals(offered._2, stored._2)
+    }
+
+  /**
+   * Vote for one of the potential group protocols. This takes into account the protocol preference as
+   * indicated by the order of supported protocols and returns the first one also contained in the set
+   *
+   * @throws IllegalArgumentException if none of the supported protocols is a candidate
+   */
+  def vote(candidates: Set[String]): String =
+    supportedProtocols
+      .collectFirst { case (name, _) if candidates.contains(name) => name }
+      .getOrElse(throw new IllegalArgumentException("Member does not support any of the candidate protocols"))
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
deleted file mode 100644
index 8499bf8..0000000
--- a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.common.TopicAndPartition
-import kafka.utils.CoreUtils
-
-private[coordinator] trait PartitionAssignor {
-  /**
-   * Assigns partitions to consumers in a group.
-   * @return A mapping from consumer to assigned partitions.
-   */
-  def assign(topicsPerConsumer: Map[String, Set[String]],
-             partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]]
-
-  protected def fill[K, V](vsPerK: Map[K, Set[V]], expectedKs: Set[K]): Map[K, Set[V]] = {
-    val unfilledKs = expectedKs -- vsPerK.keySet
-    vsPerK ++ unfilledKs.map(k => (k, Set.empty[V]))
-  }
-
-  protected def aggregate[K, V](pairs: Seq[(K, V)]): Map[K, Set[V]] = {
-    pairs
-      .groupBy { case (k, v) => k }
-      .map { case (k, kvPairs) => (k, kvPairs.map(_._2).toSet) }
-  }
-
-  protected def invert[K, V](vsPerK: Map[K, Set[V]]): Map[V, Set[K]] = {
-    val vkPairs = vsPerK.toSeq.flatMap { case (k, vs) => vs.map(v => (v, k)) }
-    aggregate(vkPairs)
-  }
-}
-
-private[coordinator] object PartitionAssignor {
-  val strategies = Set("range", "roundrobin")
-
-  def createInstance(strategy: String) = strategy match {
-    case "roundrobin" => new RoundRobinAssignor()
-    case _ => new RangeAssignor()
-  }
-}
-
-/**
- * The roundrobin assignor lays out all the available partitions and all the available consumers. It
- * then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer
- * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
- * will be within a delta of exactly one across all consumers.)
- *
- * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
- * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
- *
- * The assignment will be:
- * C0 -> [t0p0, t0p2, t1p1]
- * C1 -> [t0p1, t1p0, t1p2]
- */
-private[coordinator] class RoundRobinAssignor extends PartitionAssignor {
-  override def assign(topicsPerConsumer: Map[String, Set[String]],
-                      partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = {
-    val consumers = topicsPerConsumer.keys.toSeq.sorted
-    val topics = topicsPerConsumer.values.flatten.toSeq.distinct.sorted
-
-    val allTopicPartitions = topics.flatMap { topic =>
-      val numPartitionsForTopic = partitionsPerTopic(topic)
-      (0 until numPartitionsForTopic).map(partition => TopicAndPartition(topic, partition))
-    }
-
-    var consumerAssignor = CoreUtils.circularIterator(consumers)
-    val consumerPartitionPairs = allTopicPartitions.map { topicAndPartition =>
-      consumerAssignor = consumerAssignor.dropWhile(consumerId => !topicsPerConsumer(consumerId).contains(topicAndPartition.topic))
-      val consumer = consumerAssignor.next()
-      (consumer, topicAndPartition)
-    }
-    fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet)
-  }
-}
-
-/**
- * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
- * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
- * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
- * divide, then the first few consumers will have one extra partition.
- *
- * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
- * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
- *
- * The assignment will be:
- * C0 -> [t0p0, t0p1, t1p0, t1p1]
- * C1 -> [t0p2, t1p2]
- */
-private[coordinator] class RangeAssignor extends PartitionAssignor {
-  override def assign(topicsPerConsumer: Map[String, Set[String]],
-                      partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = {
-    val consumersPerTopic = invert(topicsPerConsumer)
-    val consumerPartitionPairs = consumersPerTopic.toSeq.flatMap { case (topic, consumersForTopic) =>
-      val numPartitionsForTopic = partitionsPerTopic(topic)
-
-      val numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size
-      val consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size
-
-      consumersForTopic.toSeq.sorted.zipWithIndex.flatMap { case (consumerForTopic, consumerIndex) =>
-        val startPartition = numPartitionsPerConsumer * consumerIndex + consumerIndex.min(consumersWithExtraPartition)
-        val numPartitions = numPartitionsPerConsumer + (if (consumerIndex + 1 > consumersWithExtraPartition) 0 else 1)
-
-        // The first few consumers pick up an extra partition, if any.
-        (startPartition until startPartition + numPartitions)
-          .map(partition => (consumerForTopic, TopicAndPartition(topic, partition)))
-      }
-    }
-    fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet)
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
deleted file mode 100644
index 4345a8e..0000000
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.javaapi
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-
-class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
-
-  def errorCode = underlying.errorCode
-
-  def coordinator: BrokerEndPoint = {
-    import kafka.javaapi.Implicits._
-    underlying.coordinatorOpt
-  }
-
-  override def equals(other: Any) = canEqual(other) && {
-    val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
-    this.underlying.equals(otherConsumerMetadataResponse.underlying)
-  }
-
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
-
-  override def hashCode = underlying.hashCode
-
-  override def toString = underlying.toString
-
-}
-
-object ConsumerMetadataResponse {
-  def readFrom(buffer: ByteBuffer) = new ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(buffer))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
new file mode 100644
index 0000000..b94aa01
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+
+/**
+ * Wrapper in the javaapi package around a kafka.api.GroupMetadataResponse; every member
+ * delegates to the wrapped response.
+ */
+class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) {
+
+  def errorCode = underlying.errorCode
+
+  // NOTE(review): the declared result type differs from `coordinatorOpt`, so the conversion
+  // presumably comes from the Implicits import below — confirm against kafka.javaapi.Implicits
+  def coordinator: BrokerEndPoint = {
+    import kafka.javaapi.Implicits._
+    underlying.coordinatorOpt
+  }
+
+  /**
+   * Two wrappers are equal when the other object is a GroupMetadataResponse accepted by
+   * `canEqual` and the wrapped responses are equal.
+   */
+  override def equals(other: Any) = other match {
+    case that: kafka.javaapi.GroupMetadataResponse if canEqual(that) =>
+      this.underlying.equals(that.underlying)
+    case _ =>
+      false
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse]
+
+  override def hashCode = underlying.hashCode
+
+  override def toString = underlying.toString
+
+}
+
+object GroupMetadataResponse {
+  /** Decode a response from `buffer` with kafka.api.GroupMetadataResponse.readFrom and wrap it. */
+  def readFrom(buffer: ByteBuffer): GroupMetadataResponse = {
+    val decoded = kafka.api.GroupMetadataResponse.readFrom(buffer)
+    new GroupMetadataResponse(decoded)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 3b8312d..ceb6348 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -33,8 +33,8 @@ case object Topic extends ResourceType {
   val name = "Topic"
 }
 
-case object ConsumerGroup extends ResourceType {
-  val name = "ConsumerGroup"
+case object Group extends ResourceType {
+  val name = "Group"
 }
 
 
@@ -45,5 +45,5 @@ object ResourceType {
     rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
   }
 
-  def values: Seq[ResourceType] = List(Cluster, Topic, ConsumerGroup)
+  def values: Seq[ResourceType] = List(Cluster, Topic, Group)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6acab8d..c80bd46 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,24 +17,26 @@
 
 package kafka.server
 
-import kafka.message.MessageSet
-import kafka.security.auth.Topic
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.TopicPartition
-import kafka.api._
+import java.nio.ByteBuffer
+
 import kafka.admin.AdminUtils
+import kafka.api._
 import kafka.common._
 import kafka.controller.KafkaController
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
 import kafka.log._
+import kafka.message.MessageSet
 import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
-import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend}
-import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
-import scala.collection._
+import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
+import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
-import kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Describe, Resource, Topic, Operation, ConsumerGroup}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection._
 
 
 /**
@@ -42,7 +44,7 @@ import kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Desc
  */
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
-                val coordinator: ConsumerCoordinator,
+                val coordinator: GroupCoordinator,
                 val controller: KafkaController,
                 val zkUtils: ZkUtils,
                 val brokerId: Int,
@@ -73,10 +75,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
-        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+        case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request)
         case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
         case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
         case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
+        case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -114,12 +117,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       // for each new leader or follower, call coordinator to handle
       // consumer group migration
       result.updatedLeaders.foreach { case partition =>
-        if (partition.topic == ConsumerCoordinator.OffsetsTopicName)
+        if (partition.topic == GroupCoordinator.OffsetsTopicName)
           coordinator.handleGroupImmigration(partition.partitionId)
       }
       result.updatedFollowers.foreach { case partition =>
         partition.leaderReplicaIdOpt.foreach { leaderReplica =>
-          if (partition.topic == ConsumerCoordinator.OffsetsTopicName &&
+          if (partition.topic == GroupCoordinator.OffsetsTopicName &&
               leaderReplica == brokerId)
             coordinator.handleGroupEmigration(partition.partitionId)
         }
@@ -188,7 +191,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (authorizedRequestInfo, unauthorizedRequestInfo) =  filteredRequestInfo.partition {
       case (topicAndPartition, offsetMetadata) =>
         authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) &&
-          authorize(request.session, Read, new Resource(ConsumerGroup, offsetCommitRequest.groupId))
+          authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))
     }
 
     // the callback for sending an offset commit response
@@ -268,7 +271,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // call coordinator to handle commit offset
       coordinator.handleCommitOffsets(
         offsetCommitRequest.groupId,
-        offsetCommitRequest.consumerId,
+        offsetCommitRequest.memberId,
         offsetCommitRequest.groupGenerationId,
         offsetData,
         sendResponseCallback)
@@ -526,9 +529,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == ConsumerCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
+        if (topic == GroupCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
           try {
-            if (topic == ConsumerCoordinator.OffsetsTopicName) {
+            if (topic == GroupCoordinator.OffsetsTopicName) {
               val aliveBrokers = metadataCache.getAliveBrokers
               val offsetsTopicReplicationFactor =
                 if (aliveBrokers.length > 0)
@@ -610,7 +613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
       authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
-        authorize(request.session, Read, new Resource(ConsumerGroup, offsetFetchRequest.groupId))
+        authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))
     }
 
     val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode)
@@ -659,29 +662,29 @@ class KafkaApis(val requestChannel: RequestChannel,
   /*
    * Handle a consumer metadata request
    */
-  def handleConsumerMetadataRequest(request: RequestChannel.Request) {
-    val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
+  def handleGroupMetadataRequest(request: RequestChannel.Request) {
+    val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest]
 
-    if (!authorize(request.session, Read, new Resource(ConsumerGroup, consumerMetadataRequest.group))) {
-      val response = ConsumerMetadataResponse(None, ErrorMapping.AuthorizationCode, consumerMetadataRequest.correlationId)
+    if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) {
+      val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
     } else {
-      val partition = coordinator.partitionFor(consumerMetadataRequest.group)
+      val partition = coordinator.partitionFor(groupMetadataRequest.group)
 
-      //get metadata (and create the topic if necessary)
-      val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head
+      // get metadata (and create the topic if necessary)
+      val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.OffsetsTopicName), request.securityProtocol).head
 
-      val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
+      val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)
 
       val response =
         offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
           partitionMetadata.leader.map { leader =>
-            ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId)
+            GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId)
           }.getOrElse(errorResponse)
         }.getOrElse(errorResponse)
 
       trace("Sending consumer metadata %s for correlation id %d to client %s."
-        .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
+        .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId))
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
     }
   }
@@ -690,39 +693,65 @@ class KafkaApis(val requestChannel: RequestChannel,
     import JavaConversions._
 
     val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
-    val respHeader = new ResponseHeader(request.header.correlationId)
+    val responseHeader = new ResponseHeader(request.header.correlationId)
 
     // the callback for sending a join-group response
-    def sendResponseCallback(partitions: Set[TopicAndPartition], consumerId: String, generationId: Int, errorCode: Short) {
-      val partitionList = if (errorCode == ErrorMapping.NoError)
-        partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
-      else
-        List.empty.toBuffer
-
-      val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList)
-
+    def sendResponseCallback(joinResult: JoinGroupResult) {
+      val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
+      val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,
+        joinResult.memberId, joinResult.leaderId, members)
       trace("Sending join group response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     }
 
-    // ensure that the client is authorized to join the group and read from all subscribed topics
-    if (!authorize(request.session, Read, new Resource(ConsumerGroup, joinGroupRequest.groupId())) ||
-        joinGroupRequest.topics().exists(topic => !authorize(request.session, Read, new Resource(Topic, topic)))) {
-      val responseBody = new JoinGroupResponse(ErrorMapping.AuthorizationCode, 0, joinGroupRequest.consumerId(), List.empty[TopicPartition])
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+    if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
+      val responseBody = new JoinGroupResponse(
+        ErrorMapping.AuthorizationCode,
+        JoinGroupResponse.UNKNOWN_GENERATION_ID,
+        JoinGroupResponse.UNKNOWN_PROTOCOL,
+        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+        Map.empty[String, ByteBuffer])
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     } else {
       // let the coordinator to handle join-group
+      val protocols = joinGroupRequest.groupProtocols().map(protocol =>
+        (protocol.name, Utils.toArray(protocol.metadata))).toList
       coordinator.handleJoinGroup(
         joinGroupRequest.groupId(),
-        joinGroupRequest.consumerId(),
-        joinGroupRequest.topics().toSet,
+        joinGroupRequest.memberId(),
         joinGroupRequest.sessionTimeout(),
-        joinGroupRequest.strategy(),
+        joinGroupRequest.protocolType(),
+        protocols,
         sendResponseCallback)
     }
   }
 
+  def handleSyncGroupRequest(request: RequestChannel.Request) {
+    import JavaConversions._
+
+    val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
+
+    def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
+      val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
+      val responseHeader = new ResponseHeader(request.header.correlationId)
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    }
+
+    if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
+      sendResponseCallback(Array[Byte](), ErrorMapping.AuthorizationCode)
+    } else {
+      coordinator.handleSyncGroup(
+        syncGroupRequest.groupId(),
+        syncGroupRequest.generationId(),
+        syncGroupRequest.memberId(),
+        syncGroupRequest.groupAssignment().mapValues(Utils.toArray(_)),
+        sendResponseCallback
+      )
+    }
+  }
+
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
     val respHeader = new ResponseHeader(request.header.correlationId)
@@ -735,7 +764,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
     }
 
-    if (!authorize(request.session, Read, new Resource(ConsumerGroup, heartbeatRequest.groupId))) {
+    if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
       val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode)
       requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
     }
@@ -743,7 +772,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // let the coordinator to handle heartbeat
       coordinator.handleHeartbeat(
         heartbeatRequest.groupId(),
-        heartbeatRequest.consumerId(),
+        heartbeatRequest.memberId(),
         heartbeatRequest.groupGenerationId(),
         sendResponseCallback)
     }
@@ -788,11 +817,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
     }
 
-    // let the coordinator to handle leave-group
-    coordinator.handleLeaveGroup(
-      leaveGroupRequest.groupId(),
-      leaveGroupRequest.consumerId(),
-      sendResponseCallback)
+    if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
+      val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.AuthorizationCode)
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
+    } else {
+      // let the coordinator handle leave-group
+      coordinator.handleLeaveGroup(
+        leaveGroupRequest.groupId(),
+        leaveGroupRequest.consumerId(),
+        sendResponseCallback)
+    }
   }
 
   def close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 194ee9c..b054f48 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -277,9 +277,9 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
   val ControlledShutdownEnableProp = "controlled.shutdown.enable"
-  /** ********* Consumer coordinator configuration ***********/
-  val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
-  val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
+  /** ********* Group coordinator configuration ***********/
+  val GroupMinSessionTimeoutMsProp = "group.min.session.timeout.ms"
+  val GroupMaxSessionTimeoutMsProp = "group.max.session.timeout.ms"
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
   val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
@@ -619,8 +619,8 @@ object KafkaConfig {
       .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
 
       /** ********* Consumer coordinator configuration ***********/
-      .define(ConsumerMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
-      .define(ConsumerMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
+      .define(GroupMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
+      .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
 
       /** ********* Offset management configuration ***********/
       .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)
@@ -799,9 +799,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp)
   val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
 
-  /** ********* Consumer coordinator configuration ***********/
-  val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp)
-  val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp)
+  /** ********* Group coordinator configuration ***********/
+  val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
+  val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
 
   /** ********* Offset management configuration ***********/
   val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index beea83a..84d48cb 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -50,7 +50,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{ConsumerCoordinator}
+import kafka.coordinator.{GroupManagerConfig, GroupCoordinator}
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -119,7 +119,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null
 
-  var consumerCoordinator: ConsumerCoordinator = null
+  var consumerCoordinator: GroupCoordinator = null
 
   var kafkaController: KafkaController = null
 
@@ -187,7 +187,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController.startup()
 
         /* start kafka coordinator */
-        consumerCoordinator = ConsumerCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
+        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
         consumerCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index bdc3bb6..967dc6f 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -32,7 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.tools.MessageFormatter
 import kafka.api.ProducerResponseStatus
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
 
 import scala.Some
 import scala.collection._
@@ -144,9 +144,9 @@ class OffsetManager(val config: OffsetManagerConfig,
       // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
       // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
       tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-        val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+        val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
         partitionOpt.map { partition =>
-          val appendPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+          val appendPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
           val messages = tombstones.map(_._2).toSeq
 
           trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@@ -225,7 +225,7 @@ class OffsetManager(val config: OffsetManagerConfig,
       )
     }.toSeq
 
-    val offsetTopicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, partitionFor(groupId))
+    val offsetTopicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, partitionFor(groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -336,7 +336,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    */
   def loadOffsetsFromLog(offsetsPartition: Int) {
 
-    val topicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+    val topicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
 
     loadingPartitions synchronized {
       if (loadingPartitions.contains(offsetsPartition)) {
@@ -408,7 +408,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, partitionId)
+    val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, partitionId)
 
     val hw = partitionOpt.map { partition =>
       partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -436,7 +436,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     }
 
     if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-                             .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)))
+                             .format(numRemoved, TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)))
   }
 
   def shutdown() {
@@ -448,7 +448,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    * If the topic does not exist, the configured partition count is returned.
    */
   private def getOffsetsTopicPartitionCount = {
-    val topic = ConsumerCoordinator.OffsetsTopicName
+    val topic = GroupCoordinator.OffsetsTopicName
     val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
     if (topicData(topic).nonEmpty)
       topicData(topic).size

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 84bebef..f99f0d8 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -29,7 +29,7 @@ import org.junit.{Test, Before}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
 
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
@@ -50,7 +50,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -154,7 +154,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     val numRecords = 10000
     sendRecords(numRecords)
 
-    consumer0.subscribe(List(topic))
+    val rebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+        // keep partitions paused in this test so that we can verify the commits based on specific seeks
+        partitions.foreach(consumer0.pause(_))
+      }
+
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
+    }
+
+    consumer0.subscribe(List(topic), rebalanceListener)
 
     val assignment = Set(tp, tp2)
     TestUtils.waitUntilTrue(() => {
@@ -166,11 +175,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.seek(tp2, 500)
 
     // change subscription to trigger rebalance
-    consumer0.subscribe(List(topic, topic2))
+    consumer0.subscribe(List(topic, topic2), rebalanceListener)
 
     val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
+      val records = consumer0.poll(50)
       consumer0.assignment() == newAssignment.asJava
     }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
 
@@ -421,9 +430,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
       consumer0.poll(50)
     
     // get metadata for the topic
-    var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+    var parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
     while(parts == null)
-      parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+      parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
     
@@ -436,6 +445,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
       consumer0.poll(50)
 
     assertEquals(2, listener.callsToAssigned)
+
+    // expect two revocations, matching the two assignments asserted above (NOTE(review): this implies revoke is also invoked on initial membership - confirm)
     assertEquals(2, listener.callsToRevoked)
 
     consumer0.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index db610c1..f2b0f85 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -19,13 +19,15 @@ import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownConsumerIdException}
+import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownMemberIdException}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Test, Before}
 
 import scala.collection.JavaConversions._
 
+
+
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
  */
@@ -43,7 +45,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "10") // set small enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -108,7 +110,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       } catch {
         // TODO: should be no need to catch these exceptions once KAFKA-2017 is
         // merged since coordinator fail-over will not cause a rebalance
-        case _: UnknownConsumerIdException | _: IllegalGenerationException =>
+        case _: UnknownMemberIdException | _: IllegalGenerationException =>
       }
     }
     scheduler.shutdown()
@@ -176,4 +178,6 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     }
     futures.map(_.get)
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 2ec59fb..5741ce2 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -18,16 +18,16 @@
 package kafka.api
 
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import kafka.utils.TestUtils
 import java.util.Properties
-import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
+
 import org.junit.{After, Before}
 import scala.collection.mutable.Buffer
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
@@ -60,14 +60,14 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
     for(i <- 0 until producerCount)
       producers += new KafkaProducer(producerConfig)
-    for(i <- 0 until consumerCount)
+    for(i <- 0 until consumerCount) {
       consumers += new KafkaConsumer(consumerConfig)
+    }
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, ConsumerCoordinator.OffsetsTopicName,
+    TestUtils.createTopic(zkUtils, GroupCoordinator.OffsetsTopicName,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index bdf7e49..735a3b2 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -101,7 +101,6 @@ class QuotasTest extends KafkaServerTestHarness {
                       classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                       classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
 
     consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
     consumers += new KafkaConsumer(consumerProps)

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
deleted file mode 100644
index 1d13d88..0000000
--- a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-package kafka.api
-
-import java.io.File
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SslConsumerTest extends BaseConsumerTest {
-  override protected def securityProtocol = SecurityProtocol.SSL
-  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-}