You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/08 21:23:07 UTC

kafka git commit: HOTFIX: fix group coordinator edge cases around metadata storage callback

Repository: kafka
Updated Branches:
  refs/heads/trunk 34d997665 -> 83fb73460


HOTFIX: fix group coordinator edge cases around metadata storage callback

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #451 from hachikuji/hotfix-group-coordinator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83fb7346
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83fb7346
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83fb7346

Branch: refs/heads/trunk
Commit: 83fb734603376d1c9ef1d88bcb5f160da5522e45
Parents: 34d9976
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Nov 8 12:29:02 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Nov 8 12:29:02 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/coordinator/GroupCoordinator.scala     | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/83fb7346/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 4d69840..2acc223 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -279,12 +279,13 @@ class GroupCoordinator(val brokerId: Int,
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
               // persist the group metadata and upon finish transition to stable and propagate the assignment
+              val generationId = group.generationId
               groupManager.storeGroup(group, assignment, (errorCode: Short) => {
                 group synchronized {
                   // another member may have joined the group while we were awaiting this callback,
-                  // so we must ensure we are still in the AwaitingSync state when it gets invoked.
-                  // if we have transitioned to another state, then we shouldn't do anything
-                  if (group.is(AwaitingSync)) {
+                  // so we must ensure we are still in the AwaitingSync state and the same generation
+                  // when it gets invoked. if the state or the generation has changed, then do nothing
+                  if (group.is(AwaitingSync) && generationId == group.generationId) {
                     if (errorCode != Errors.NONE.code) {
                       resetAndPropagateAssignmentError(group, errorCode)
                       maybePrepareRebalance(group)
@@ -485,6 +486,12 @@ class GroupCoordinator(val brokerId: Int,
       if (member.awaitingSyncCallback != null) {
         member.awaitingSyncCallback(member.assignment, errorCode)
         member.awaitingSyncCallback = null
+
+        // reset the session timeout for members after propagating the member's assignment.
+        // This is because if any member's session expired while we were still awaiting either
+        // the leader sync group or the storage callback, its expiration will be ignored and no
+        // future heartbeat expectations will be scheduled.
+        completeAndScheduleNextHeartbeatExpiration(group, member)
       }
     }
   }