Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/09 08:53:18 UTC

[GitHub] [kafka] feyman2016 opened a new pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

feyman2016 opened a new pull request #9270:
URL: https://github.com/apache/kafka/pull/9270


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r494426818



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -3882,6 +3942,21 @@ class GroupCoordinatorTest {
     Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
   }
 
+  private def staticJoinGroupWithPersistence(groupId: String,
+                                 memberId: String,

Review comment:
       Fixed, thanks!







[GitHub] [kafka] feyman2016 edited a comment on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 edited a comment on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714871862


   Thanks a lot for the review and merge @abbccdda @vvcephei!





[GitHub] [kafka] abbccdda commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r505721807



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -3789,6 +3844,41 @@ class GroupCoordinatorTest {
                             requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
     val (responseFuture, responseCallback) = setupJoinGroupCallback
 
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
+      requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
+    responseFuture
+  }
+
+  private def sendStaticJoinGroupWithPersistence(groupId: String,

Review comment:
       Seems the helper only gets called once?
   







[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714871862


   Thanks a lot for the help @abbccdda @vvcephei!





[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r494424452



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {
+                warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+              }
+            }
+          })
+          group.maybeInvokeJoinCallback(member, JoinGroupResult(
+            members = List.empty,
+            memberId = newMemberId,
+            generationId = group.generationId,
+            protocolType = group.protocolType,
+            protocolName = group.protocolName,
+            // We want to avoid current leader performing trivial assignment while the group
+            // is in stable stage, because the new assignment in leader's next sync call
+            // won't be broadcast by a stable group. This could be guaranteed by
+            // always returning the old leader id so that the current leader won't assume itself
+            // as a leader based on the returned message, since the new member.id won't match
+            // returned leader id, therefore no assignment will be performed.
+            leaderId = currentLeader,
+            error = Errors.NONE))
+        } else {
+          maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")

Review comment:
       For example, suppose a group has one leader and one follower (both static), both with protocols `List(("range", metadata), ("roundrobin", metadata))`, and the currently selected protocol is `range`. If the follower later rejoins with protocols `List(("roundrobin", metadata))`, the selected protocol should become `roundrobin`, so the selected protocol and the actual assignment are no longer consistent. Here I let the group rebalance to make them consistent again. Conversely, if we don't rebalance, we cannot persist the group, since the line `val metadata = memberMetadata.metadata(protocol)` in `kafka.coordinator.group.GroupMetadataManager#groupMetadataValue` will fail. The test `staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWithSelectedProtocolChanged` covers this case.
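
The scenario above can be reproduced with a small standalone sketch. It is not the coordinator's code: `Member`, `candidates` and this `selectProtocol` are hypothetical stand-ins, and the voting rule (each member votes for its most preferred protocol among those supported by everyone, and the protocol with the most votes wins) is an assumption about how the group picks its protocol.

```scala
object ProtocolSelectionSketch {
  // Hypothetical stand-in for a group member: just an id and its
  // supported protocol names, in order of preference.
  final case class Member(id: String, protocols: List[String])

  // Protocols supported by every member of the group.
  def candidates(members: Seq[Member]): Set[String] =
    members.map(_.protocols.toSet).reduce(_ intersect _)

  // Assumed rule: each member votes for its most preferred candidate,
  // and the candidate with the most votes is selected.
  def selectProtocol(members: Seq[Member]): String = {
    require(members.nonEmpty, "cannot select a protocol for an empty group")
    val common = candidates(members)
    require(common.nonEmpty, "members share no common protocol")
    val votes = members.flatMap(_.protocols.find(common.contains))
    votes.groupBy(identity).maxBy { case (_, v) => v.size }._1
  }

  def main(args: Array[String]): Unit = {
    val leader = Member("leader", List("range", "roundrobin"))
    val before = Seq(leader, Member("follower", List("range", "roundrobin")))
    val after  = Seq(leader, Member("follower", List("roundrobin")))

    val current = Some(selectProtocol(before)) // Some("range")
    val next    = selectProtocol(after)        // "roundrobin"

    // Same shape as the check in the diff: rebalance only if the
    // protocol selected for the next generation differs.
    if (current.contains(next)) println("persist without rebalance")
    else println(s"rebalance: selected protocol changes to $next")
  }
}
```

Note that the rejoining follower still shares `roundrobin` with the leader, so a compatibility check alone would accept it; only the selected protocol changes.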
   







[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r506001704



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -3789,6 +3844,41 @@ class GroupCoordinatorTest {
                             requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
     val (responseFuture, responseCallback) = setupJoinGroupCallback
 
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
+      requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
+    responseFuture
+  }
+
+  private def sendStaticJoinGroupWithPersistence(groupId: String,

Review comment:
       Yes, perhaps I should reuse the function `sendStaticJoinGroup`.







[GitHub] [kafka] vvcephei merged pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9270:
URL: https://github.com/apache/kafka/pull/9270


   





[GitHub] [kafka] abbccdda commented on pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-696308421


   retest this please





[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r505133488



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {

Review comment:
       @vvcephei @abbccdda Revised the persistence failure handling logic: it now reverts the member id update in the group metadata if any persistence error is encountered, and calls the responseCallback with the returned error.
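
A minimal sketch of the revert-on-failure pattern described in this comment. It does not reproduce the PR's implementation: `Group`, `StoreError`, `rejoin`, and the `store` and `respond` parameters are hypothetical names chosen for illustration, and the in-memory map stands in for the real group metadata.

```scala
object RevertOnPersistFailureSketch {
  // Hypothetical error type standing in for the broker's error codes.
  sealed trait StoreError
  case object NoError extends StoreError
  case object StoreFailed extends StoreError

  // Hypothetical group state: group.instance.id -> member.id.
  final class Group {
    private var staticMembers = Map.empty[String, String]
    def memberIdOf(instanceId: String): Option[String] = staticMembers.get(instanceId)
    def setMemberId(instanceId: String, memberId: String): Unit =
      staticMembers += (instanceId -> memberId)
  }

  // Apply the new member id, try to persist, and undo the update if
  // persistence reports an error. The caller is answered only after the
  // store callback fires, so it never sees an id that was not persisted.
  def rejoin(group: Group,
             instanceId: String,
             oldMemberId: String,
             newMemberId: String,
             store: (StoreError => Unit) => Unit,
             respond: Either[StoreError, String] => Unit): Unit = {
    group.setMemberId(instanceId, newMemberId)
    store { error =>
      if (error != NoError) {
        group.setMemberId(instanceId, oldMemberId) // revert the in-memory update
        respond(Left(error))
      } else {
        respond(Right(newMemberId))
      }
    }
  }
}
```

Passing `store` as a function makes the failure path easy to exercise in a test: a stub that immediately invokes the callback with `StoreFailed` should leave the old member id in place.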







[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714183165


   @vvcephei Thanks for the help. FYI, I also tried building locally with apache/trunk merged, and it succeeded.





[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-712978606


   @abbccdda @vvcephei Updated the PR, thanks!





[GitHub] [kafka] abbccdda commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r492896997



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -3882,6 +3942,21 @@ class GroupCoordinatorTest {
     Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
   }
 
+  private def staticJoinGroupWithPersistence(groupId: String,
+                                 memberId: String,

Review comment:
       nit: alignment

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {
+                warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+              }
+            }
+          })
+          group.maybeInvokeJoinCallback(member, JoinGroupResult(
+            members = List.empty,
+            memberId = newMemberId,
+            generationId = group.generationId,
+            protocolType = group.protocolType,
+            protocolName = group.protocolName,
+            // We want to avoid current leader performing trivial assignment while the group
+            // is in stable stage, because the new assignment in leader's next sync call
+            // won't be broadcast by a stable group. This could be guaranteed by
+            // always returning the old leader id so that the current leader won't assume itself
+            // as a leader based on the returned message, since the new member.id won't match
+            // returned leader id, therefore no assignment will be performed.
+            leaderId = currentLeader,
+            error = Errors.NONE))
+        } else {
+          maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")

Review comment:
       Could you elaborate on why this case is possible? We do have a check for `!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))` in the caller, so if the group protocol is incompatible, won't we just reject the rejoin?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {

Review comment:
       Should we reply with a join failure if persisting the group metadata fails?







[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-708859895


   Hi @vvcephei and @abbccdda, sorry this PR has taken so long. I just updated it; could you help review? Thanks!





[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714669017


   Cherry-picked to 2.5





[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r494425944



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {

Review comment:
       Indeed, this needs to be revised; I will update it.







[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r505133488



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {

Review comment:
       I revised the persistence failure handling: if persisting the group fails, it now reverts the member id update in the group metadata and calls the responseCallback with the returned error.
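
The revert-on-failure behaviour described in this comment can be illustrated with a minimal, self-contained sketch. It is not the PR's code: `Group`, `replaceStaticMember` and the `persist` function are hypothetical stand-ins, and persistence is modelled as a synchronous call returning `Either` purely for illustration.

```scala
object RevertOnStoreFailureSketch {
  // Hypothetical, simplified group state: static instance id -> current member id.
  final class Group(var staticMembers: Map[String, String])

  // Swap in the new member id, try to persist, and restore the previous
  // mapping if the (hypothetical) persist function reports an error.
  def replaceStaticMember(group: Group,
                          instanceId: String,
                          newMemberId: String,
                          persist: Map[String, String] => Either[String, Unit]): Either[String, String] = {
    val previous = group.staticMembers
    group.staticMembers = previous.updated(instanceId, newMemberId)
    persist(group.staticMembers) match {
      case Right(_) => Right(newMemberId)
      case Left(error) =>
        group.staticMembers = previous // roll back the in-memory update
        Left(error)
    }
  }
}
```

With a `persist` function that always returns `Left`, the group keeps its old member id and the caller receives the error; with one that returns `Right(())`, the new member id stays in place.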







[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r494424452



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {
+                warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+              }
+            }
+          })
+          group.maybeInvokeJoinCallback(member, JoinGroupResult(
+            members = List.empty,
+            memberId = newMemberId,
+            generationId = group.generationId,
+            protocolType = group.protocolType,
+            protocolName = group.protocolName,
+            // We want to avoid current leader performing trivial assignment while the group
+            // is in stable stage, because the new assignment in leader's next sync call
+            // won't be broadcast by a stable group. This could be guaranteed by
+            // always returning the old leader id so that the current leader won't assume itself
+            // as a leader based on the returned message, since the new member.id won't match
+            // returned leader id, therefore no assignment will be performed.
+            leaderId = currentLeader,
+            error = Errors.NONE))
+        } else {
+          maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")

Review comment:
       For example, suppose a group has one leader and one follower (both static), both joined with protocols `List(("range", metadata), ("roundrobin", metadata))`, and the currently selected protocol is `range`. If the follower later rejoins with protocols `List(("roundrobin", metadata))`, the selectedProtocol should become `roundrobin`, so the selectedProtocol and the actual assignment are no longer consistent. Here I let the group rebalance to make sure that the selectedProtocol and the actual assignment are consistent. Conversely, if we don't rebalance, we cannot successfully persist the group, since the line `val metadata = memberMetadata.metadata(protocol)` in `kafka.coordinator.group.GroupMetadataManager#groupMetadataValue` will fail.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {
+                warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+              }
+            }
+          })
+          group.maybeInvokeJoinCallback(member, JoinGroupResult(
+            members = List.empty,
+            memberId = newMemberId,
+            generationId = group.generationId,
+            protocolType = group.protocolType,
+            protocolName = group.protocolName,
+            // We want to avoid current leader performing trivial assignment while the group
+            // is in stable stage, because the new assignment in leader's next sync call
+            // won't be broadcast by a stable group. This could be guaranteed by
+            // always returning the old leader id so that the current leader won't assume itself
+            // as a leader based on the returned message, since the new member.id won't match
+            // returned leader id, therefore no assignment will be performed.
+            leaderId = currentLeader,
+            error = Errors.NONE))
+        } else {
+          maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")

Review comment:
       For example, suppose a group has one leader and one follower (both static), both joined with protocols `List(("range", metadata), ("roundrobin", metadata))`, and the currently selected protocol is `range`. If the follower later rejoins with protocols `List(("roundrobin", metadata))`, the selectedProtocol should become `roundrobin`, so the selectedProtocol and the actual assignment are no longer consistent. Here I let the group rebalance to make sure that the selectedProtocol and the actual assignment are consistent. Conversely, if we don't rebalance, we cannot successfully persist the group, since the line `val metadata = memberMetadata.metadata(protocol)` in `kafka.coordinator.group.GroupMetadataManager#groupMetadataValue` will fail. The test `staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWithSelectedProtocolChanged` covers this case.
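
The leader/follower scenario above can be reproduced with a small standalone sketch. This is not the coordinator's actual `selectProtocol` code: `Member` is a hypothetical simplification, and the voting rule (each member votes for its most preferred protocol among those supported by every member) is an assumption made for illustration.

```scala
object ProtocolSelectionSketch {
  // Hypothetical, simplified stand-in for a group member: only the ordered
  // list of protocol names it supports, most preferred first.
  final case class Member(id: String, protocols: List[String])

  // Candidates are the protocols supported by every member. Each member votes
  // for its most preferred candidate; the protocol with the most votes wins.
  // Returns None for an empty group or when no protocol is common to all members.
  def selectProtocol(members: List[Member]): Option[String] =
    members.map(_.protocols.toSet).reduceOption(_ intersect _).flatMap { candidates =>
      val votes = members.flatMap(_.protocols.find(candidates.contains))
      if (votes.isEmpty) None
      else Some(votes.groupBy(identity).maxBy { case (_, v) => v.size }._1)
    }
}
```

With both members supporting `List("range", "roundrobin")` the sketch selects `range`; once the follower rejoins with only `List("roundrobin")`, the only common protocol is `roundrobin`, so the selection changes and no longer matches the protocol the current assignment was computed with.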
   







[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-713766035


   By the way, it was true that trunk was broken. Fixed by: https://github.com/apache/kafka/commit/2db67db8e1329cb2e047322cff81d97ff98b4328





[GitHub] [kafka] vvcephei commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r497818766



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {

Review comment:
       Hi @feyman2016, I just noticed this PR. Thanks for picking this up!
   
   I'm wondering something along similar lines: should we move the whole join callback inside this block? It seems like that's the way we can delay the join group response until after the update is actually persisted.







[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-713647509


   I've just merged trunk into this branch in an effort to get Jenkins to build.
   
   I tried just re-running the Jenkins job, but it kept erroring out on some gradle/environment variables. I noticed that other recent PRs are building, so I theorized that if we merge trunk in, we might get the build to work.





[GitHub] [kafka] abbccdda commented on pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-695909047


   retest this please





[GitHub] [kafka] vvcephei commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r506551647



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1037,24 +1037,52 @@ class GroupCoordinator(val brokerId: Int,
 
     val knownStaticMember = group.get(newMemberId)
     group.updateMember(knownStaticMember, protocols, responseCallback)
+    val oldProtocols = knownStaticMember.supportedProtocols
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            if (error != Errors.NONE) {
+              warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+
+              // Failed to persist member.id of the given static member, revert the update of the static member in the group.
+              group.updateMember(knownStaticMember, oldProtocols, null)
+              val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
+              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+              responseCallback(JoinGroupResult(
+                List.empty,
+                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                generationId = group.generationId,
+                protocolType = group.protocolType,
+                protocolName = group.protocolName,
+                leaderId = currentLeader,
+                error = error
+              ))
+            }
+          })
+          group.maybeInvokeJoinCallback(member, JoinGroupResult(
+            members = List.empty,
+            memberId = newMemberId,
+            generationId = group.generationId,
+            protocolType = group.protocolType,
+            protocolName = group.protocolName,
+            // We want to avoid current leader performing trivial assignment while the group
+            // is in stable stage, because the new assignment in leader's next sync call
+            // won't be broadcast by a stable group. This could be guaranteed by
+            // always returning the old leader id so that the current leader won't assume itself
+            // as a leader based on the returned message, since the new member.id won't match
+            // returned leader id, therefore no assignment will be performed.
+            leaderId = currentLeader,
+            error = Errors.NONE))

Review comment:
       Hey @feyman2016 ,
   
   I'm still wondering if this block should be inside the callback of `storeGroup`. Otherwise, we would already have sent the response to the client before the `storeGroup` completes, and the client will never see the error response on L1058.
   
   Or did we specifically decide to make the `storeGroup` call best effort?
   
   (cc @abbccdda )
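
The ordering concern raised here can be sketched in isolation. Everything below is a hypothetical stand-in rather than Kafka code: `storeGroup` simply invokes its completion callback with a predetermined outcome, and `UnknownMemberId` is an assumed placeholder for the unknown member id. The point is only that the join response is produced from inside the store callback, so the client cannot receive a success response before the write has completed.

```scala
object RespondAfterPersistSketch {
  // Hypothetical error type; not Kafka's Errors enum.
  sealed trait Err
  case object NoError extends Err
  case object StoreFailed extends Err

  final case class JoinResponse(memberId: String, error: Err)

  // Assumed placeholder for an unknown member id.
  val UnknownMemberId = ""

  // Hypothetical store: a real broker would complete this asynchronously;
  // here it just calls back with the outcome it was given.
  def storeGroup(outcome: Err)(onComplete: Err => Unit): Unit = onComplete(outcome)

  // Respond only from inside the store callback, so the client sees the new
  // member id if and only if the write succeeded.
  def handleStaticRejoin(newMemberId: String, outcome: Err)(respond: JoinResponse => Unit): Unit =
    storeGroup(outcome) {
      case NoError => respond(JoinResponse(newMemberId, NoError))
      case failure => respond(JoinResponse(UnknownMemberId, failure))
    }
}
```

If the response were instead sent right after calling `storeGroup`, as in the quoted diff, the failure branch could only log or revert state; the client would already have been told that the join succeeded.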







[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714574329


   Cherry-picked to 2.6 (cc @mimaison )





[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-713663316


   FYI, I just got a successful build of this locally with master merged in.





[GitHub] [kafka] abbccdda commented on pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-696317720


   test this
   





[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-713339950


   Thanks! @vvcephei 





[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r507346339



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1037,24 +1037,52 @@ class GroupCoordinator(val brokerId: Int,
 
     val knownStaticMember = group.get(newMemberId)
     group.updateMember(knownStaticMember, protocols, responseCallback)
+    val oldProtocols = knownStaticMember.supportedProtocols
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            if (error != Errors.NONE) {
+              warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+
+              // Failed to persist member.id of the given static member, revert the update of the static member in the group.
+              group.updateMember(knownStaticMember, oldProtocols, null)
+              val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
+              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+              responseCallback(JoinGroupResult(
+                List.empty,
+                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                generationId = group.generationId,
+                protocolType = group.protocolType,
+                protocolName = group.protocolName,
+                leaderId = currentLeader,
+                error = error
+              ))
+            }
+          })
+          group.maybeInvokeJoinCallback(member, JoinGroupResult(
+            members = List.empty,
+            memberId = newMemberId,
+            generationId = group.generationId,
+            protocolType = group.protocolType,
+            protocolName = group.protocolName,
+            // We want to avoid current leader performing trivial assignment while the group
+            // is in stable stage, because the new assignment in leader's next sync call
+            // won't be broadcast by a stable group. This could be guaranteed by
+            // always returning the old leader id so that the current leader won't assume itself
+            // as a leader based on the returned message, since the new member.id won't match
+            // returned leader id, therefore no assignment will be performed.
+            leaderId = currentLeader,
+            error = Errors.NONE))

Review comment:
       @vvcephei I think that makes sense. I will move the call to `group.maybeInvokeJoinCallback` into the callback, thanks!
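
A minimal sketch of the ordering discussed here, assuming a toy in-memory model rather than the real coordinator: the member is answered only from inside the store callback, and a failed write reverts the in-memory update before the error is returned. `PersistThenRespondSketch`, `Group`, `Store`, `JoinResult`, and `rejoin` are all hypothetical names; `UnknownMemberId` stands in for `JoinGroupRequest.UNKNOWN_MEMBER_ID`.

```scala
object PersistThenRespondSketch {
  sealed trait Error
  case object NoError extends Error
  case object StoreFailed extends Error

  // Stand-in for JoinGroupRequest.UNKNOWN_MEMBER_ID.
  val UnknownMemberId: String = ""

  final case class JoinResult(memberId: String, error: Error)

  // Hypothetical in-memory mapping from group.instance.id to member.id.
  final class Group {
    private var members = Map.empty[String, String]
    def memberIdOf(instanceId: String): Option[String] = members.get(instanceId)
    def setMember(instanceId: String, memberId: String): Unit =
      members = members.updated(instanceId, memberId)
  }

  // Hypothetical store: reports the outcome of the write through the callback.
  type Store = (Group, Error => Unit) => Unit

  def rejoin(group: Group, instanceId: String, oldMemberId: String,
             newMemberId: String, store: Store)(respond: JoinResult => Unit): Unit = {
    // Apply the update in memory first, then try to persist it.
    group.setMember(instanceId, newMemberId)
    store(group, {
      case NoError =>
        // Respond with the new member.id only once the write has succeeded.
        respond(JoinResult(newMemberId, NoError))
      case error =>
        // Revert the in-memory update and surface the error to the member.
        group.setMember(instanceId, oldMemberId)
        respond(JoinResult(UnknownMemberId, error))
    })
  }
}
```

With this shape the member never receives a success response for a member.id that was not persisted, at the cost of delaying the response until the store callback fires.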







[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714559804









[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r508668288



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1037,24 +1037,52 @@ class GroupCoordinator(val brokerId: Int,
 
     val knownStaticMember = group.get(newMemberId)
     group.updateMember(knownStaticMember, protocols, responseCallback)
+    val oldProtocols = knownStaticMember.supportedProtocols
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            if (error != Errors.NONE) {
+              warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+
+              // Failed to persist member.id of the given static member, revert the update of the static member in the group.
+              group.updateMember(knownStaticMember, oldProtocols, null)
+              val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
+              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+              responseCallback(JoinGroupResult(
+                List.empty,
+                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                generationId = group.generationId,
+                protocolType = group.protocolType,
+                protocolName = group.protocolName,
+                leaderId = currentLeader,
+                error = error
+              ))
+            }
+          })
+          group.maybeInvokeJoinCallback(member, JoinGroupResult(
+            members = List.empty,
+            memberId = newMemberId,
+            generationId = group.generationId,
+            protocolType = group.protocolType,
+            protocolName = group.protocolName,
+            // We want to avoid current leader performing trivial assignment while the group
+            // is in stable stage, because the new assignment in leader's next sync call
+            // won't be broadcast by a stable group. This could be guaranteed by
+            // always returning the old leader id so that the current leader won't assume itself
+            // as a leader based on the returned message, since the new member.id won't match
+            // returned leader id, therefore no assignment will be performed.
+            leaderId = currentLeader,
+            error = Errors.NONE))

Review comment:
       @vvcephei Updated as proposed, thanks!







[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714551566


   The test failure was unrelated: Build / JDK 15 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]





[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r506001704



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -3789,6 +3844,41 @@ class GroupCoordinatorTest {
                             requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
     val (responseFuture, responseCallback) = setupJoinGroupCallback
 
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
+      requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
+    responseFuture
+  }
+
+  private def sendStaticJoinGroupWithPersistence(groupId: String,

Review comment:
       Yes, exactly. Should I inline it?







[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r499196092



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            group.inLock {
+              if (error != Errors.NONE) {

Review comment:
       Thanks for the review, @vvcephei. You're right about the current commit, but as @abbccdda mentioned, it does have issues. I'm revising it to `reset` the `static member` in the group and return the error if `storeGroup` encounters any errors. The code is ready, but I ran into some local issues while writing unit tests; I will update once it's ready.







[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-707819376


   Hi @feyman2016 and @abbccdda ,
   
   I hope you've both been well. I just wanted to check in on the status of this PR, since we still have a flaky test failure related to it.
   
   Thanks,
   -John

