You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/08 21:30:47 UTC
kafka git commit: HOTFIX: fix group coordinator edge cases around
metadata storage callback (0.9.0)
Repository: kafka
Updated Branches:
refs/heads/0.9.0 27d44afe6 -> 32cd3e35f
HOTFIX: fix group coordinator edge cases around metadata storage callback (0.9.0)
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang
Closes #453 from hachikuji/hotfix-group-coordinator-0.9
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32cd3e35
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32cd3e35
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32cd3e35
Branch: refs/heads/0.9.0
Commit: 32cd3e35f1ea8251a51860cc48a44fb2fbfd7c0e
Parents: 27d44af
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Nov 8 12:36:42 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Nov 8 12:36:42 2015 -0800
----------------------------------------------------------------------
.../scala/kafka/coordinator/GroupCoordinator.scala | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32cd3e35/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 4d69840..2acc223 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -279,12 +279,13 @@ class GroupCoordinator(val brokerId: Int,
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
// persist the group metadata and upon finish transition to stable and propagate the assignment
+ val generationId = group.generationId
groupManager.storeGroup(group, assignment, (errorCode: Short) => {
group synchronized {
// another member may have joined the group while we were awaiting this callback,
- // so we must ensure we are still in the AwaitingSync state when it gets invoked.
- // if we have transitioned to another state, then we shouldn't do anything
- if (group.is(AwaitingSync)) {
+ // so we must ensure we are still in the AwaitingSync state and the same generation
+ // when it gets invoked. if we have transitioned to another state, then do nothing
+ if (group.is(AwaitingSync) && generationId == group.generationId) {
if (errorCode != Errors.NONE.code) {
resetAndPropagateAssignmentError(group, errorCode)
maybePrepareRebalance(group)
@@ -485,6 +486,12 @@ class GroupCoordinator(val brokerId: Int,
if (member.awaitingSyncCallback != null) {
member.awaitingSyncCallback(member.assignment, errorCode)
member.awaitingSyncCallback = null
+
+ // reset the session timeout for members after propagating the member's assignment.
+ // This is because if any member's session expired while we were still awaiting either
+ // the leader sync group or the storage callback, its expiration will be ignored and no
+ // future heartbeat expectations will not be scheduled.
+ completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}