You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/08 21:23:07 UTC
kafka git commit: HOTFIX: fix group coordinator edge cases around
metadata storage callback
Repository: kafka
Updated Branches:
refs/heads/trunk 34d997665 -> 83fb73460
HOTFIX: fix group coordinator edge cases around metadata storage callback
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang
Closes #451 from hachikuji/hotfix-group-coordinator
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83fb7346
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83fb7346
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83fb7346
Branch: refs/heads/trunk
Commit: 83fb734603376d1c9ef1d88bcb5f160da5522e45
Parents: 34d9976
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Nov 8 12:29:02 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Nov 8 12:29:02 2015 -0800
----------------------------------------------------------------------
.../scala/kafka/coordinator/GroupCoordinator.scala | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/83fb7346/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 4d69840..2acc223 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -279,12 +279,13 @@ class GroupCoordinator(val brokerId: Int,
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
// persist the group metadata and upon finish transition to stable and propagate the assignment
+ val generationId = group.generationId
groupManager.storeGroup(group, assignment, (errorCode: Short) => {
group synchronized {
// another member may have joined the group while we were awaiting this callback,
- // so we must ensure we are still in the AwaitingSync state when it gets invoked.
- // if we have transitioned to another state, then we shouldn't do anything
- if (group.is(AwaitingSync)) {
+ // so we must ensure we are still in the AwaitingSync state and the same generation
+ // when it gets invoked. if we have transitioned to another state, then do nothing
+ if (group.is(AwaitingSync) && generationId == group.generationId) {
if (errorCode != Errors.NONE.code) {
resetAndPropagateAssignmentError(group, errorCode)
maybePrepareRebalance(group)
@@ -485,6 +486,12 @@ class GroupCoordinator(val brokerId: Int,
if (member.awaitingSyncCallback != null) {
member.awaitingSyncCallback(member.assignment, errorCode)
member.awaitingSyncCallback = null
+
+ // reset the session timeout for members after propagating the member's assignment.
+ // This is because if any member's session expired while we were still awaiting either
+ // the leader sync group or the storage callback, its expiration will be ignored and no
+ // future heartbeat expectations will not be scheduled.
+ completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}