Posted to commits@kafka.apache.org by jq...@apache.org on 2016/11/23 04:33:16 UTC
kafka git commit: KAFKA-4362; Consumer can fail after reassignment of the offsets topic partition
Repository: kafka
Updated Branches:
refs/heads/trunk dcea5f838 -> 4b003d8bc
KAFKA-4362; Consumer can fail after reassignment of the offsets topic partition
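Previously, GroupMetadataManager.getMessageFormatVersionAndTimestamp threw an IllegalArgumentException when the broker no longer held a local replica of a group's offsets topic partition (for example, after the partition was reassigned away), which could fail the consumer. The helper now returns an Option; prepareStoreGroup and prepareStoreOffsets return Option[DelayedStore] and answer NOT_COORDINATOR_FOR_GROUP on None so the client rediscovers the coordinator, the expired-offset sweep skips groups whose offsets partition has moved away, and KafkaApis evicts cached group state when a StopReplica request deletes an offsets topic partition on this broker.

The shape of the change, as a minimal self-contained sketch (stand-in types only; the real code in the diff below works with ReplicaManager, DelayedStore and the Errors enum):

    object CoordinatorMovedSketch {
      sealed trait StoreError
      case object NotCoordinatorForGroup extends StoreError

      // Before: localMagic.getOrElse { throw new IllegalArgumentException(...) }
      // After: "replica is not local" is surfaced as None instead of an exception.
      def messageFormatVersionAndTimestamp(localMagic: Option[Byte], nowMs: Long): Option[(Byte, Long)] =
        localMagic.map { magic =>
          val timestamp = if (magic == 0.toByte) -1L else nowMs // v0 messages carry no timestamp
          (magic, timestamp)
        }

      // Callers pattern-match and fail fast instead of blowing up mid-request.
      def prepareStore(localMagic: Option[Byte], nowMs: Long)(responseCallback: StoreError => Unit): Option[String] =
        messageFormatVersionAndTimestamp(localMagic, nowMs) match {
          case Some((magic, ts)) =>
            Some(s"DelayedStore(magic=$magic, ts=$ts)") // stand-in for the real DelayedStore
          case None =>
            responseCallback(NotCoordinatorForGroup) // coordinator has moved; client retries elsewhere
            None
        }
    }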
Author: MayureshGharat <gh...@gmail.com>
Reviewers: Jiangjie Qin <be...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #2116 from MayureshGharat/KAFKA-4362
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b003d8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b003d8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b003d8b
Branch: refs/heads/trunk
Commit: 4b003d8bcfffded55a00b8ecc9eed8eb373fb6c7
Parents: dcea5f8
Author: MayureshGharat <gh...@gmail.com>
Authored: Tue Nov 22 20:32:42 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Nov 22 20:32:42 2016 -0800
----------------------------------------------------------------------
.../kafka/coordinator/GroupCoordinator.scala | 16 +-
.../coordinator/GroupMetadataManager.scala | 390 ++++++++++---------
.../src/main/scala/kafka/server/KafkaApis.scala | 11 +
.../coordinator/GroupMetadataManagerTest.scala | 74 +++-
4 files changed, 298 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 48efe39..eb479d5 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -267,7 +267,7 @@ class GroupCoordinator(val brokerId: Int,
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
- delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
+ delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
group synchronized {
// another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the AwaitingSync state and the same generation
@@ -282,7 +282,7 @@ class GroupCoordinator(val brokerId: Int,
}
}
}
- }))
+ })
}
case Stable =>
@@ -434,8 +434,8 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
} else if (generationId < 0 && group.is(Empty)) {
// the group is only using Kafka to store offsets
- delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
- offsetMetadata, responseCallback))
+ delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
+ offsetMetadata, responseCallback)
} else if (group.is(AwaitingSync)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
} else if (!group.has(memberId)) {
@@ -445,8 +445,8 @@ class GroupCoordinator(val brokerId: Int,
} else {
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
- delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
- offsetMetadata, responseCallback))
+ delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
+ offsetMetadata, responseCallback)
}
}
@@ -692,14 +692,14 @@ class GroupCoordinator(val brokerId: Int,
if (group.is(Empty)) {
info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
- delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, error => {
+ delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {
if (error != Errors.NONE) {
// we failed to write the empty group metadata. If the broker fails before another rebalance,
// the previous generation written to the log will become active again (and most likely timeout).
// This should be safe since there are no active members in an empty generation, so we just warn.
warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
}
- }))
+ })
} else {
info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index e0c8e65..d1e6945 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -150,24 +150,25 @@ class GroupMetadataManager(val brokerId: Int,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
val groupPartition = partitionFor(group.groupId)
- val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
- val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
- timestamp = timestamp, magicValue = magicValue)
-
- val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
- partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
-
- trace("Marking group %s as deleted.".format(group.groupId))
-
- try {
- // do not need to require acks since even if the tombstone is lost,
- // it will be appended again by the new leader
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
- } catch {
- case t: Throwable =>
- error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
- // ignore and continue
+ getMessageFormatVersionAndTimestamp(groupPartition).foreach { case (magicValue, timestamp) =>
+ val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
+ timestamp = timestamp, magicValue = magicValue)
+
+ val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
+ partitionOpt.foreach { partition =>
+ val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
+
+ trace("Marking group %s as deleted.".format(group.groupId))
+
+ try {
+ // do not need to require acks since even if the tombstone is lost,
+ // it will be appended again by the new leader
+ partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
+ } catch {
+ case t: Throwable =>
+ error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
+ // ignore and continue
+ }
}
}
}
@@ -175,75 +176,86 @@ class GroupMetadataManager(val brokerId: Int,
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
- responseCallback: Errors => Unit): DelayedStore = {
- val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
- val groupMetadataValueVersion = if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort else GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+ responseCallback: Errors => Unit): Option[DelayedStore] = {
+ val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
+ magicValueAndTimestampOpt match {
+ case Some((magicValue, timestamp)) =>
+ val groupMetadataValueVersion = {
+ if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
+ 0.toShort
+ else
+ GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+ }
- val message = new Message(
- key = GroupMetadataManager.groupMetadataKey(group.groupId),
- bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
- timestamp = timestamp,
- magicValue = magicValue)
+ val message = new Message(
+ key = GroupMetadataManager.groupMetadataKey(group.groupId),
+ bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
+ timestamp = timestamp,
+ magicValue = magicValue)
- val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+ val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
- val groupMetadataMessageSet = Map(groupMetadataPartition ->
- new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
+ val groupMetadataMessageSet = Map(groupMetadataPartition ->
+ new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
- val generationId = group.generationId
+ val generationId = group.generationId
- // set the callback function to insert the created group into cache after log append completed
- def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
- // the append response should only contain the topic partition
- if (responseStatus.size != 1 || ! responseStatus.contains(groupMetadataPartition))
- throw new IllegalStateException("Append status %s should only have one partition %s"
- .format(responseStatus, groupMetadataPartition))
+ // set the callback function to insert the created group into cache after log append completed
+ def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+ // the append response should only contain the topic partition
+ if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
+ throw new IllegalStateException("Append status %s should only have one partition %s"
+ .format(responseStatus, groupMetadataPartition))
- // construct the error status in the propagated assignment response
- // in the cache
- val status = responseStatus(groupMetadataPartition)
- val statusError = Errors.forCode(status.errorCode)
+ // construct the error status in the propagated assignment response
+ // in the cache
+ val status = responseStatus(groupMetadataPartition)
+ val statusError = Errors.forCode(status.errorCode)
- val responseError = if (statusError == Errors.NONE) {
- Errors.NONE
- } else {
- debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
- s"due to ${statusError.exceptionName}")
+ val responseError = if (statusError == Errors.NONE) {
+ Errors.NONE
+ } else {
+ debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
+ s"due to ${statusError.exceptionName}")
+
+ // transform the log append error code to the corresponding commit status error code
+ statusError match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION
+ | Errors.NOT_ENOUGH_REPLICAS
+ | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+ Errors.GROUP_COORDINATOR_NOT_AVAILABLE
- // transform the log append error code to the corresponding commit status error code
- statusError match {
- case Errors.UNKNOWN_TOPIC_OR_PARTITION
- | Errors.NOT_ENOUGH_REPLICAS
- | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
- Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ Errors.NOT_COORDINATOR_FOR_GROUP
- case Errors.NOT_LEADER_FOR_PARTITION =>
- Errors.NOT_COORDINATOR_FOR_GROUP
+ case Errors.REQUEST_TIMED_OUT =>
+ Errors.REBALANCE_IN_PROGRESS
- case Errors.REQUEST_TIMED_OUT =>
- Errors.REBALANCE_IN_PROGRESS
+ case Errors.MESSAGE_TOO_LARGE
+ | Errors.RECORD_LIST_TOO_LARGE
+ | Errors.INVALID_FETCH_SIZE =>
- case Errors.MESSAGE_TOO_LARGE
- | Errors.RECORD_LIST_TOO_LARGE
- | Errors.INVALID_FETCH_SIZE =>
+ error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
+ s"${statusError.exceptionName}, returning UNKNOWN error code to the client")
- error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
- s"${statusError.exceptionName}, returning UNKNOWN error code to the client")
+ Errors.UNKNOWN
- Errors.UNKNOWN
+ case other =>
+ error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
+ s"due to unexpected error: ${statusError.exceptionName}")
- case other =>
- error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
- s"due to unexpected error: ${statusError.exceptionName}")
+ other
+ }
+ }
- other
+ responseCallback(responseError)
}
- }
+ Some(DelayedStore(groupMetadataMessageSet, putCacheCallback))
- responseCallback(responseError)
+ case None =>
+ responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
+ None
}
-
- DelayedStore(groupMetadataMessageSet, putCacheCallback)
}
def store(delayedStore: DelayedStore) {
@@ -263,98 +275,108 @@ class GroupMetadataManager(val brokerId: Int,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
+ responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = {
// first filter out partitions with offset metadata size exceeding limit
val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
// construct the message set to append
- val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
- new Message(
- key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
- bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
- timestamp = timestamp,
- magicValue = magicValue
- )
- }.toSeq
-
- val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
-
- val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
- new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
-
- // set the callback function to insert offsets into cache after log append completed
- def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
- // the append response should only contain the topic partition
- if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
- throw new IllegalStateException("Append status %s should only have one partition %s"
- .format(responseStatus, offsetTopicPartition))
-
- // construct the commit response status and insert
- // the offset and metadata to cache if the append status has no error
- val status = responseStatus(offsetTopicPartition)
- val statusError = Errors.forCode(status.errorCode)
-
- val responseCode =
- group synchronized {
- if (statusError == Errors.NONE) {
- if (!group.is(Dead)) {
- filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
- group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
- }
- }
- Errors.NONE.code
- } else {
- if (!group.is(Dead)) {
- filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
- group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
- }
- }
+ val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
+ magicValueAndTimestampOpt match {
+ case Some((magicValue, timestamp)) =>
+ val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+ new Message(
+ key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
+ bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
+ timestamp = timestamp,
+ magicValue = magicValue
+ )
+ }.toSeq
+
+ val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+
+ val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
+ new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
+
+ // set the callback function to insert offsets into cache after log append completed
+ def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+ // the append response should only contain the topic partition
+ if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
+ throw new IllegalStateException("Append status %s should only have one partition %s"
+ .format(responseStatus, offsetTopicPartition))
+
+ // construct the commit response status and insert
+ // the offset and metadata to cache if the append status has no error
+ val status = responseStatus(offsetTopicPartition)
+ val statusError = Errors.forCode(status.errorCode)
+
+ val responseCode =
+ group synchronized {
+ if (statusError == Errors.NONE) {
+ if (!group.is(Dead)) {
+ filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+ group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+ }
+ }
+ Errors.NONE.code
+ } else {
+ if (!group.is(Dead)) {
+ filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+ group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+ }
+ }
- debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
- s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}")
+ debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
+ s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}")
- // transform the log append error code to the corresponding commit status error code
- val responseError = statusError match {
- case Errors.UNKNOWN_TOPIC_OR_PARTITION
- | Errors.NOT_ENOUGH_REPLICAS
- | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
- Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+ // transform the log append error code to the corresponding commit status error code
+ val responseError = statusError match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION
+ | Errors.NOT_ENOUGH_REPLICAS
+ | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+ Errors.GROUP_COORDINATOR_NOT_AVAILABLE
- case Errors.NOT_LEADER_FOR_PARTITION =>
- Errors.NOT_COORDINATOR_FOR_GROUP
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ Errors.NOT_COORDINATOR_FOR_GROUP
- case Errors.MESSAGE_TOO_LARGE
- | Errors.RECORD_LIST_TOO_LARGE
- | Errors.INVALID_FETCH_SIZE =>
- Errors.INVALID_COMMIT_OFFSET_SIZE
+ case Errors.MESSAGE_TOO_LARGE
+ | Errors.RECORD_LIST_TOO_LARGE
+ | Errors.INVALID_FETCH_SIZE =>
+ Errors.INVALID_COMMIT_OFFSET_SIZE
- case other => other
+ case other => other
+ }
+
+ responseError.code
+ }
}
- responseError.code
+ // compute the final error codes for the commit response
+ val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+ if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+ (topicAndPartition, responseCode)
+ else
+ (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
}
+
+ // finally trigger the callback logic passed from the API layer
+ responseCallback(commitStatus)
}
- // compute the final error codes for the commit response
- val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
- (topicAndPartition, responseCode)
- else
- (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
- }
+ group synchronized {
+ group.prepareOffsetCommit(offsetMetadata)
+ }
- // finally trigger the callback logic passed from the API layer
- responseCallback(commitStatus)
- }
+ Some(DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback))
- group synchronized {
- group.prepareOffsetCommit(offsetMetadata)
+ case None =>
+ val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+ (topicAndPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+ }
+ responseCallback(commitStatus)
+ None
}
-
- DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
}
/**
@@ -580,44 +602,48 @@ class GroupMetadataManager(val brokerId: Int,
group synchronized {
if (!group.is(Dead)) {
val offsetsPartition = partitionFor(groupId)
+ val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(offsetsPartition)
+ magicValueAndTimestampOpt match {
+ case Some((magicValue, timestamp)) =>
+ // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+ val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
+ trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
+ val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
+ new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+ }.toBuffer
+
+ val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ partitionOpt.foreach { partition =>
+ val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
+
+ try {
+ // do not need to require acks since even if the tombstone is lost,
+ // it will be appended again in the next purge cycle
+ partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+ offsetsRemoved += tombstones.size
+ }
+ catch {
+ case t: Throwable =>
+ error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
+ // ignore and continue
+ }
+ }
- // delete the expired offsets from the table and generate tombstone messages to remove them from the log
- val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
- trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
- val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
- val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
- new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
- }.toBuffer
-
- val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
- partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
- trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
-
- try {
- // do not need to require acks since even if the tombstone is lost,
- // it will be appended again in the next purge cycle
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
- offsetsRemoved += tombstones.size
- }
- catch {
- case t: Throwable =>
- error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
- // ignore and continue
- }
- }
+ if (group.is(Empty) && !group.hasOffsets) {
+ group.transitionTo(Dead)
+ evictGroupAndDeleteMetadata(group)
+ info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+ }
- if (group.is(Empty) && !group.hasOffsets) {
- group.transitionTo(Dead)
- evictGroupAndDeleteMetadata(group)
- info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+ case None =>
+ info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup for other alive groups".format(brokerId, group.groupId))
}
}
}
}
info("Removed %d expired offsets in %d milliseconds.".format(offsetsRemoved, time.milliseconds() - startMs))
-
}
private def getHighWatermark(partitionId: Int): Long = {
@@ -659,13 +685,23 @@ class GroupMetadataManager(val brokerId: Int,
config.offsetsTopicNumPartitions
}
- private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
+ /**
+ * Check if the replica is local and return the message format version and timestamp
+ *
+ * @param partition Partition of GroupMetadataTopic
+ * @return Some((messageFormatVersion, timestamp)) if the replica is local, None otherwise
+ */
+ private def getMessageFormatVersionAndTimestamp(partition: Int): Option[(Byte, Long)] = {
val groupMetadataTopicAndPartition = new TopicAndPartition(Topic.GroupMetadataTopicName, partition)
- val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
- throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
+ replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).map { messageFormatVersion =>
+ val timestamp = {
+ if (messageFormatVersion == Message.MagicValue_V0)
+ Message.NoTimestamp
+ else
+ time.milliseconds()
+ }
+ (messageFormatVersion, timestamp)
}
- val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds()
- (messageFormatVersion, timestamp)
}
/**
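For reference, the timestamp rule the reworked helper encodes, as a tiny self-contained sketch (assuming the 0.10.x constants Message.MagicValue_V0 = 0 and Message.NoTimestamp = -1):

    object TimestampRuleSketch extends App {
      def timestampFor(magicValue: Byte, nowMs: Long): Long =
        if (magicValue == 0.toByte) -1L // v0 messages carry no timestamp (Message.NoTimestamp)
        else nowMs                      // v1+ messages are stamped with the broker clock

      assert(timestampFor(0.toByte, 1000L) == -1L)
      assert(timestampFor(1.toByte, 1000L) == 1000L)
    }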
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f1a9506..296beb3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -168,6 +168,17 @@ class KafkaApis(val requestChannel: RequestChannel,
val response =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+ // Clear the cache for groups whose offsets topic partition was led by this broker,
+ // since this broker is no longer a replica of that offsets topic partition.
+ // This is required to handle the following scenario:
+ // old replicas {[1,2,3], Leader = 1} are reassigned to new replicas {[2,3,4], Leader = 2}. Broker 1 never
+ // receives a LeaderAndIsr request to become a follower, so without this step the cache for groups whose
+ // offsets topic partition was led by broker 1 would never be cleared.
+ result.foreach { case (topicPartition, errorCode) =>
+ if (errorCode == Errors.NONE.code && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) {
+ coordinator.handleGroupEmigration(topicPartition.partition)
+ }
+ }
new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
} else {
val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
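The condition guarding handleGroupEmigration above reads as a single predicate; a minimal sketch (hypothetical helper name, simplified to plain values, with Topic.GroupMetadataTopicName assumed to be "__consumer_offsets" as in this codebase):

    // Sketch: a StopReplica result triggers group emigration only when the replica
    // was stopped successfully, is being deleted (reassignment moved it away rather
    // than a plain shutdown), and belongs to the offsets topic.
    def triggersGroupEmigration(topic: String, errorCode: Short, deletePartitions: Boolean): Boolean =
      errorCode == 0.toShort &&          // Errors.NONE.code
        deletePartitions &&
        topic == "__consumer_offsets"    // Topic.GroupMetadataTopicName

Without this hook, a broker removed from an offsets topic partition's replica set during reassignment never receives the LeaderAndIsr request that would otherwise trigger emigration, so its cached group state would go stale.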
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b003d8b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index a52fe48..2a81d31 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -106,7 +106,7 @@ class GroupMetadataManagerTest {
maybeError = Some(error)
}
- val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+ val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
groupMetadataManager.store(delayedStore)
assertEquals(Some(Errors.NONE), maybeError)
}
@@ -138,7 +138,7 @@ class GroupMetadataManagerTest {
maybeError = Some(error)
}
- val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+ val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
groupMetadataManager.store(delayedStore)
assertEquals(Some(expectedError), maybeError)
@@ -169,12 +169,40 @@ class GroupMetadataManagerTest {
maybeError = Some(error)
}
- val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
+ val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback).get
groupMetadataManager.store(delayedStore)
assertEquals(Some(Errors.NONE), maybeError)
}
@Test
+ def testStoreNonEmptyGroupWhenCoordinatorHasMoved() {
+ EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None)
+ val memberId = "memberId"
+ val clientId = "clientId"
+ val clientHost = "localhost"
+
+ val group = new GroupMetadata(groupId)
+
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+ protocolType, List(("protocol", Array[Byte]())))
+ member.awaitingJoinCallback = _ => ()
+ group.add(memberId, member)
+ group.transitionTo(PreparingRebalance)
+ group.initNextGeneration()
+
+ EasyMock.replay(replicaManager)
+
+ var maybeError: Option[Errors] = None
+ def callback(error: Errors) {
+ maybeError = Some(error)
+ }
+
+ val delayedStoreOpt = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
+ assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
+ EasyMock.verify(replicaManager)
+ }
+
+ @Test
def testCommitOffset() {
val memberId = ""
val generationId = -1
@@ -196,7 +224,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
- val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+ val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
assertTrue(group.hasOffsets)
groupMetadataManager.store(delayedStore)
@@ -216,6 +244,36 @@ class GroupMetadataManagerTest {
}
@Test
+ def testCommitOffsetWhenCoordinatorHasMoved() {
+ EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None)
+ val memberId = ""
+ val generationId = -1
+ val topicPartition = new TopicPartition("foo", 0)
+ val offset = 37
+
+ groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+ val group = new GroupMetadata(groupId)
+ groupMetadataManager.addGroup(group)
+
+ val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+
+ EasyMock.replay(replicaManager)
+
+ var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
+ def callback(errors: immutable.Map[TopicPartition, Short]) {
+ commitErrors = Some(errors)
+ }
+
+ val delayedStoreOpt = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+
+ assertFalse(commitErrors.isEmpty)
+ val maybeError = commitErrors.get.get(topicPartition)
+ assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError)
+ EasyMock.verify(replicaManager)
+ }
+
+ @Test
def testCommitOffsetFailure() {
assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
@@ -250,7 +308,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
- val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+ val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
assertTrue(group.hasOffsets)
groupMetadataManager.store(delayedStore)
@@ -294,7 +352,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
- val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+ val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
assertTrue(group.hasOffsets)
groupMetadataManager.store(delayedStore)
@@ -348,7 +406,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
- val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+ val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
assertTrue(group.hasOffsets)
groupMetadataManager.store(delayedStore)
@@ -409,7 +467,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
- val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId, offsets, callback)
+ val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId, offsets, callback).get
assertTrue(group.hasOffsets)
groupMetadataManager.store(delayedStore)