You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/19 15:40:41 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`

dajac commented on a change in pull request #11229:
URL: https://github.com/apache/kafka/pull/11229#discussion_r692245978



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1391,78 +1392,92 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean = {
+  def tryCompleteJoin(
+    group: GroupMetadata,
+    generationId: Int,
+    forceComplete: () => Boolean
+  ): Boolean = {
     group.inLock {
-      if (group.hasAllMembersJoined)
+      if (generationId != group.generationId) {
+        forceComplete()
+      } else if (group.hasAllMembersJoined) {
         forceComplete()
-      else false
+      } else false
     }
   }
 
-  def onCompleteJoin(group: GroupMetadata): Unit = {
+  def onCompleteJoin(
+    group: GroupMetadata,
+    generationId: Int
+  ): Unit = {
     group.inLock {
-      val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember)
-      if (notYetRejoinedDynamicMembers.nonEmpty) {
-        info(s"Group ${group.groupId} removed dynamic members " +
-          s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}")
-
-        notYetRejoinedDynamicMembers.values.foreach { failedMember =>
-          group.remove(failedMember.memberId)
-          removeHeartbeatForLeavingMember(group, failedMember.memberId)
-        }
-      }
-
-      if (group.is(Dead)) {
-        info(s"Group ${group.groupId} is dead, skipping rebalance stage")
-      } else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) {
-        // If all members are not rejoining, we will postpone the completion
-        // of rebalance preparing stage, and send out another delayed operation
-        // until session timeout removes all the non-responsive members.
-        error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
-        rebalancePurgatory.tryCompleteElseWatch(
-          new DelayedJoin(this, group, group.rebalanceTimeoutMs),
-          Seq(GroupJoinKey(group.groupId)))
+      if (generationId != group.generationId) {
+        error(s"Received unexpected notification of join complete for ${group.groupId} " +
+          s"with an old generation $generationId while the group has ${group.generationId}.")
       } else {
-        group.initNextGeneration()
-        if (group.is(Empty)) {
-          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
-            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
+        val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember)

Review comment:
       That's correct.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org