Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/02 03:57:07 UTC
kafka git commit: KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit
Repository: kafka
Updated Branches:
refs/heads/trunk b45a67ede -> ea370be51
KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit
Author: Alexey Ozeritsky <ao...@yandex-team.ru>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #2125 from resetius/KAFKA-4399
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea370be5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea370be5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea370be5
Branch: refs/heads/trunk
Commit: ea370be518a783f3a5d8d834f78c82e36bf968b3
Parents: b45a67e
Author: Alexey Ozeritsky <ao...@yandex-team.ru>
Authored: Thu Dec 1 19:09:06 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Dec 1 19:14:15 2016 -0800
----------------------------------------------------------------------
.../coordinator/GroupMetadataManager.scala | 106 +++++++++----------
1 file changed, 48 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ea370be5/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index d1e6945..b45f25b 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -136,43 +136,6 @@ class GroupMetadataManager(val brokerId: Int,
}
}
- /**
- * Remove the group from the cache and delete all metadata associated with it. This should be
- * called only after all offsets for the group have expired and no members are remaining (i.e.
- * it is in the Empty state).
- */
- private def evictGroupAndDeleteMetadata(group: GroupMetadata) {
- // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
- // while the group is being removed due to coordinator emigration). We also avoid writing the tombstone
- // when the generationId is 0, since this group is only using Kafka for offset storage.
- if (groupMetadataCache.remove(group.groupId, group) && group.generationId > 0) {
- // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
- // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
- // retry removing this group.
- val groupPartition = partitionFor(group.groupId)
- getMessageFormatVersionAndTimestamp(groupPartition).foreach { case (magicValue, timestamp) =>
- val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
- timestamp = timestamp, magicValue = magicValue)
-
- val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
- partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
-
- trace("Marking group %s as deleted.".format(group.groupId))
-
- try {
- // do not need to require acks since even if the tombstone is lost,
- // it will be appended again by the new leader
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
- } catch {
- case t: Throwable =>
- error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
- // ignore and continue
- }
- }
- }
- }
- }
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
@@ -598,7 +561,7 @@ class GroupMetadataManager(val brokerId: Int,
val startMs = time.milliseconds()
var offsetsRemoved = 0
- groupMetadataCache.foreach { case (groupId, group) =>
+ val result = groupMetadataCache.flatMap { case (groupId, group) =>
group synchronized {
if (!group.is(Dead)) {
val offsetsPartition = partitionFor(groupId)
@@ -612,32 +575,59 @@ class GroupMetadataManager(val brokerId: Int,
new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
}.toBuffer
- val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
- partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
- trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
-
- try {
- // do not need to require acks since even if the tombstone is lost,
- // it will be appended again in the next purge cycle
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
- offsetsRemoved += tombstones.size
- }
- catch {
- case t: Throwable =>
- error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
- // ignore and continue
- }
- }
+ val numOffsetsExpired = tombstones.size
if (group.is(Empty) && !group.hasOffsets) {
group.transitionTo(Dead)
- evictGroupAndDeleteMetadata(group)
- info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+
+ // We avoid writing the tombstone
+ // when the generationId is 0, since this group is only using Kafka for offset storage.
+ if (groupMetadataCache.get(groupId) == group && group.generationId > 0) {
+ // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+ // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+ // retry removing this group.
+
+ trace("Marking group %s as deleted.".format(groupId))
+
+ tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(groupId),
+ timestamp = timestamp, magicValue = magicValue)
+ }
}
+ Some((group, offsetsPartition, tombstones, numOffsetsExpired))
case None =>
- info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup for other alive groups".format(brokerId, group.groupId))
+ info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup for other alive groups".format(brokerId, groupId))
+ None
+ }
+ } else {
+ None
+ }
+ }
+ }
+
+ for ((group, offsetsPartition, tombstones, numOffsetsExpired) <- result) {
+ val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+
+ partitionOpt.foreach { partition =>
+ val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ trace("Marked %d offsets in %s for deletion.".format(numOffsetsExpired, appendPartition))
+
+ try {
+ // do not need to require acks since even if the tombstone is lost,
+ // it will be appended again in the next purge cycle
+ partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+ offsetsRemoved += numOffsetsExpired
+ }
+ catch {
+ case t: Throwable =>
+ error(s"Failed to write ${tombstones.size} tombstones for group ${group.groupId} to $appendPartition.", t)
+ // ignore and continue
+ }
+
+ group synchronized {
+ if (group.is(Dead)) {
+ groupMetadataCache.remove(group.groupId, group)
+ info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
}
}
}
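----------------------------------------------------------------------

The shape of the fix, as visible in the diff above, is a two-phase cleanup: while holding each group's lock, cleanupGroupMetadata now only decides what to delete and builds the tombstone messages; the call to partition.appendMessagesToLeader (and the eviction of dead groups from the cache) happens afterwards, outside the group-synchronized block. This removes the lock ordering in which a thread appends to the offsets topic while still holding a group lock that an in-flight offset commit may also need. The sketch below illustrates that pattern only; the names (Group, appendTombstones, TwoPhaseCleanupSketch) are hypothetical stand-ins and not the actual GroupMetadataManager code.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable

object TwoPhaseCleanupSketch {

  // Hypothetical stand-in for GroupMetadata: just an id, a set of expired
  // offset keys, and a dead flag.
  final class Group(val groupId: String) {
    val expiredKeys = mutable.Buffer[String]()
    var dead = false
  }

  val groups = new ConcurrentHashMap[String, Group]()

  // Stand-in for partition.appendMessagesToLeader; the point of the fix is
  // that this is never invoked while a group lock is held.
  def appendTombstones(partition: Int, tombstones: Seq[String]): Unit =
    println(s"append ${tombstones.size} tombstones to partition $partition")

  def partitionFor(groupId: String): Int = (groupId.hashCode & 0x7fffffff) % 50

  def cleanup(): Int = {
    var removed = 0

    // Phase 1: under each group's lock, decide what to delete and collect
    // the tombstones, but do no log I/O yet.
    val work = groups.asScala.values.flatMap { group =>
      group synchronized {
        val tombstones = group.expiredKeys.toList
        if (tombstones.nonEmpty || group.dead)
          Some((group, partitionFor(group.groupId), tombstones))
        else
          None
      }
    }

    // Phase 2: with no group lock held, append the tombstones to the log,
    // then re-acquire the group lock briefly to evict dead groups.
    for ((group, partition, tombstones) <- work) {
      appendTombstones(partition, tombstones)
      removed += tombstones.size
      group synchronized {
        if (group.dead)
          groups.remove(group.groupId, group)
      }
    }
    removed
  }
}

As in the patched cleanupGroupMetadata, losing a tombstone here is tolerable: the next purge cycle (or the new leader of the offsets partition) will rebuild and retry the same deletions, so the append does not need to wait for acks.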