You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/16 02:47:25 UTC

[2/2] kafka git commit: KAFKA-2720: expire group metadata when all offsets have expired

KAFKA-2720: expire group metadata when all offsets have expired

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Liquan Pei, Onur Karaman, Guozhang Wang

Closes #1427 from hachikuji/KAFKA-2720


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c551675
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c551675
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c551675

Branch: refs/heads/trunk
Commit: 8c551675adb11947e9f27b20a9195c9c4a20b432
Parents: fb42558
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jun 15 19:46:42 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 15 19:46:42 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/coordinator/DelayedJoin.scala   |   4 +-
 .../kafka/coordinator/GroupCoordinator.scala    | 304 +++++------
 .../scala/kafka/coordinator/GroupMetadata.scala | 110 +++-
 .../coordinator/GroupMetadataManager.scala      | 498 ++++++++++---------
 .../kafka/coordinator/MemberMetadata.scala      |   1 +
 .../GroupCoordinatorResponseTest.scala          |   7 +-
 .../coordinator/GroupMetadataManagerTest.scala  | 407 +++++++++++++++
 .../kafka/coordinator/GroupMetadataTest.scala   | 155 +++++-
 .../kafka/coordinator/MemberMetadataTest.scala  |  11 +-
 9 files changed, 1081 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
index ae96e15..a62884a 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
@@ -30,8 +30,8 @@ import kafka.server.DelayedOperation
  * the rest of the group.
  */
 private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
-                                            group: GroupMetadata,
-                                            sessionTimeout: Long)
+                                       group: GroupMetadata,
+                                       sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index e9bbbd3..9c75f83 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -31,15 +31,6 @@ import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest}
 
 import scala.collection.{Map, Seq, immutable}
 
-case class GroupConfig(groupMinSessionTimeoutMs: Int,
-                       groupMaxSessionTimeoutMs: Int)
-
-case class JoinGroupResult(members: Map[String, Array[Byte]],
-                           memberId: String,
-                           generationId: Int,
-                           subProtocol: String,
-                           leaderId: String,
-                           errorCode: Short)
 
 /**
  * GroupCoordinator handles general group membership and offset management.
@@ -77,8 +68,10 @@ class GroupCoordinator(val brokerId: Int,
   /**
    * Startup logic executed at the same time when the server starts up.
    */
-  def startup() {
+  def startup(enableMetadataExpiration: Boolean = true) {
     info("Starting up.")
+    if (enableMetadataExpiration)
+      groupManager.enableMetadataExpiration()
     isActive.set(true)
     info("Startup complete.")
   }
@@ -119,16 +112,17 @@ class GroupCoordinator(val brokerId: Int,
       // only try to create the group if the group is not unknown AND
       // the member id is UNKNOWN, if member is specified but group does not
       // exist we should reject the request
-      var group = groupManager.getGroup(groupId)
-      if (group == null) {
-        if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
-        } else {
-          group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
+      groupManager.getGroup(groupId) match {
+        case None =>
+          if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+          } else {
+            val group = groupManager.addGroup(new GroupMetadata(groupId))
+            doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
+          }
+
+        case Some(group) =>
           doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
-        }
-      } else {
-        doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
       }
     }
   }
@@ -142,7 +136,7 @@ class GroupCoordinator(val brokerId: Int,
                           protocols: List[(String, Array[Byte])],
                           responseCallback: JoinCallback) {
     group synchronized {
-      if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) {
+      if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
         responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
       } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
@@ -160,7 +154,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -168,7 +162,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (member.matches(protocols)) {
@@ -192,10 +186,10 @@ class GroupCoordinator(val brokerId: Int,
               }
             }
 
-          case Stable =>
+          case Empty | Stable =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -233,11 +227,10 @@ class GroupCoordinator(val brokerId: Int,
     } else if (!isCoordinatorForGroup(groupId)) {
       responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
     } else {
-      val group = groupManager.getGroup(groupId)
-      if (group == null)
-        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
-      else
-        doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+      groupManager.getGroup(groupId) match {
+        case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+        case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+      }
     }
   }
 
@@ -255,7 +248,7 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
       } else {
         group.currentState match {
-          case Dead =>
+          case Empty | Dead =>
             responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
 
           case PreparingRebalance =>
@@ -301,7 +294,8 @@ class GroupCoordinator(val brokerId: Int,
     }
 
     // store the group metadata without holding the group lock to avoid the potential
-    // for deadlock when the callback is invoked
+    // for deadlock if the callback is invoked holding other locks (e.g. the replica
+    // state change lock)
     delayedGroupStore.foreach(groupManager.store)
   }
 
@@ -313,26 +307,25 @@ class GroupCoordinator(val brokerId: Int,
     } else if (isCoordinatorLoadingInProgress(groupId)) {
       responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
     } else {
-      val group = groupManager.getGroup(groupId)
-      if (group == null) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; this is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
-        // joining without specified consumer id,
-        responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-      } else {
-        group synchronized {
-          if (group.is(Dead)) {
-            responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-          } else if (!group.has(consumerId)) {
-            responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-          } else {
-            val member = group.get(consumerId)
-            removeHeartbeatForLeavingMember(group, member)
-            onMemberFailure(group, member)
-            responseCallback(Errors.NONE.code)
+      groupManager.getGroup(groupId) match {
+        case None =>
+          // if the group is marked as dead, it means some other thread has just removed the group
+          // from the coordinator metadata; it is likely that the group has migrated to some other
+          // coordinator OR the group is in a transient unstable phase. Let the consumer retry
+          // joining without a specified consumer id.
+          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+        case Some(group) =>
+          group synchronized {
+            if (group.is(Dead) || !group.has(consumerId)) {
+              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+            } else {
+              val member = group.get(consumerId)
+              removeHeartbeatForLeavingMember(group, member)
+              onMemberFailure(group, member)
+              responseCallback(Errors.NONE.code)
+            }
           }
-        }
       }
     }
   }
@@ -349,29 +342,30 @@ class GroupCoordinator(val brokerId: Int,
       // the group is still loading, so respond just blindly
       responseCallback(Errors.NONE.code)
     } else {
-      val group = groupManager.getGroup(groupId)
-      if (group == null) {
-        responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-      } else {
-        group synchronized {
-          if (group.is(Dead)) {
-            // if the group is marked as dead, it means some other thread has just removed the group
-            // from the coordinator metadata; this is likely that the group has migrated to some other
-            // coordinator OR the group is in a transient unstable phase. Let the member retry
-            // joining without the specified member id,
-            responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-          } else if (!group.is(Stable)) {
-            responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
-          } else if (!group.has(memberId)) {
-            responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-          } else if (generationId != group.generationId) {
-            responseCallback(Errors.ILLEGAL_GENERATION.code)
-          } else {
-            val member = group.get(memberId)
-            completeAndScheduleNextHeartbeatExpiration(group, member)
-            responseCallback(Errors.NONE.code)
+      groupManager.getGroup(groupId) match {
+        case None => responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+        case Some(group) =>
+          group synchronized {
+            if (group.is(Empty)) {
+              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+            } else if (group.is(Dead)) {
+              // if the group is marked as dead, it means some other thread has just removed the group
+              // from the coordinator metadata; it is likely that the group has migrated to some other
+              // coordinator OR the group is in a transient unstable phase. Let the member retry
+              // joining without the specified member id.
+              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+            } else if (!group.is(Stable)) {
+              responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+            } else if (!group.has(memberId)) {
+              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+            } else if (generationId != group.generationId) {
+              responseCallback(Errors.ILLEGAL_GENERATION.code)
+            } else {
+              val member = group.get(memberId)
+              completeAndScheduleNextHeartbeatExpiration(group, member)
+              responseCallback(Errors.NONE.code)
+            }
           }
-        }
       }
     }
   }
@@ -381,8 +375,6 @@ class GroupCoordinator(val brokerId: Int,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
-    var delayedOffsetStore: Option[DelayedStore] = None
-
     if (!isActive.get) {
       responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
     } else if (!isCoordinatorForGroup(groupId)) {
@@ -390,33 +382,48 @@ class GroupCoordinator(val brokerId: Int,
     } else if (isCoordinatorLoadingInProgress(groupId)) {
       responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))
     } else {
-      val group = groupManager.getGroup(groupId)
-      if (group == null) {
-        if (generationId < 0)
-          // the group is not relying on Kafka for partition management, so allow the commit
-          delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata,
-            responseCallback))
-        else
-          // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
-          // or this is a request coming from an older generation. either way, reject the commit
-          responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
-      } else {
-        group synchronized {
-          if (group.is(Dead)) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
-          } else if (group.is(AwaitingSync)) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
-          } else if (!group.has(memberId)) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
-          } else if (generationId != group.generationId) {
-            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+      groupManager.getGroup(groupId) match {
+        case None =>
+          if (generationId < 0) {
+            // the group is not relying on Kafka for group management, so allow the commit
+            val group = groupManager.addGroup(new GroupMetadata(groupId))
+            doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
           } else {
-            val member = group.get(memberId)
-            completeAndScheduleNextHeartbeatExpiration(group, member)
-            delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
-              offsetMetadata, responseCallback))
+            // the group is unknown here, or this is a request coming from an older generation. either way, reject the commit
+            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
           }
-        }
+
+        case Some(group) =>
+          doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
+      }
+    }
+  }
+
+  def doCommitOffsets(group: GroupMetadata,
+                      memberId: String,
+                      generationId: Int,
+                      offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                      responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
+    var delayedOffsetStore: Option[DelayedStore] = None
+
+    group synchronized {
+      if (group.is(Dead)) {
+        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+      } else if (generationId < 0 && group.is(Empty)) {
+        // the group is only using Kafka to store offsets
+        delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
+          offsetMetadata, responseCallback))
+      } else if (group.is(AwaitingSync)) {
+        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
+      } else if (!group.has(memberId)) {
+        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+      } else if (generationId != group.generationId) {
+        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+      } else {
+        val member = group.get(memberId)
+        completeAndScheduleNextHeartbeatExpiration(group, member)
+        delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
+          offsetMetadata, responseCallback))
       }
     }
 
@@ -424,12 +431,14 @@ class GroupCoordinator(val brokerId: Int,
     delayedOffsetStore.foreach(groupManager.store)
   }
 
+
   def handleFetchOffsets(groupId: String,
                          partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
     if (!isActive.get) {
       partitions.map { case topicPartition =>
         (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
     } else if (!isCoordinatorForGroup(groupId)) {
+      debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
       partitions.map { case topicPartition =>
         (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
     } else if (isCoordinatorLoadingInProgress(groupId)) {
@@ -459,13 +468,12 @@ class GroupCoordinator(val brokerId: Int,
     } else if (isCoordinatorLoadingInProgress(groupId)) {
       (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
     } else {
-      val group = groupManager.getGroup(groupId)
-      if (group == null) {
-        (Errors.NONE, GroupCoordinator.DeadGroup)
-      } else {
-        group synchronized {
-          (Errors.NONE, group.summary)
-        }
+      groupManager.getGroup(groupId) match {
+        case None => (Errors.NONE, GroupCoordinator.DeadGroup)
+        case Some(group) =>
+          group synchronized {
+            (Errors.NONE, group.summary)
+          }
       }
     }
   }
@@ -477,7 +485,7 @@ class GroupCoordinator(val brokerId: Int,
       group.transitionTo(Dead)
 
       previousState match {
-        case Dead =>
+        case Empty | Dead =>
         case PreparingRebalance =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingJoinCallback != null) {
@@ -502,7 +510,7 @@ class GroupCoordinator(val brokerId: Int,
   private def onGroupLoaded(group: GroupMetadata) {
     group synchronized {
       info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
-      assert(group.is(Stable))
+      assert(group.is(Stable) || group.is(Empty))
       group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
     }
   }
@@ -580,12 +588,13 @@ class GroupCoordinator(val brokerId: Int,
   private def addMemberAndRebalance(sessionTimeoutMs: Int,
                                     clientId: String,
                                     clientHost: String,
+                                    protocolType: String,
                                     protocols: List[(String, Array[Byte])],
                                     group: GroupMetadata,
                                     callback: JoinCallback) = {
     // use the client-id with a random id suffix as the member-id
     val memberId = clientId + "-" + group.generateMemberIdSuffix
-    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
+    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols)
     member.awaitingJoinCallback = callback
     group.add(member.memberId, member)
     maybePrepareRebalance(group)
@@ -626,7 +635,7 @@ class GroupCoordinator(val brokerId: Int,
     trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
     group.remove(member.memberId)
     group.currentState match {
-      case Dead =>
+      case Dead | Empty =>
       case Stable | AwaitingSync => maybePrepareRebalance(group)
       case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
     }
@@ -645,42 +654,49 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onCompleteJoin(group: GroupMetadata) {
+    var delayedStore: Option[DelayedStore] = None
     group synchronized {
-      val failedMembers = group.notYetRejoinedMembers
-      if (group.isEmpty || failedMembers.nonEmpty) {
-        failedMembers.foreach { failedMember =>
-          group.remove(failedMember.memberId)
-          // TODO: cut the socket connection to the client
-        }
-
-        // TODO KAFKA-2720: only remove group in the background thread
-        if (group.isEmpty) {
-          group.transitionTo(Dead)
-          groupManager.removeGroup(group)
-          info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
-        }
+      // remove any members who haven't joined the group yet
+      group.notYetRejoinedMembers.foreach { failedMember =>
+        group.remove(failedMember.memberId)
+        // TODO: cut the socket connection to the client
       }
+
       if (!group.is(Dead)) {
         group.initNextGeneration()
-        info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
-
-        // trigger the awaiting join group response callback for all the members after rebalancing
-        for (member <- group.allMemberMetadata) {
-          assert(member.awaitingJoinCallback != null)
-          val joinResult = JoinGroupResult(
-            members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
-            memberId=member.memberId,
-            generationId=group.generationId,
-            subProtocol=group.protocol,
-            leaderId=group.leaderId,
-            errorCode=Errors.NONE.code)
-
-          member.awaitingJoinCallback(joinResult)
-          member.awaitingJoinCallback = null
-          completeAndScheduleNextHeartbeatExpiration(group, member)
+        if (group.is(Empty)) {
+          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+          delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => {
+            if (errorCode != Errors.NONE.code) {
+              // we failed to persist the empty group. if we don't retry (which is how
+              // we handle the situation when a normal rebalance fails), then a coordinator
+              // change will cause the old generation to come back to life.
+            }
+          }))
+        } else {
+          info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
+
+          // trigger the awaiting join group response callback for all the members after rebalancing
+          for (member <- group.allMemberMetadata) {
+            assert(member.awaitingJoinCallback != null)
+            val joinResult = JoinGroupResult(
+              members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
+              memberId=member.memberId,
+              generationId=group.generationId,
+              subProtocol=group.protocol,
+              leaderId=group.leaderId,
+              errorCode=Errors.NONE.code)
+
+            member.awaitingJoinCallback(joinResult)
+            member.awaitingJoinCallback = null
+            completeAndScheduleNextHeartbeatExpiration(group, member)
+          }
         }
       }
     }
+
+    // call without holding the group lock
+    delayedStore.foreach(groupManager.store)
   }
 
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
@@ -757,3 +773,13 @@ object GroupCoordinator {
   }
 
 }
+
+case class GroupConfig(groupMinSessionTimeoutMs: Int,
+                       groupMaxSessionTimeoutMs: Int)
+
+case class JoinGroupResult(members: Map[String, Array[Byte]],
+                           memberId: String,
+                           generationId: Int,
+                           subProtocol: String,
+                           leaderId: String,
+                           errorCode: Short)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 4fa656e..b455964 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -18,10 +18,10 @@
 package kafka.coordinator
 
 import kafka.utils.nonthreadsafe
-
 import java.util.UUID
 
-import org.apache.kafka.common.protocol.Errors
+import kafka.common.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
 
 import collection.mutable
 
@@ -37,7 +37,8 @@ private[coordinator] sealed trait GroupState { def state: Byte }
  *         allow offset commits from previous generation
  *         allow offset fetch requests
  * transition: some members have joined by the timeout => AwaitingSync
- *             all members have left the group => Dead
+ *             all members have left the group => Empty
+ *             group is removed by partition emigration => Dead
  */
 private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
 
@@ -52,6 +53,7 @@ private[coordinator] case object PreparingRebalance extends GroupState { val sta
  *             join group from new member or existing member with updated metadata => PreparingRebalance
  *             leave group from existing member => PreparingRebalance
  *             member failure detected => PreparingRebalance
+ *             group is removed by partition emigration => Dead
  */
 private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5}
 
@@ -67,11 +69,12 @@ private[coordinator] case object AwaitingSync extends GroupState { val state: By
  *             leave group from existing member => PreparingRebalance
  *             leader join-group received => PreparingRebalance
  *             follower join-group with new metadata => PreparingRebalance
+ *             group is removed by partition emigration => Dead
  */
 private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
 
 /**
- * Group has no more members
+ * Group has no more members and its metadata is being removed
  *
  * action: respond to join group with UNKNOWN_MEMBER_ID
  *         respond to sync group with UNKNOWN_MEMBER_ID
@@ -83,13 +86,31 @@ private[coordinator] case object Stable extends GroupState { val state: Byte = 3
  */
 private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
 
+/**
+  * Group has no more members, but lingers until all offsets have expired. This state
+  * also represents groups which use Kafka only for offset commits and have no members.
+  *
+  * action: respond normally to join group from new members
+  *         respond to sync group with UNKNOWN_MEMBER_ID
+  *         respond to heartbeat with UNKNOWN_MEMBER_ID
+  *         respond to leave group with UNKNOWN_MEMBER_ID
+  *         respond to offset commit with UNKNOWN_MEMBER_ID, unless the generation id is negative (offset-only commit)
+  *         allow offset fetch requests
+  * transition: last offsets removed in periodic expiration task => Dead
+  *             join group from a new member => PreparingRebalance
+  *             group is removed by partition emigration => Dead
+  *             group is removed by expiration => Dead
+  */
+private[coordinator] case object Empty extends GroupState { val state: Byte = 5 }
+
 
 private object GroupMetadata {
   private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync),
+    Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
       AwaitingSync -> Set(PreparingRebalance),
       Stable -> Set(AwaitingSync),
-      PreparingRebalance -> Set(Stable, AwaitingSync))
+      PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
+      Empty -> Set(PreparingRebalance))
 }
 
 /**
@@ -120,10 +141,14 @@ case class GroupSummary(state: String,
  *  3. leader id
  */
 @nonthreadsafe
-private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {
+private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
 
+  private var state: GroupState = initialState
   private val members = new mutable.HashMap[String, MemberMetadata]
-  private var state: GroupState = Stable
+  private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
+  private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
+
+  var protocolType: Option[String] = None
   var generationId = 0
   var leaderId: String = null
   var protocol: String = null
@@ -134,6 +159,11 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
   def get(memberId: String) = members(memberId)
 
   def add(memberId: String, member: MemberMetadata) {
+    if (members.isEmpty)
+      this.protocolType = Some(member.protocolType)
+
+    assert(groupId == member.groupId)
+    assert(this.protocolType.orNull == member.protocolType)
     assert(supportsProtocols(member.protocols))
 
     if (leaderId == null)
@@ -154,8 +184,6 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
 
   def currentState = state
 
-  def isEmpty = members.isEmpty
-
   def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
 
   def allMembers = members.keySet
@@ -169,7 +197,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
   // TODO: decide if ids should be predictable or random
   def generateMemberIdSuffix = UUID.randomUUID().toString
 
-  def canRebalance = state == Stable || state == AwaitingSync
+  def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
 
   def transitionTo(groupState: GroupState) {
     assertValidTransition(groupState)
@@ -201,14 +229,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
   }
 
   def supportsProtocols(memberProtocols: Set[String]) = {
-    isEmpty || (memberProtocols & candidateProtocols).nonEmpty
+    members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
   }
 
   def initNextGeneration() = {
     assert(notYetRejoinedMembers == List.empty[MemberMetadata])
-    generationId += 1
-    protocol = selectProtocol
-    transitionTo(AwaitingSync)
+    if (members.nonEmpty) {
+      generationId += 1
+      protocol = selectProtocol
+      transitionTo(AwaitingSync)
+    } else {
+      generationId += 1
+      protocol = null
+      transitionTo(Empty)
+    }
   }
 
   def currentMemberMetadata: Map[String, Array[Byte]] = {
@@ -219,18 +253,53 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
 
   def summary: GroupSummary = {
     if (is(Stable)) {
-      val members = this.members.values.map{ member => member.summary(protocol) }.toList
-      GroupSummary(state.toString, protocolType, protocol, members)
+      val members = this.members.values.map { member => member.summary(protocol) }.toList
+      GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members)
     } else {
       val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
-      GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members)
+      GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members)
     }
   }
 
   def overview: GroupOverview = {
-    GroupOverview(groupId, protocolType)
+    GroupOverview(groupId, protocolType.getOrElse(""))
+  }
+
+  def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) {
+    offsets.put(topicPartition, offset)
+    pendingOffsetCommits.get(topicPartition) match {
+      case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition)
+      case _ =>
+    }
+  }
+
+  def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
+    pendingOffsetCommits.get(topicPartition) match {
+      case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
+      case _ =>
+    }
+  }
+
+  def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
+    pendingOffsetCommits ++= offsets
+  }
+
+  def removeExpiredOffsets(startMs: Long) = {
+    val expiredOffsets = offsets.filter {
+      case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
+    }
+    offsets --= expiredOffsets.keySet
+    expiredOffsets
   }
 
+  def allOffsets = offsets.toMap
+
+  def offset(topicPartition: TopicPartition) = offsets.get(topicPartition)
+
+  def numOffsets = offsets.size
+
+  def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty
+
   private def assertValidTransition(targetState: GroupState) {
     if (!GroupMetadata.validPreviousStates(targetState).contains(state))
       throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
@@ -240,4 +309,5 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
   override def toString = {
     "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
   }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index b968f97..915c360 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -17,12 +17,11 @@
 
 package kafka.coordinator
 
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.CoreUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
 import org.apache.kafka.common.protocol.types.Type.STRING
+import org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
 import org.apache.kafka.common.protocol.types.Type.BYTES
@@ -39,16 +38,18 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.common.MessageFormatter
 import kafka.server.ReplicaManager
+
 import scala.collection._
 import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 import com.yammer.metrics.core.Gauge
+import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.internals.TopicConstants
 
-case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
-                        callback: Map[TopicPartition, PartitionResponse] => Unit)
 
 class GroupMetadataManager(val brokerId: Int,
                            val config: OffsetConfig,
@@ -56,72 +57,77 @@ class GroupMetadataManager(val brokerId: Int,
                            zkUtils: ZkUtils,
                            time: Time) extends Logging with KafkaMetricsGroup {
 
-  /* offsets cache */
-  private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
+  private val groupMetadataCache = new Pool[String, GroupMetadata]
 
-  /* group metadata cache */
-  private val groupsCache = new Pool[String, GroupMetadata]
+  /* lock protecting access to loading and owned partition sets */
+  private val partitionLock = new ReentrantLock()
 
-  /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE offsetExpireLock and the group lock if needed */
+  /* partitions of consumer groups that are being loaded; partitionLock should always be acquired BEFORE the group lock if needed */
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
 
   /* partitions of consumer groups that are assigned, using the same loading partition lock */
   private val ownedPartitions: mutable.Set[Int] = mutable.Set()
 
-  /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
-  private val offsetExpireLock = new ReentrantReadWriteLock()
-
   /* shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
   /* number of partitions for the consumer metadata topic */
   private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
 
-  /* Single-thread scheduler to handling offset/group metadata cache loading and unloading */
+  /* single-thread scheduler to handle offset/group metadata cache loading and unloading */
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
 
   this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
 
-  scheduler.startup()
-  scheduler.schedule(name = "delete-expired-consumer-offsets",
-    fun = deleteExpiredOffsets,
-    period = config.offsetsRetentionCheckIntervalMs,
-    unit = TimeUnit.MILLISECONDS)
-
   newGauge("NumOffsets",
     new Gauge[Int] {
-      def value = offsetsCache.size
+      def value = groupMetadataCache.values.map(group => {
+        group synchronized { group.numOffsets }
+      }).sum
     }
   )
 
   newGauge("NumGroups",
     new Gauge[Int] {
-      def value = groupsCache.size
+      def value = groupMetadataCache.size
     }
   )
 
-  def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
+  def enableMetadataExpiration() {
+    scheduler.startup()
+
+    scheduler.schedule(name = "delete-expired-group-metadata",
+      fun = cleanupGroupMetadata,
+      period = config.offsetsRetentionCheckIntervalMs,
+      unit = TimeUnit.MILLISECONDS)
+  }
+
+  def currentGroups(): Iterable[GroupMetadata] = groupMetadataCache.values
+
+  def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) }
+
+  def isPartitionLoading(partition: Int) = inLock(partitionLock) { loadingPartitions.contains(partition) }
 
   def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
 
-  def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
+  def isGroupLocal(groupId: String): Boolean = isPartitionOwned(partitionFor(groupId))
 
-  def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
+  def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
 
-  def isLoading(): Boolean = loadingPartitions synchronized loadingPartitions.nonEmpty
+  def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
 
   /**
    * Get the group associated with the given groupId, or null if not found
    */
-  def getGroup(groupId: String): GroupMetadata = {
-      groupsCache.get(groupId)
+  def getGroup(groupId: String): Option[GroupMetadata] = {
+    Option(groupMetadataCache.get(groupId))
   }
 
   /**
    * Add a group or get the group associated with the given groupId if it already exists
    */
   def addGroup(group: GroupMetadata): GroupMetadata = {
-    val currentGroup = groupsCache.putIfNotExists(group.groupId, group)
+    val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)
     if (currentGroup != null) {
       currentGroup
     } else {
@@ -130,13 +136,15 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   /**
-   * Remove all metadata associated with the group
-   * @param group
+   * Remove the group from the cache and delete all metadata associated with it. This should be
+   * called only after all offsets for the group have expired and no members are remaining (i.e.
+   * it is in the Empty state).
    */
-  def removeGroup(group: GroupMetadata) {
+  private def evictGroupAndDeleteMetadata(group: GroupMetadata) {
     // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
-    // while the group is being removed due to coordinator emigration)
-    if (groupsCache.remove(group.groupId, group)) {
+    // while the group is being removed due to coordinator emigration). We also avoid writing the tombstone
+    // when the generationId is 0, since this group is only using Kafka for offset storage.
+    if (groupMetadataCache.remove(group.groupId, group) && group.generationId > 0) {
       // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
       // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
       // retry removing this group.
@@ -154,7 +162,6 @@ class GroupMetadataManager(val brokerId: Int,
         try {
           // do not need to require acks since even if the tombstone is lost,
           // it will be appended again by the new leader
-          // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
           partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
         } catch {
           case t: Throwable =>
@@ -227,20 +234,20 @@ class GroupMetadataManager(val brokerId: Int,
     DelayedStore(groupMetadataMessageSet, putCacheCallback)
   }
 
-  def store(delayedAppend: DelayedStore) {
+  def store(delayedStore: DelayedStore) {
     // call replica manager to append the group message
     replicaManager.appendMessages(
       config.offsetCommitTimeoutMs.toLong,
       config.offsetCommitRequiredAcks,
       true, // allow appending to internal offset topic
-      delayedAppend.messageSet,
-      delayedAppend.callback)
+      delayedStore.messageSet,
+      delayedStore.callback)
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
    */
-  def prepareStoreOffsets(groupId: String,
+  def prepareStoreOffsets(group: GroupMetadata,
                           consumerId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
@@ -252,16 +259,16 @@ class GroupMetadataManager(val brokerId: Int,
 
     // construct the message set to append
     val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
+      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
       new Message(
-        key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
+        key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
         bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
         timestamp = timestamp,
         magicValue = magicValue
       )
     }.toSeq
 
-    val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))
+    val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -278,29 +285,38 @@ class GroupMetadataManager(val brokerId: Int,
       val status = responseStatus(offsetTopicPartition)
 
       val responseCode =
-        if (status.errorCode == Errors.NONE.code) {
-          filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
-            putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
+        group synchronized {
+          if (status.errorCode == Errors.NONE.code) {
+            if (!group.is(Dead)) {
+              filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+                group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+              }
+            }
+            Errors.NONE.code
+          } else {
+            if (!group.is(Dead)) {
+              filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+                group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+              }
+            }
+
+            debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
+              .format(filteredOffsetMetadata, group.groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
+
+            // transform the log append error code to the corresponding commit status error code
+            if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+              Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
+            else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
+              Errors.NOT_COORDINATOR_FOR_GROUP.code
+            else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+              || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+              || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
+              Errors.INVALID_COMMIT_OFFSET_SIZE.code
+            else
+              status.errorCode
           }
-          Errors.NONE.code
-        } else {
-          debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
-            .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
-
-          // transform the log append error code to the corresponding the commit status error code
-          if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-            Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-          else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
-            Errors.NOT_COORDINATOR_FOR_GROUP.code
-          else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
-            || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
-            || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
-            Errors.INVALID_COMMIT_OFFSET_SIZE.code
-          else
-            status.errorCode
         }
 
-
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
         if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
@@ -313,6 +329,10 @@ class GroupMetadataManager(val brokerId: Int,
       responseCallback(commitStatus)
     }
 
+    group synchronized {
+      group.prepareOffsetCommit(offsetMetadata)
+    }
+
     DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
   }
 
@@ -320,27 +340,36 @@ class GroupMetadataManager(val brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(group: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
-    trace("Getting offsets %s for group %s.".format(topicPartitions, group))
-
-    if (isGroupLocal(group)) {
-      if (topicPartitions.isEmpty) {
-        // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
-        offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
-          (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
-        }.toMap
-      } else {
-        topicPartitions.map { topicPartition =>
-          val groupTopicPartition = GroupTopicPartition(group, topicPartition)
-          (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
-        }.toMap
-      }
-    } else {
-      debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
+  def getOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
+    trace("Getting offsets %s for group %s.".format(topicPartitions, groupId))
+    val group = groupMetadataCache.get(groupId)
+    if (group == null) {
       topicPartitions.map { topicPartition =>
-        val groupTopicPartition = GroupTopicPartition(group, topicPartition)
-        (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))
+        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
       }.toMap
+    } else {
+      group synchronized {
+        if (group.is(Dead)) {
+          topicPartitions.map { topicPartition =>
+            (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
+          }.toMap
+        } else {
+            if (topicPartitions.isEmpty) {
+              // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
+              group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
+                (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+              }
+            } else {
+              topicPartitions.map { topicPartition =>
+                group.offset(topicPartition) match {
+                  case None => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
+                  case Some(offsetAndMetadata) =>
+                    (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+                }
+              }.toMap
+            }
+        }
+      }
     }
   }
 
@@ -355,7 +384,7 @@ class GroupMetadataManager(val brokerId: Int,
     def loadGroupsAndOffsets() {
       info("Loading offsets and group metadata from " + topicPartition)
 
-      loadingPartitions synchronized {
+      inLock(partitionLock) {
         if (loadingPartitions.contains(offsetsPartition)) {
           info("Offset load from %s already in progress.".format(topicPartition))
           return
@@ -371,74 +400,70 @@ class GroupMetadataManager(val brokerId: Int,
             var currOffset = log.logSegments.head.baseOffset
             val buffer = ByteBuffer.allocate(config.loadBufferSize)
             // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
-            inWriteLock(offsetExpireLock) {
-              val loadedGroups = mutable.Map[String, GroupMetadata]()
-              val removedGroups = mutable.Set[String]()
-
-              while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
-                buffer.clear()
-                val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
-                messages.readInto(buffer, 0)
-                val messageSet = new ByteBufferMessageSet(buffer)
-                messageSet.foreach { msgAndOffset =>
-                  require(msgAndOffset.message.key != null, "Offset entry key should not be null")
-                  val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
-
-                  if (baseKey.isInstanceOf[OffsetKey]) {
-                    // load offset
-                    val key = baseKey.key.asInstanceOf[GroupTopicPartition]
-                    if (msgAndOffset.message.payload == null) {
-                      if (offsetsCache.remove(key) != null)
-                        trace("Removed offset for %s due to tombstone entry.".format(key))
-                      else
-                        trace("Ignoring redundant tombstone for %s.".format(key))
-                    } else {
-                      // special handling for version 0:
-                      // set the expiration time stamp as commit time stamp + server default retention time
-                      val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
-                      putOffset(key, value.copy (
-                        expireTimestamp = {
-                          if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-                            value.commitTimestamp + config.offsetsRetentionMs
-                          else
-                            value.expireTimestamp
-                        }
-                      ))
-                      trace("Loaded offset %s for %s.".format(value, key))
-                    }
+            val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
+            val removedOffsets = mutable.Set[GroupTopicPartition]()
+            val loadedGroups = mutable.Map[String, GroupMetadata]()
+            val removedGroups = mutable.Set[String]()
+
+            while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
+              buffer.clear()
+              val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
+              messages.readInto(buffer, 0)
+              val messageSet = new ByteBufferMessageSet(buffer)
+              messageSet.foreach { msgAndOffset =>
+                require(msgAndOffset.message.key != null, "Offset entry key should not be null")
+                val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
+
+                if (baseKey.isInstanceOf[OffsetKey]) {
+                  // load offset
+                  val key = baseKey.key.asInstanceOf[GroupTopicPartition]
+                  if (msgAndOffset.message.payload == null) {
+                    loadedOffsets.remove(key)
+                    removedOffsets.add(key)
                   } else {
-                    // load group metadata
-                    val groupId = baseKey.key.asInstanceOf[String]
-                    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
-                    if (groupMetadata != null) {
-                      trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
-                      removedGroups.remove(groupId)
-                      loadedGroups.put(groupId, groupMetadata)
-                    } else {
-                      loadedGroups.remove(groupId)
-                      removedGroups.add(groupId)
-                    }
+                    val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
+                    loadedOffsets.put(key, value)
+                    removedOffsets.remove(key)
+                  }
+                } else {
+                  // load group metadata
+                  val groupId = baseKey.key.asInstanceOf[String]
+                  val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
+                  if (groupMetadata != null) {
+                    trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
+                    removedGroups.remove(groupId)
+                    loadedGroups.put(groupId, groupMetadata)
+                  } else {
+                    loadedGroups.remove(groupId)
+                    removedGroups.add(groupId)
                   }
-
-                  currOffset = msgAndOffset.nextOffset
                 }
-              }
 
-              loadedGroups.values.foreach { group =>
-                val currentGroup = addGroup(group)
-                if (group != currentGroup)
-                  debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
-                    s"because there is already a cached group with generation ${currentGroup.generationId}")
-                else
-                  onGroupLoaded(group)
+                currOffset = msgAndOffset.nextOffset
               }
+            }
 
-              removedGroups.foreach { groupId =>
-                val group = groupsCache.get(groupId)
-                if (group != null)
-                  throw new IllegalStateException(s"Unexpected unload of acitve group ${group.groupId} while " +
-                    s"loading partition ${topicPartition}")
-              }
+            val (groupOffsets, noGroupOffsets)  = loadedOffsets
+              .groupBy(_._1.group)
+              .mapValues(_.map{ case (groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, offsetAndMetadata)})
+              .partition(value => loadedGroups.contains(value._1))
+
+            loadedGroups.values.foreach { group =>
+              val offsets = groupOffsets.getOrElse(group.groupId, Map.empty)
+              loadGroup(group, offsets)
+              onGroupLoaded(group)
+            }
+
+            noGroupOffsets.foreach { case (groupId, offsets) =>
+              val group = new GroupMetadata(groupId)
+              loadGroup(group, offsets)
+              onGroupLoaded(group)
+            }
+
+            removedGroups.foreach { groupId =>
+              if (groupMetadataCache.contains(groupId))
+                throw new IllegalStateException(s"Unexpected unload of active group ${groupId} while " +
+                  s"loading partition ${topicPartition}")
             }
 
             if (!shuttingDown.get())
@@ -453,7 +478,7 @@ class GroupMetadataManager(val brokerId: Int,
           error("Error in loading offsets from " + topicPartition, t)
       }
       finally {
-        loadingPartitions synchronized {
+        inLock(partitionLock) {
           ownedPartitions.add(offsetsPartition)
           loadingPartitions.remove(offsetsPartition)
         }
@@ -461,10 +486,37 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
+  private def loadGroup(group: GroupMetadata, offsets: Iterable[(TopicPartition, OffsetAndMetadata)]): Unit = {
+    val currentGroup = addGroup(group)
+    if (group != currentGroup) {
+      debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
+        s"because there is already a cached group with generation ${currentGroup.generationId}")
+    } else {
+
+      offsets.foreach {
+        case (topicPartition, offsetAndMetadata) => {
+          val offset = offsetAndMetadata.copy (
+            expireTimestamp = {
+              // special handling for version 0:
+              // set the expiration time stamp as commit time stamp + server default retention time
+              if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+                offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs
+              else
+                offsetAndMetadata.expireTimestamp
+            }
+          )
+          trace("Loaded offset %s for %s.".format(offset, topicPartition))
+          group.completePendingOffsetWrite(topicPartition, offset)
+        }
+      }
+    }
+  }
+
   /**
    * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
    * that partition.
-   * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+   *
+   * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
    */
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
@@ -475,31 +527,22 @@ class GroupMetadataManager(val brokerId: Int,
       var numOffsetsRemoved = 0
       var numGroupsRemoved = 0
 
-      loadingPartitions synchronized {
+      inLock(partitionLock) {
         // we need to guard the group removal in cache in the loading partition lock
         // to prevent coordinator's check-and-get-group race condition
         ownedPartitions.remove(offsetsPartition)
 
-        // clear the offsets for this partition in the cache
-
         /**
          * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
          * in getOffsets to protect against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
          * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
          */
-        offsetsCache.keys.foreach { key =>
-          if (partitionFor(key.group) == offsetsPartition) {
-            offsetsCache.remove(key)
-            numOffsetsRemoved += 1
-          }
-        }
-
-        // clear the groups for this partition in the cache
-        for (group <- groupsCache.values) {
+        for (group <- groupMetadataCache.values) {
           if (partitionFor(group.groupId) == offsetsPartition) {
             onGroupUnloaded(group)
-            groupsCache.remove(group.groupId, group)
+            groupMetadataCache.remove(group.groupId, group)
             numGroupsRemoved += 1
+            numOffsetsRemoved += group.numOffsets
           }
         }
       }
@@ -512,82 +555,53 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
-  /**
-   * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
-   *
-   * @param key The requested group-topic-partition
-   * @return If the key is present, return the offset and metadata; otherwise return None
-   */
-  private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = {
-    val offsetAndMetadata = offsetsCache.get(key)
-    if (offsetAndMetadata == null)
-      new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)
-    else
-      new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
-  }
-
-  /**
-   * Put the (already committed) offset for the given group/topic/partition into the cache.
-   *
-   * @param key The group-topic-partition
-   * @param offsetAndMetadata The offset/metadata to be stored
-   */
-  private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
-    offsetsCache.put(key, offsetAndMetadata)
-  }
-
-  private def deleteExpiredOffsets() {
-    debug("Collecting expired offsets.")
+  // visible for testing
+  private[coordinator] def cleanupGroupMetadata() {
     val startMs = time.milliseconds()
-
-    val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
-      val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
-        offsetAndMetadata.expireTimestamp < startMs
-      }
-
-      debug("Found %d expired offsets.".format(expiredOffsets.size))
-
-      // delete the expired offsets from the table and generate tombstone messages to remove them from the log
-      val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
-        val offsetsPartition = partitionFor(groupTopicAndPartition.group)
-        trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
-
-        offsetsCache.remove(groupTopicAndPartition)
-
-        val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
-          groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
-
-        val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
-        (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
-      }.groupBy { case (partition, tombstone) => partition }
-
-      // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
-      // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
-      tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-        val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-        partitionOpt.map { partition =>
-          val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-          val messages = tombstones.map(_._2).toSeq
-
-          trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
-
-          try {
-            // do not need to require acks since even if the tombstone is lost,
-            // it will be appended again in the next purge cycle
-            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
-            tombstones.size
-          }
-          catch {
-            case t: Throwable =>
-              error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
+    var offsetsRemoved = 0
+
+    groupMetadataCache.foreach { case (groupId, group) =>
+      group synchronized {
+        if (!group.is(Dead)) {
+          val offsetsPartition = partitionFor(groupId)
+
+          // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+          val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
+            trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
+            val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
+            val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
+            new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+          }.toBuffer
+
+          val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+          partitionOpt.foreach { partition =>
+            val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+            trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
+
+            try {
+              // do not need to require acks since even if the tombstone is lost,
+              // it will be appended again in the next purge cycle
+              partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+              offsetsRemoved += tombstones.size
+            }
+            catch {
+              case t: Throwable =>
+                error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
               // ignore and continue
-              0
+            }
+          }
+
+          if (group.is(Empty) && !group.hasOffsets) {
+            group.transitionTo(Dead)
+            evictGroupAndDeleteMetadata(group)
+            info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
           }
         }
-      }.sum
+      }
     }
 
-    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, time.milliseconds() - startMs))
+    info("Removed %d expired offsets in %d milliseconds.".format(offsetsRemoved, time.milliseconds() - startMs))
+
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
@@ -607,9 +621,11 @@ class GroupMetadataManager(val brokerId: Int,
     metadata == null || metadata.length() <= config.maxMetadataSize
   }
 
+
   def shutdown() {
     shuttingDown.set(true)
-    scheduler.shutdown()
+    if (scheduler.isStarted)
+      scheduler.shutdown()
 
     // TODO: clear the caches
   }
@@ -642,7 +658,7 @@ class GroupMetadataManager(val brokerId: Int,
    * NOTE: this is for test only
    */
   def addPartitionOwnership(partition: Int) {
-    loadingPartitions synchronized {
+    inLock(partitionLock) {
       ownedPartitions.add(partition)
     }
   }
@@ -710,8 +726,8 @@ object GroupMetadataManager {
 
   private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING),
     new Field("generation", INT32),
-    new Field("protocol", STRING),
-    new Field("leader", STRING),
+    new Field("protocol", NULLABLE_STRING),
+    new Field("leader", NULLABLE_STRING),
     new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
   private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type")
   private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation")
@@ -787,7 +803,7 @@ object GroupMetadataManager {
    *
    * @return key bytes for group metadata message
    */
-  private def groupMetadataKey(group: String): Array[Byte] = {
+  def groupMetadataKey(group: String): Array[Byte] = {
     val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
     key.set(GROUP_KEY_GROUP_FIELD, group)
 
@@ -823,10 +839,10 @@ object GroupMetadataManager {
    * @param groupMetadata
    * @return payload for group metadata message
    */
-  private def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
+  def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
     // generate commit value with schema version 1
     val value = new Struct(CURRENT_GROUP_VALUE_SCHEMA)
-    value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType)
+    value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType.getOrElse(""))
     value.set(GROUP_METADATA_GENERATION_V0, groupMetadata.generationId)
     value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
     value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
@@ -937,13 +953,16 @@ object GroupMetadataManager {
       if (version == 0) {
         val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String]
 
-        val group = new GroupMetadata(groupId, protocolType)
+        val memberMetadataArray = value.getArray(GROUP_METADATA_MEMBERS_V0)
+        val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+
+        val group = new GroupMetadata(groupId, initialState)
 
         group.generationId = value.get(GROUP_METADATA_GENERATION_V0).asInstanceOf[Int]
         group.leaderId = value.get(GROUP_METADATA_LEADER_V0).asInstanceOf[String]
         group.protocol = value.get(GROUP_METADATA_PROTOCOL_V0).asInstanceOf[String]
 
-        value.getArray(GROUP_METADATA_MEMBERS_V0).foreach {
+        memberMetadataArray.foreach {
           case memberMetadataObj =>
             val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
             val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
@@ -953,7 +972,7 @@ object GroupMetadataManager {
             val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
 
             val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
-              List((group.protocol, subscription)))
+              protocolType, List((group.protocol, subscription)))
 
             member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
 
@@ -1012,6 +1031,9 @@ object GroupMetadataManager {
 
 }
 
+case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+                        callback: Map[TopicPartition, PartitionResponse] => Unit)
+
 case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
 
   def this(group: String, topic: String, partition: Int) =

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index c57b990..19c9e8e 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -56,6 +56,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
                                           val clientId: String,
                                           val clientHost: String,
                                           val sessionTimeoutMs: Int,
+                                          val protocolType: String,
                                           var supportedProtocols: List[(String, Array[Byte])]) {
 
   var assignment: Array[Byte] = Array.empty[Byte]

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index dc343fa..fa13a92 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -24,7 +24,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.message.{Message, MessageSet}
 import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
 import kafka.utils._
-import org.apache.kafka.common.{utils, TopicPartition}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -96,7 +96,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
 
     groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
-    groupCoordinator.startup()
+    groupCoordinator.startup(false)
 
     // add the partition into the owned partition list
     groupPartitionId = groupCoordinator.partitionFor(groupId)
@@ -106,7 +106,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   @After
   def tearDown() {
     EasyMock.reset(replicaManager)
-    groupCoordinator.shutdown()
+    if (groupCoordinator != null)
+      groupCoordinator.shutdown()
   }
 
   @Test