Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/18 03:34:56 UTC

kafka git commit: KAFKA-2841; safe group metadata cache loading/unloading

Repository: kafka
Updated Branches:
  refs/heads/trunk 06d2c7816 -> dbdec927b


KAFKA-2841; safe group metadata cache loading/unloading

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #530 from hachikuji/KAFKA-2841


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dbdec927
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dbdec927
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dbdec927

Branch: refs/heads/trunk
Commit: dbdec927b9badcdbe4a4b4b6ccebf5044ca32747
Parents: 06d2c78
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Nov 17 18:34:51 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Nov 17 18:34:51 2015 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    |  72 ++++--
 .../coordinator/GroupMetadataManager.scala      | 221 ++++++++++---------
 .../kafka/server/DelayedOperationKey.scala      |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  27 +--
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  18 +-
 core/src/main/scala/kafka/utils/Pool.scala      |   4 +-
 .../GroupCoordinatorResponseTest.scala          |   2 +-
 8 files changed, 200 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 23309c1..2f1b842 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -63,9 +63,8 @@ class GroupCoordinator(val brokerId: Int,
            groupConfig: GroupConfig,
            offsetConfig: OffsetConfig,
            replicaManager: ReplicaManager,
-           zkUtils: ZkUtils,
-           scheduler: Scheduler) = this(brokerId, groupConfig, offsetConfig,
-    new GroupMetadataManager(brokerId, offsetConfig, replicaManager, zkUtils, scheduler))
+           zkUtils: ZkUtils) = this(brokerId, groupConfig, offsetConfig,
+    new GroupMetadataManager(brokerId, offsetConfig, replicaManager, zkUtils))
 
   def offsetsTopicConfigs: Properties = {
     val props = new Properties
@@ -132,7 +131,7 @@ class GroupCoordinator(val brokerId: Int,
         if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
           responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
         } else {
-          group = groupManager.addGroup(groupId, protocolType)
+          group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
           doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
         }
       } else {
@@ -226,7 +225,7 @@ class GroupCoordinator(val brokerId: Int,
         }
 
         if (group.is(PreparingRebalance))
-          joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
       }
     }
   }
@@ -473,12 +472,49 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleGroupImmigration(offsetTopicPartitionId: Int) = {
-    groupManager.loadGroupsForPartition(offsetTopicPartitionId)
+  private def onGroupUnloaded(group: GroupMetadata) {
+    group synchronized {
+      info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
+      val previousState = group.currentState
+      group.transitionTo(Dead)
+
+      previousState match {
+        case Dead =>
+        case PreparingRebalance =>
+          for (member <- group.allMemberMetadata) {
+            if (member.awaitingJoinCallback != null) {
+              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
+              member.awaitingJoinCallback = null
+            }
+          }
+          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+
+        case Stable | AwaitingSync =>
+          for (member <- group.allMemberMetadata) {
+            if (member.awaitingSyncCallback != null) {
+              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code)
+              member.awaitingSyncCallback = null
+            }
+            heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
+          }
+      }
+    }
   }
 
-  def handleGroupEmigration(offsetTopicPartitionId: Int) = {
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId)
+  private def onGroupLoaded(group: GroupMetadata) {
+    group synchronized {
+      info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
+      assert(group.is(Stable))
+      group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
+    }
+  }
+
+  def handleGroupImmigration(offsetTopicPartitionId: Int) {
+    groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
+  }
+
+  def handleGroupEmigration(offsetTopicPartitionId: Int) {
+    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
   }
 
   private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
@@ -528,7 +564,7 @@ class GroupCoordinator(val brokerId: Int,
   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
     // complete current heartbeat expectation
     member.latestHeartbeat = SystemTime.milliseconds
-    val memberKey = ConsumerKey(member.groupId, member.memberId)
+    val memberKey = MemberKey(member.groupId, member.memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)
 
     // reschedule the next heartbeat expiration deadline
@@ -539,8 +575,8 @@ class GroupCoordinator(val brokerId: Int,
 
   private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
     member.isLeaving = true
-    val consumerKey = ConsumerKey(member.groupId, member.memberId)
-    heartbeatPurgatory.checkAndComplete(consumerKey)
+    val memberKey = MemberKey(member.groupId, member.memberId)
+    heartbeatPurgatory.checkAndComplete(memberKey)
   }
 
   private def addMemberAndRebalance(sessionTimeoutMs: Int,
@@ -584,8 +620,8 @@ class GroupCoordinator(val brokerId: Int,
 
     val rebalanceTimeout = group.rebalanceTimeout
     val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
-    val consumerGroupKey = ConsumerGroupKey(group.groupId)
-    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey))
+    val groupKey = GroupKey(group.groupId)
+    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
 
   private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
@@ -594,7 +630,7 @@ class GroupCoordinator(val brokerId: Int,
     group.currentState match {
       case Dead =>
       case Stable | AwaitingSync => maybePrepareRebalance(group)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
     }
   }
 
@@ -621,6 +657,7 @@ class GroupCoordinator(val brokerId: Int,
 
         // TODO KAFKA-2720: only remove group in the background thread
         if (group.isEmpty) {
+          group.transitionTo(Dead)
           groupManager.removeGroup(group)
           info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
         }
@@ -694,8 +731,7 @@ object GroupCoordinator {
 
   def create(config: KafkaConfig,
              zkUtils: ZkUtils,
-             replicaManager: ReplicaManager,
-             scheduler: Scheduler): GroupCoordinator = {
+             replicaManager: ReplicaManager): GroupCoordinator = {
     val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
@@ -707,7 +743,7 @@ object GroupCoordinator {
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, scheduler)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils)
   }
 
   def create(config: KafkaConfig,
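
Note on the coordinator changes above: the coordinator now reacts to cache loading and unloading through the onGroupLoaded/onGroupUnloaded callbacks it hands to GroupMetadataManager, instead of mutating the cache itself. The following is a minimal, self-contained sketch of that callback wiring; CacheManager and Group are simplified stand-ins invented here for illustration, not the real Kafka classes.

object CallbackWiringSketch {

  case class Group(groupId: String, generationId: Int)

  // Simplified stand-in for GroupMetadataManager: it owns the cache and invokes the
  // caller-supplied callbacks only for groups it actually loaded or unloaded.
  class CacheManager {
    private val cache = scala.collection.concurrent.TrieMap.empty[String, Group]

    def loadGroupsForPartition(groups: Seq[Group], onGroupLoaded: Group => Unit): Unit =
      groups.foreach { group =>
        cache.putIfAbsent(group.groupId, group) match {
          case None =>          // freshly loaded into the cache
            onGroupLoaded(group)
          case Some(current) => // a newer registration already exists; do not clobber it
            println(s"skipping ${group.groupId}: cached generation ${current.generationId}")
        }
      }

    def removeGroupsForPartition(onGroupUnloaded: Group => Unit): Unit =
      cache.values.foreach { group =>
        onGroupUnloaded(group)             // let the coordinator fail pending join/sync callbacks first
        cache.remove(group.groupId, group) // conditional removal, like Pool.remove(key, value)
      }
  }

  def main(args: Array[String]): Unit = {
    val manager = new CacheManager
    manager.loadGroupsForPartition(Seq(Group("g1", 3)), g => println(s"loaded ${g.groupId}"))
    manager.removeGroupsForPartition(g => println(s"unloaded ${g.groupId}"))
  }
}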

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 027abf7..a63f226 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -42,7 +42,7 @@ import scala.collection._
 import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 
@@ -53,8 +53,7 @@ case class DelayedStore(messageSet: Map[TopicAndPartition, MessageSet],
 class GroupMetadataManager(val brokerId: Int,
                            val config: OffsetConfig,
                            replicaManager: ReplicaManager,
-                           zkUtils: ZkUtils,
-                           scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+                           zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
 
   /* offsets cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -77,8 +76,12 @@ class GroupMetadataManager(val brokerId: Int,
   /* number of partitions for the consumer metadata topic */
   private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
 
+  /* Single-thread scheduler for handling offset/group metadata cache loading and unloading */
+  private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
+
   this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
 
+  scheduler.startup()
   scheduler.schedule(name = "delete-expired-consumer-offsets",
     fun = deleteExpiredOffsets,
     period = config.offsetsRetentionCheckIntervalMs,
@@ -116,55 +119,45 @@ class GroupMetadataManager(val brokerId: Int,
   /**
    * Add a group or get the group associated with the given groupId if it already exists
    */
-  def addGroup(groupId: String, protocolType: String): GroupMetadata = {
-    val newGroup = new GroupMetadata(groupId, protocolType)
-    val currentGroup = groupsCache.putIfNotExists(groupId, newGroup)
-    if (currentGroup != null)
+  def addGroup(group: GroupMetadata): GroupMetadata = {
+    val currentGroup = groupsCache.putIfNotExists(group.groupId, group)
+    if (currentGroup != null) {
       currentGroup
-    else
-      newGroup
-  }
-
-  /**
-   * Update the current cached metadata for the group with the given groupId or add the group if there is none.
-   */
-  private def updateGroup(groupId: String, group: GroupMetadata) {
-    groupsCache.put(groupId, group)
+    } else {
+      group
+    }
   }
 
   /**
-   * Remove all metadata associated with the group, note this function needs to be
-   * called inside the group lock
+   * Remove all metadata associated with the group
    * @param group
    */
   def removeGroup(group: GroupMetadata) {
-    // first mark the group as dead
-    group.transitionTo(Dead)
-
-    if (groupsCache.remove(group.groupId) != group)
-      throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.")
-
-    // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
-    // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
-    // retry removing this group.
-    val groupPartition = partitionFor(group.groupId)
-    val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId))
-
-    val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
-    partitionOpt.foreach { partition =>
-      val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
-
-      trace("Marking group %s as deleted.".format(group.groupId))
-
-      try {
-        // do not need to require acks since even if the tombstone is lost,
-        // it will be appended again by the new leader
-        // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
-        partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
-      } catch {
-        case t: Throwable =>
-          error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
+    // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
+    // while the group is being removed due to coordinator emigration)
+    if (groupsCache.remove(group.groupId, group)) {
+      // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+      // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+      // retry removing this group.
+      val groupPartition = partitionFor(group.groupId)
+      val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId))
+
+      val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+      partitionOpt.foreach { partition =>
+        val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+
+        trace("Marking group %s as deleted.".format(group.groupId))
+
+        try {
+          // do not need to require acks since even if the tombstone is lost,
+          // it will be appended again by the new leader
+          // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
+          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
+        } catch {
+          case t: Throwable =>
+            error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
           // ignore and continue
+        }
       }
     }
   }
@@ -346,23 +339,22 @@ class GroupMetadataManager(val brokerId: Int,
   /**
    * Asynchronously read the partition from the offsets topic and populate the cache
    */
-  def loadGroupsForPartition(offsetsPartition: Int) {
-
+  def loadGroupsForPartition(offsetsPartition: Int,
+                             onGroupLoaded: GroupMetadata => Unit) {
     val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+    scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
 
-    loadingPartitions synchronized {
-      ownedPartitions.add(offsetsPartition)
+    def loadGroupsAndOffsets() {
+      info("Loading offsets and group metadata from " + topicPartition)
 
-      if (loadingPartitions.contains(offsetsPartition)) {
-        info("Offset load from %s already in progress.".format(topicPartition))
-      } else {
-        loadingPartitions.add(offsetsPartition)
-        scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
+      loadingPartitions synchronized {
+        if (loadingPartitions.contains(offsetsPartition)) {
+          info("Offset load from %s already in progress.".format(topicPartition))
+          return
+        } else {
+          loadingPartitions.add(offsetsPartition)
+        }
       }
-    }
-
-    def loadGroupsAndOffsets() {
-      info("Loading offsets from " + topicPartition)
 
       val startMs = SystemTime.milliseconds
       try {
@@ -372,6 +364,9 @@ class GroupMetadataManager(val brokerId: Int,
             val buffer = ByteBuffer.allocate(config.loadBufferSize)
             // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
             inWriteLock(offsetExpireLock) {
+              val loadedGroups = mutable.Map[String, GroupMetadata]()
+              val removedGroups = mutable.Set[String]()
+
               while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
                 buffer.clear()
                 val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
@@ -409,21 +404,33 @@ class GroupMetadataManager(val brokerId: Int,
                     val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
                     if (groupMetadata != null) {
                       trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
-                      updateGroup(groupId, groupMetadata)
+                      removedGroups.remove(groupId)
+                      loadedGroups.put(groupId, groupMetadata)
                     } else {
-                      // this is a tombstone mark, we need to delete the group from cache if it exists
-                      val group = groupsCache.remove(groupId)
-                      if (group != null) {
-                        group synchronized {
-                          group.transitionTo(Dead)
-                        }
-                      }
+                      loadedGroups.remove(groupId)
+                      removedGroups.add(groupId)
                     }
                   }
 
                   currOffset = msgAndOffset.nextOffset
                 }
               }
+
+              loadedGroups.values.foreach { group =>
+                val currentGroup = addGroup(group)
+                if (group != currentGroup)
+                  debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
+                    s"because there is already a cached group with generation ${currentGroup.generationId}")
+                else
+                  onGroupLoaded(group)
+              }
+
+              removedGroups.foreach { groupId =>
+                val group = groupsCache.get(groupId)
+                if (group != null)
+                      throw new IllegalStateException(s"Unexpected unload of active group ${group.groupId} while " +
+                    s"loading partition ${topicPartition}")
+              }
             }
 
             if (!shuttingDown.get())
@@ -438,7 +445,10 @@ class GroupMetadataManager(val brokerId: Int,
           error("Error in loading offsets from " + topicPartition, t)
       }
       finally {
-        loadingPartitions synchronized loadingPartitions.remove(offsetsPartition)
+        loadingPartitions synchronized {
+          ownedPartitions.add(offsetsPartition)
+          loadingPartitions.remove(offsetsPartition)
+        }
       }
     }
   }
@@ -448,48 +458,48 @@ class GroupMetadataManager(val brokerId: Int,
    * that partition.
    * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
    */
-  def removeGroupsForPartition(offsetsPartition: Int) {
-    var numOffsetsRemoved = 0
-    var numGroupsRemoved = 0
-
-    loadingPartitions synchronized {
-      // we need to guard the group removal in cache in the loading partition lock
-      // to prevent coordinator's check-and-get-group race condition
-      ownedPartitions.remove(offsetsPartition)
-
-      // clear the offsets for this partition in the cache
-
-      /**
-       * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
-       * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
-       * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
-       */
-      offsetsCache.keys.foreach { key =>
-        if (partitionFor(key.group) == offsetsPartition) {
-          offsetsCache.remove(key)
-          numOffsetsRemoved += 1
+  def removeGroupsForPartition(offsetsPartition: Int,
+                               onGroupUnloaded: GroupMetadata => Unit) {
+    val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+    scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
+
+    def removeGroupsAndOffsets() {
+      var numOffsetsRemoved = 0
+      var numGroupsRemoved = 0
+
+      loadingPartitions synchronized {
+        // we need to guard the group removal in cache in the loading partition lock
+        // to prevent coordinator's check-and-get-group race condition
+        ownedPartitions.remove(offsetsPartition)
+
+        // clear the offsets for this partition in the cache
+
+        /**
+         * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
+         * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
+         * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
+         */
+        offsetsCache.keys.foreach { key =>
+          if (partitionFor(key.group) == offsetsPartition) {
+            offsetsCache.remove(key)
+            numOffsetsRemoved += 1
+          }
         }
-      }
-
-      // clear the groups for this partition in the cache
-      for (group <- groupsCache.values) {
-        group synchronized {
-          // mark the group as dead and then remove it from cache
-          group.transitionTo(Dead)
-
-          if (groupsCache.remove(group.groupId) != group)
-            throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.")
 
+        // clear the groups for this partition in the cache
+        for (group <- groupsCache.values) {
+          onGroupUnloaded(group)
+          groupsCache.remove(group.groupId, group)
           numGroupsRemoved += 1
         }
       }
-    }
 
-    if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-      .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+      if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
+        .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
 
-    if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
-      .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+      if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
+        .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+    }
   }
 
   /**
@@ -588,6 +598,7 @@ class GroupMetadataManager(val brokerId: Int,
 
   def shutdown() {
     shuttingDown.set(true)
+    scheduler.shutdown()
 
     // TODO: clear the caches
   }
@@ -850,7 +861,7 @@ object GroupMetadataManager {
       // version 2 refers to offset
       val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]
 
-      GroupKey(version, group)
+      GroupMetadataKey(version, group)
     } else {
       throw new IllegalStateException("Unknown version " + version + " for group metadata message")
     }
@@ -960,8 +971,8 @@ object GroupMetadataManager {
       val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
 
       // only print if the message is a group metadata record
-      if (formattedKey.isInstanceOf[GroupKey]) {
-        val groupId = formattedKey.asInstanceOf[GroupKey].key
+      if (formattedKey.isInstanceOf[GroupMetadataKey]) {
+        val groupId = formattedKey.asInstanceOf[GroupMetadataKey].key
         val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
         output.write(groupId.getBytes)
         output.write("::".getBytes)
@@ -991,7 +1002,7 @@ case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
   override def toString = key.toString
 }
 
-case class GroupKey(version: Short, key: String) extends BaseKey {
+case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
 
   override def toString = key
 }
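
The heart of the GroupMetadataManager change above is that loading and unloading of a partition's offsets and group metadata are now scheduled onto a dedicated single-threaded KafkaScheduler, so a load and an unload for the same offsets partition can never interleave, and ownedPartitions is only updated once the work has completed. A rough sketch of that serialization pattern, using a plain single-threaded executor and made-up names in place of the real manager internals:

import java.util.concurrent.Executors

object SerializedLoadUnloadSketch {
  private val scheduler = Executors.newSingleThreadExecutor() // one thread => tasks run in submission order
  private val loadingPartitions = scala.collection.mutable.Set.empty[Int]
  private val ownedPartitions   = scala.collection.mutable.Set.empty[Int]

  def loadPartition(partition: Int): Unit = scheduler.execute(new Runnable {
    def run(): Unit = {
      val alreadyLoading = loadingPartitions.synchronized { !loadingPartitions.add(partition) }
      if (!alreadyLoading) {
        try {
          // ... read the offsets topic partition and rebuild the offset/group caches ...
        } finally loadingPartitions.synchronized {
          ownedPartitions += partition   // claim ownership only after the load has finished
          loadingPartitions -= partition
        }
      }
    }
  })

  def unloadPartition(partition: Int): Unit = scheduler.execute(new Runnable {
    def run(): Unit = loadingPartitions.synchronized {
      ownedPartitions -= partition
      // ... clear cached offsets and group metadata for this partition ...
    }
  })

  def shutdown(): Unit = scheduler.shutdown()
}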

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index c122bde..f005019 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -39,13 +39,13 @@ case class TopicPartitionOperationKey(topic: String, partition: Int) extends Del
 }
 
 /* used by delayed-join-group operations */
-case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey {
+case class MemberKey(groupId: String, consumerId: String) extends DelayedOperationKey {
 
   override def keyLabel = "%s-%s".format(groupId, consumerId)
 }
 
 /* used by delayed-rebalance operations */
-case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey {
+case class GroupKey(groupId: String) extends DelayedOperationKey {
 
   override def keyLabel = groupId
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d1c6f79..bb50e40 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,6 +22,7 @@ import java.util
 
 import kafka.admin.AdminUtils
 import kafka.api._
+import kafka.cluster.Partition
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
@@ -112,23 +113,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     authorizeClusterAction(request)
 
     try {
-      // call replica manager to handle updating partitions to become leader or follower
-      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
-      // for each new leader or follower, call coordinator to handle
-      // consumer group migration
-      result.updatedLeaders.foreach { case partition =>
-        if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
-          coordinator.handleGroupImmigration(partition.partitionId)
-      }
-      result.updatedFollowers.foreach { case partition =>
-        partition.leaderReplicaIdOpt.foreach { leaderReplica =>
-          if (partition.topic == GroupCoordinator.GroupMetadataTopicName &&
-              leaderReplica == brokerId)
+      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
+        // for each new leader or follower, call coordinator to handle consumer group migration.
+        // this callback is invoked under the replica state change lock to ensure proper order of
+        // leadership changes
+        updatedLeaders.foreach { partition =>
+          if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+            coordinator.handleGroupImmigration(partition.partitionId)
+        }
+        updatedFollowers.foreach { partition =>
+          if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
             coordinator.handleGroupEmigration(partition.partitionId)
         }
       }
 
+      // call replica manager to handle updating partitions to become leader or follower
+      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache, onLeadershipChange)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
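
Rather than returning the updated leader and follower partitions for KafkaApis to act on afterwards, becomeLeaderOrFollower now takes an onLeadershipChange callback and, per the comment in the hunk above, invokes it under the replica state change lock, so group immigration/emigration is observed in the same order as the leadership transitions. A minimal illustration of that callback-under-lock pattern, with made-up class names rather than the real ReplicaManager:

object LeadershipCallbackSketch {

  case class Partition(topic: String, partitionId: Int)

  // Simplified stand-in for ReplicaManager: it applies the transitions and invokes the
  // caller-supplied hook before releasing its ordering lock.
  class StateChanger {
    private val stateChangeLock = new Object

    def becomeLeaderOrFollower(leaders: Iterable[Partition],
                               followers: Iterable[Partition],
                               onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit =
      stateChangeLock.synchronized {
        // ... apply the leader/follower transitions here ...
        // invoking the hook inside the lock keeps coordinator migration in the same
        // order as the leadership changes themselves
        onLeadershipChange(leaders, followers)
      }
  }

  def main(args: Array[String]): Unit = {
    val changer = new StateChanger
    changer.becomeLeaderOrFollower(
      leaders = Seq(Partition("__consumer_offsets", 0)),
      followers = Seq(Partition("__consumer_offsets", 1)),
      onLeadershipChange = (becameLeader, becameFollower) => {
        becameLeader.foreach(p => println(s"handleGroupImmigration(${p.partitionId})"))
        becameFollower.foreach(p => println(s"handleGroupEmigration(${p.partitionId})"))
      })
  }
}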

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 80cc6f1..e8ea204 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -187,7 +187,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController.startup()
 
         /* start kafka coordinator */
-        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
+        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
         consumerCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0dde914..a4553b3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -79,14 +79,10 @@ object LogReadResult {
                                            false)
 }
 
-case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short],
-                                        updatedLeaders: Set[Partition],
-                                        updatedFollowers: Set[Partition],
-                                        errorCode: Short) {
+case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short], errorCode: Short) {
 
   override def toString = {
-    "updated leaders: [%s], updated followers: [%s], update results: [%s], global error: [%d]"
-      .format(updatedLeaders, updatedFollowers, responseMap, errorCode)
+    "update results: [%s], global error: [%d]".format(responseMap, errorCode)
   }
 }
 
@@ -583,7 +579,9 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, metadataCache: MetadataCache): BecomeLeaderOrFollowerResult = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
+                             metadataCache: MetadataCache,
+                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                 .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
@@ -597,7 +595,7 @@ class ReplicaManager(val config: KafkaConfig,
           "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
           leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
         }
-        BecomeLeaderOrFollowerResult(responseMap, Set.empty[Partition], Set.empty[Partition], ErrorMapping.StaleControllerEpochCode)
+        BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
         val correlationId = leaderAndISRRequest.correlationId
@@ -651,7 +649,9 @@ class ReplicaManager(val config: KafkaConfig,
           hwThreadInitialized = true
         }
         replicaFetcherManager.shutdownIdleFetcherThreads()
-        BecomeLeaderOrFollowerResult(responseMap, partitionsBecomeLeader, partitionsBecomeFollower, ErrorMapping.NoError)
+
+        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+        BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.NoError)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9ddcde7..beeab0f 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -71,7 +71,9 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
   def get(key: K): V = pool.get(key)
   
   def remove(key: K): V = pool.remove(key)
-  
+
+  def remove(key: K, value: V): Boolean = pool.remove(key, value)
+
   def keys: mutable.Set[K] = {
     import JavaConversions._
     pool.keySet()
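
The new two-argument Pool.remove delegates to ConcurrentHashMap.remove(key, value), which removes the entry only if the key is still mapped to the given value. That is what makes the group removal in GroupMetadataManager safe against races: a stale removal cannot clobber a group that was re-registered in the meantime. A small stand-alone illustration using a plain ConcurrentHashMap (not the Kafka Pool); reference equality applies here because AnyRef does not override equals:

import java.util.concurrent.ConcurrentHashMap

object ConditionalRemoveSketch {
  def main(args: Array[String]): Unit = {
    val cache = new ConcurrentHashMap[String, AnyRef]()
    val staleGroup = new AnyRef
    val freshGroup = new AnyRef

    cache.put("group-1", staleGroup)
    cache.put("group-1", freshGroup)             // the group was re-registered concurrently

    println(cache.remove("group-1", staleGroup)) // false: the mapping no longer points at staleGroup
    println(cache.containsKey("group-1"))        // true: the fresh registration survives
    println(cache.remove("group-1", freshGroup)) // true: removes the current mapping
  }
}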

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 5be410c..0f702a0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -87,7 +87,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
-    groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
+    groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager)
     groupCoordinator.startup()
 
     // add the partition into the owned partition list