Posted to commits@kafka.apache.org by jq...@apache.org on 2016/11/23 04:33:16 UTC

kafka git commit: KAFKA-4362; Consumer can fail after reassignment of the offsets topic partition

Repository: kafka
Updated Branches:
  refs/heads/trunk dcea5f838 -> 4b003d8bc


KAFKA-4362; Consumer can fail after reassignment of the offsets topic partition

Author: MayureshGharat <gh...@gmail.com>

Reviewers: Jiangjie Qin <be...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2116 from MayureshGharat/KAFKA-4362


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b003d8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b003d8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b003d8b

Branch: refs/heads/trunk
Commit: 4b003d8bcfffded55a00b8ecc9eed8eb373fb6c7
Parents: dcea5f8
Author: MayureshGharat <gh...@gmail.com>
Authored: Tue Nov 22 20:32:42 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Nov 22 20:32:42 2016 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    |  16 +-
 .../coordinator/GroupMetadataManager.scala      | 390 ++++++++++---------
 .../src/main/scala/kafka/server/KafkaApis.scala |  11 +
 .../coordinator/GroupMetadataManagerTest.scala  |  74 +++-
 4 files changed, 298 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
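
The core of this fix, visible throughout the diff below: getMessageFormatVersionAndTimestamp now returns an Option, which is None when the offsets topic partition no longer has a local replica on this broker (for example after reassignment), and callers map that None to NOT_COORDINATOR_FOR_GROUP instead of throwing. The following is a self-contained sketch of that pattern, not the patch itself; all names are illustrative stand-ins rather than Kafka's real identifiers.

    // Sketch only: the Option-based pattern the patch applies, with stand-in names.
    object CoordinatorMovedSketch extends App {
      // Stand-in for replicaManager.getMessageFormatVersion: None means the replica is not local.
      def messageFormatVersion(partition: Int): Option[Byte] =
        if (partition == 0) Some(1.toByte) else None

      // Mirrors the patched getMessageFormatVersionAndTimestamp: propagate None rather than throw.
      def formatVersionAndTimestamp(partition: Int): Option[(Byte, Long)] =
        messageFormatVersion(partition).map { magic =>
          val timestamp = if (magic == 0.toByte) -1L else System.currentTimeMillis()
          (magic, timestamp)
        }

      // Caller shape used by prepareStoreGroup / prepareStoreOffsets: build the delayed store only
      // when the partition is still local; otherwise invoke the callback with the error and return None.
      def prepareStore(partition: Int, callback: String => Unit): Option[String] =
        formatVersionAndTimestamp(partition) match {
          case Some((magic, ts)) => Some(s"delayed-store(magic=$magic, timestamp=$ts)")
          case None =>
            callback("NOT_COORDINATOR_FOR_GROUP")
            None
        }

      prepareStore(0, err => println(s"error: $err")).foreach(println) // local: prints the delayed store
      prepareStore(1, err => println(s"error: $err"))                  // moved: prints only the error
    }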


http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 48efe39..eb479d5 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -267,7 +267,7 @@ class GroupCoordinator(val brokerId: Int,
               val missing = group.allMembers -- groupAssignment.keySet
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
-              delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
+              delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
                 group synchronized {
                   // another member may have joined the group while we were awaiting this callback,
                   // so we must ensure we are still in the AwaitingSync state and the same generation
@@ -282,7 +282,7 @@ class GroupCoordinator(val brokerId: Int,
                     }
                   }
                 }
-              }))
+              })
             }
 
           case Stable =>
@@ -434,8 +434,8 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
       } else if (generationId < 0 && group.is(Empty)) {
         // the group is only using Kafka to store offsets
-        delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
-          offsetMetadata, responseCallback))
+        delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
+          offsetMetadata, responseCallback)
       } else if (group.is(AwaitingSync)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
       } else if (!group.has(memberId)) {
@@ -445,8 +445,8 @@ class GroupCoordinator(val brokerId: Int,
       } else {
         val member = group.get(memberId)
         completeAndScheduleNextHeartbeatExpiration(group, member)
-        delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
-          offsetMetadata, responseCallback))
+        delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
+          offsetMetadata, responseCallback)
       }
     }
 
@@ -692,14 +692,14 @@ class GroupCoordinator(val brokerId: Int,
         if (group.is(Empty)) {
           info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
 
-          delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, error => {
+          delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {
             if (error != Errors.NONE) {
               // we failed to write the empty group metadata. If the broker fails before another rebalance,
               // the previous generation written to the log will become active again (and most likely timeout).
               // This should be safe since there are no active members in an empty generation, so we just warn.
               warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
             }
-          }))
+          })
         } else {
           info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
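
In the coordinator, the Some(...) wrapping disappears because prepareStoreGroup and prepareStoreOffsets now return Option[DelayedStore] themselves, so delayedGroupStore and delayedOffsetStore keep the same Option type end to end. The store step is outside these hunks; presumably it still hands any result to groupManager.store, roughly as below (an assumption, not shown in the diff).

    // Assumption about code outside these hunks: the Option result, if present, is written via
    // groupManager.store, e.g.
    delayedGroupStore.foreach(groupManager.store)
    delayedOffsetStore.foreach(groupManager.store)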
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index e0c8e65..d1e6945 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -150,24 +150,25 @@ class GroupMetadataManager(val brokerId: Int,
       // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
       // retry removing this group.
       val groupPartition = partitionFor(group.groupId)
-      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
-      val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
-        timestamp = timestamp, magicValue = magicValue)
-
-      val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
-      partitionOpt.foreach { partition =>
-        val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
-
-        trace("Marking group %s as deleted.".format(group.groupId))
-
-        try {
-          // do not need to require acks since even if the tombstone is lost,
-          // it will be appended again by the new leader
-          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
-        } catch {
-          case t: Throwable =>
-            error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
-          // ignore and continue
+      getMessageFormatVersionAndTimestamp(groupPartition).foreach { case (magicValue, timestamp) =>
+        val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
+          timestamp = timestamp, magicValue = magicValue)
+
+        val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
+        partitionOpt.foreach { partition =>
+          val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
+
+          trace("Marking group %s as deleted.".format(group.groupId))
+
+          try {
+            // do not need to require acks since even if the tombstone is lost,
+            // it will be appended again by the new leader
+            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
+          } catch {
+            case t: Throwable =>
+              error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
+            // ignore and continue
+          }
         }
       }
     }
@@ -175,75 +176,86 @@ class GroupMetadataManager(val brokerId: Int,
 
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
-                        responseCallback: Errors => Unit): DelayedStore = {
-    val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
-    val groupMetadataValueVersion = if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort else GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+                        responseCallback: Errors => Unit): Option[DelayedStore] = {
+    val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
+    magicValueAndTimestampOpt match {
+      case Some((magicValue, timestamp)) =>
+        val groupMetadataValueVersion = {
+          if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
+            0.toShort
+          else
+            GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+        }
 
-    val message = new Message(
-      key = GroupMetadataManager.groupMetadataKey(group.groupId),
-      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
-      timestamp = timestamp,
-      magicValue = magicValue)
+        val message = new Message(
+          key = GroupMetadataManager.groupMetadataKey(group.groupId),
+          bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
+          timestamp = timestamp,
+          magicValue = magicValue)
 
-    val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+        val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
 
-    val groupMetadataMessageSet = Map(groupMetadataPartition ->
-      new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
+        val groupMetadataMessageSet = Map(groupMetadataPartition ->
+          new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
 
-    val generationId = group.generationId
+        val generationId = group.generationId
 
-    // set the callback function to insert the created group into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
-      // the append response should only contain the topics partition
-      if (responseStatus.size != 1 || ! responseStatus.contains(groupMetadataPartition))
-        throw new IllegalStateException("Append status %s should only have one partition %s"
-          .format(responseStatus, groupMetadataPartition))
+        // set the callback function to insert the created group into cache after log append completed
+        def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+          // the append response should only contain the topics partition
+          if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
+            throw new IllegalStateException("Append status %s should only have one partition %s"
+              .format(responseStatus, groupMetadataPartition))
 
-      // construct the error status in the propagated assignment response
-      // in the cache
-      val status = responseStatus(groupMetadataPartition)
-      val statusError = Errors.forCode(status.errorCode)
+          // construct the error status in the propagated assignment response
+          // in the cache
+          val status = responseStatus(groupMetadataPartition)
+          val statusError = Errors.forCode(status.errorCode)
 
-      val responseError = if (statusError == Errors.NONE) {
-        Errors.NONE
-      } else {
-        debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
-          s"due to ${statusError.exceptionName}")
+          val responseError = if (statusError == Errors.NONE) {
+            Errors.NONE
+          } else {
+            debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
+              s"due to ${statusError.exceptionName}")
+
+            // transform the log append error code to the corresponding the commit status error code
+            statusError match {
+              case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                   | Errors.NOT_ENOUGH_REPLICAS
+                   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+                Errors.GROUP_COORDINATOR_NOT_AVAILABLE
 
-        // transform the log append error code to the corresponding the commit status error code
-        statusError match {
-          case Errors.UNKNOWN_TOPIC_OR_PARTITION
-            | Errors.NOT_ENOUGH_REPLICAS
-            | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-            Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+              case Errors.NOT_LEADER_FOR_PARTITION =>
+                Errors.NOT_COORDINATOR_FOR_GROUP
 
-          case Errors.NOT_LEADER_FOR_PARTITION =>
-            Errors.NOT_COORDINATOR_FOR_GROUP
+              case Errors.REQUEST_TIMED_OUT =>
+                Errors.REBALANCE_IN_PROGRESS
 
-          case Errors.REQUEST_TIMED_OUT =>
-            Errors.REBALANCE_IN_PROGRESS
+              case Errors.MESSAGE_TOO_LARGE
+                   | Errors.RECORD_LIST_TOO_LARGE
+                   | Errors.INVALID_FETCH_SIZE =>
 
-          case Errors.MESSAGE_TOO_LARGE
-            | Errors.RECORD_LIST_TOO_LARGE
-            | Errors.INVALID_FETCH_SIZE =>
+                error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
+                  s"${statusError.exceptionName}, returning UNKNOWN error code to the client")
 
-            error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
-              s"${statusError.exceptionName}, returning UNKNOWN error code to the client")
+                Errors.UNKNOWN
 
-            Errors.UNKNOWN
+              case other =>
+                error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
+                  s"due to unexpected error: ${statusError.exceptionName}")
 
-          case other =>
-            error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
-              s"due to unexpected error: ${statusError.exceptionName}")
+                other
+            }
+          }
 
-            other
+          responseCallback(responseError)
         }
-      }
+        Some(DelayedStore(groupMetadataMessageSet, putCacheCallback))
 
-      responseCallback(responseError)
+      case None =>
+        responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
+        None
     }
-
-    DelayedStore(groupMetadataMessageSet, putCacheCallback)
   }
 
   def store(delayedStore: DelayedStore) {
@@ -263,98 +275,108 @@ class GroupMetadataManager(val brokerId: Int,
                           consumerId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
+                          responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = {
     // first filter out partitions with offset metadata size exceeding limit
     val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
     // construct the message set to append
-    val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
-      new Message(
-        key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
-        bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
-        timestamp = timestamp,
-        magicValue = magicValue
-      )
-    }.toSeq
-
-    val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
-
-    val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
-      new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
-
-    // set the callback function to insert offsets into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
-      // the append response should only contain the topics partition
-      if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
-        throw new IllegalStateException("Append status %s should only have one partition %s"
-          .format(responseStatus, offsetTopicPartition))
-
-      // construct the commit response status and insert
-      // the offset and metadata to cache if the append status has no error
-      val status = responseStatus(offsetTopicPartition)
-      val statusError = Errors.forCode(status.errorCode)
-
-      val responseCode =
-        group synchronized {
-          if (statusError == Errors.NONE) {
-            if (!group.is(Dead)) {
-              filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
-                group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
-              }
-            }
-            Errors.NONE.code
-          } else {
-            if (!group.is(Dead)) {
-              filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
-                group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
-              }
-            }
+    val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
+    magicValueAndTimestampOpt match {
+      case Some((magicValue, timestamp)) =>
+        val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+          new Message(
+            key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
+            bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
+            timestamp = timestamp,
+            magicValue = magicValue
+          )
+        }.toSeq
+
+        val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+
+        val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
+          new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
+
+        // set the callback function to insert offsets into cache after log append completed
+        def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+          // the append response should only contain the topics partition
+          if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
+            throw new IllegalStateException("Append status %s should only have one partition %s"
+              .format(responseStatus, offsetTopicPartition))
+
+          // construct the commit response status and insert
+          // the offset and metadata to cache if the append status has no error
+          val status = responseStatus(offsetTopicPartition)
+          val statusError = Errors.forCode(status.errorCode)
+
+          val responseCode =
+            group synchronized {
+              if (statusError == Errors.NONE) {
+                if (!group.is(Dead)) {
+                  filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+                    group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+                  }
+                }
+                Errors.NONE.code
+              } else {
+                if (!group.is(Dead)) {
+                  filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+                    group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+                  }
+                }
 
-            debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
-              s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}")
+                debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
+                  s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}")
 
-            // transform the log append error code to the corresponding the commit status error code
-            val responseError = statusError match {
-              case Errors.UNKNOWN_TOPIC_OR_PARTITION
-                   | Errors.NOT_ENOUGH_REPLICAS
-                   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                // transform the log append error code to the corresponding the commit status error code
+                val responseError = statusError match {
+                  case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                       | Errors.NOT_ENOUGH_REPLICAS
+                       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+                    Errors.GROUP_COORDINATOR_NOT_AVAILABLE
 
-              case Errors.NOT_LEADER_FOR_PARTITION =>
-                Errors.NOT_COORDINATOR_FOR_GROUP
+                  case Errors.NOT_LEADER_FOR_PARTITION =>
+                    Errors.NOT_COORDINATOR_FOR_GROUP
 
-              case Errors.MESSAGE_TOO_LARGE
-                   | Errors.RECORD_LIST_TOO_LARGE
-                   | Errors.INVALID_FETCH_SIZE =>
-                Errors.INVALID_COMMIT_OFFSET_SIZE
+                  case Errors.MESSAGE_TOO_LARGE
+                       | Errors.RECORD_LIST_TOO_LARGE
+                       | Errors.INVALID_FETCH_SIZE =>
+                    Errors.INVALID_COMMIT_OFFSET_SIZE
 
-              case other => other
+                  case other => other
+                }
+
+                responseError.code
+              }
             }
 
-            responseError.code
+          // compute the final error codes for the commit response
+          val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+            if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+              (topicAndPartition, responseCode)
+            else
+              (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
           }
+
+          // finally trigger the callback logic passed from the API layer
+          responseCallback(commitStatus)
         }
 
-      // compute the final error codes for the commit response
-      val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-          (topicAndPartition, responseCode)
-        else
-          (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
-      }
+        group synchronized {
+          group.prepareOffsetCommit(offsetMetadata)
+        }
 
-      // finally trigger the callback logic passed from the API layer
-      responseCallback(commitStatus)
-    }
+        Some(DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback))
 
-    group synchronized {
-      group.prepareOffsetCommit(offsetMetadata)
+      case None =>
+        val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+          (topicAndPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+        }
+        responseCallback(commitStatus)
+        None
     }
-
-    DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
   }
 
   /**
@@ -580,44 +602,48 @@ class GroupMetadataManager(val brokerId: Int,
       group synchronized {
         if (!group.is(Dead)) {
           val offsetsPartition = partitionFor(groupId)
+          val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(offsetsPartition)
+          magicValueAndTimestampOpt match {
+            case Some((magicValue, timestamp)) =>
+              // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+              val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
+                trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
+                val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
+                new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+              }.toBuffer
+
+              val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+              partitionOpt.foreach { partition =>
+                val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+                trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
+
+                try {
+                  // do not need to require acks since even if the tombstone is lost,
+                  // it will be appended again in the next purge cycle
+                  partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+                  offsetsRemoved += tombstones.size
+                }
+                catch {
+                  case t: Throwable =>
+                    error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
+                  // ignore and continue
+                }
+              }
 
-          // delete the expired offsets from the table and generate tombstone messages to remove them from the log
-          val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
-            trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
-            val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
-            val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
-            new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
-          }.toBuffer
-
-          val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-          partitionOpt.foreach { partition =>
-            val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-            trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
-
-            try {
-              // do not need to require acks since even if the tombstone is lost,
-              // it will be appended again in the next purge cycle
-              partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
-              offsetsRemoved += tombstones.size
-            }
-            catch {
-              case t: Throwable =>
-                error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
-              // ignore and continue
-            }
-          }
+              if (group.is(Empty) && !group.hasOffsets) {
+                group.transitionTo(Dead)
+                evictGroupAndDeleteMetadata(group)
+                info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+              }
 
-          if (group.is(Empty) && !group.hasOffsets) {
-            group.transitionTo(Dead)
-            evictGroupAndDeleteMetadata(group)
-            info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+            case None =>
+              info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup for other alive groups".format(brokerId, group.groupId))
           }
         }
       }
     }
 
     info("Removed %d expired offsets in %d milliseconds.".format(offsetsRemoved, time.milliseconds() - startMs))
-
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
@@ -659,13 +685,23 @@ class GroupMetadataManager(val brokerId: Int,
       config.offsetsTopicNumPartitions
   }
 
-  private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
+  /**
+   * Check if the replica is local and return the message format version and timestamp
+   *
+   * @param   partition  Partition of GroupMetadataTopic
+   * @return  Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
+   */
+  private def getMessageFormatVersionAndTimestamp(partition: Int): Option[(Byte, Long)] = {
     val groupMetadataTopicAndPartition = new TopicAndPartition(Topic.GroupMetadataTopicName, partition)
-    val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
-      throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
+    replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).map { messageFormatVersion =>
+      val timestamp = {
+        if (messageFormatVersion == Message.MagicValue_V0)
+          Message.NoTimestamp
+        else
+          time.milliseconds()
+      }
+      (messageFormatVersion, timestamp)
     }
-    val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds()
-    (messageFormatVersion, timestamp)
   }
 
   /**
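
The offset-expiration path in the same file gets the matching treatment: when the format-version lookup returns None mid-purge, the group is skipped with an info log and the loop moves on, so a single emigrated offsets partition does not abort cleanup for groups on partitions this broker still leads. A trimmed sketch of that skip-and-continue shape, with illustrative names rather than Kafka's:

    // Illustrative only: expire what we can, skip groups whose offsets partition has moved away.
    def purgeExpiredOffsets(groups: Seq[String], formatVersionFor: String => Option[Byte]): Int =
      groups.count { group =>
        formatVersionFor(group) match {
          case Some(_) => true  // would write tombstones for this group's expired offsets
          case None =>
            println(s"No longer the coordinator for $group, continuing with the remaining groups")
            false
        }
      }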

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f1a9506..296beb3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -168,6 +168,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response =
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
         val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+        // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
+        // since this broker is no longer a replica for that offsets topic partition.
+        // This is required to handle the following scenario :
+        // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
+        // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
+        // is not cleared.
+        result.foreach { case (topicPartition, errorCode) =>
+          if (errorCode == Errors.NONE.code && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) {
+            coordinator.handleGroupEmigration(topicPartition.partition)
+          }
+        }
         new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
       } else {
         val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
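
The KafkaApis hunk above closes the gap spelled out in its comment: when an offsets topic partition is reassigned away, the old leader may only receive a StopReplica request (with delete) and never a LeaderAndIsr request demoting it to follower, so its cached groups must be evicted here via handleGroupEmigration. A minimal sketch of the trigger condition, with illustrative names; "__consumer_offsets" stands in for Topic.GroupMetadataTopicName.

    // Illustrative sketch of the condition added above, not the production code.
    def shouldEmigrateGroups(topic: String, errorCode: Short, deletePartitions: Boolean): Boolean =
      errorCode == 0.toShort /* Errors.NONE.code */ && deletePartitions && topic == "__consumer_offsets"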

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index a52fe48..2a81d31 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -106,7 +106,7 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
     groupMetadataManager.store(delayedStore)
     assertEquals(Some(Errors.NONE), maybeError)
   }
@@ -138,7 +138,7 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
     groupMetadataManager.store(delayedStore)
     assertEquals(Some(expectedError), maybeError)
 
@@ -169,12 +169,40 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
+    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback).get
     groupMetadataManager.store(delayedStore)
     assertEquals(Some(Errors.NONE), maybeError)
   }
 
   @Test
+  def testStoreNonEmptyGroupWhenCoordinatorHasMoved() {
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None)
+    val memberId = "memberId"
+    val clientId = "clientId"
+    val clientHost = "localhost"
+
+    val group = new GroupMetadata(groupId)
+
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+      protocolType, List(("protocol", Array[Byte]())))
+    member.awaitingJoinCallback = _ => ()
+    group.add(memberId, member)
+    group.transitionTo(PreparingRebalance)
+    group.initNextGeneration()
+
+    EasyMock.replay(replicaManager)
+
+    var maybeError: Option[Errors] = None
+    def callback(error: Errors) {
+      maybeError = Some(error)
+    }
+
+    val delayedStoreOpt = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
+    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
   def testCommitOffset() {
     val memberId = ""
     val generationId = -1
@@ -196,7 +224,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
     assertTrue(group.hasOffsets)
 
     groupMetadataManager.store(delayedStore)
@@ -216,6 +244,36 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testCommitOffsetWhenCoordinatorHasMoved() {
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None)
+    val memberId = ""
+    val generationId = -1
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
+    def callback(errors: immutable.Map[TopicPartition, Short]) {
+      commitErrors = Some(errors)
+    }
+
+    val delayedStoreOpt = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+
+    assertFalse(commitErrors.isEmpty)
+    val maybeError = commitErrors.get.get(topicPartition)
+    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError)
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
   def testCommitOffsetFailure() {
     assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
     assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
@@ -250,7 +308,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
     assertTrue(group.hasOffsets)
 
     groupMetadataManager.store(delayedStore)
@@ -294,7 +352,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
     assertTrue(group.hasOffsets)
 
     groupMetadataManager.store(delayedStore)
@@ -348,7 +406,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
     assertTrue(group.hasOffsets)
 
     groupMetadataManager.store(delayedStore)
@@ -409,7 +467,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId, offsets, callback)
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId, offsets, callback).get
     assertTrue(group.hasOffsets)
 
     groupMetadataManager.store(delayedStore)