You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/22 15:07:47 UTC

[kafka] branch 2.7 updated: KAFKA-10284: Group membership update due to static member rejoin should be persisted (#9270)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 8852f5d  KAFKA-10284: Group membership update due to static member rejoin should be persisted (#9270)
8852f5d is described below

commit 8852f5d1b1d6e174171e2ec9815c01b06746b563
Author: feyman2016 <fe...@aliyun.com>
AuthorDate: Thu Oct 22 22:55:52 2020 +0800

    KAFKA-10284: Group membership update due to static member rejoin should be persisted (#9270)
    
    Reviewers: Boyang Chen <bo...@apache.org>, John Roesler <vv...@apache.org>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  59 +++++++---
 .../coordinator/group/GroupCoordinatorTest.scala   | 120 +++++++++++++++++++--
 2 files changed, 157 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 60eb8da..f1df13b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1043,24 +1043,53 @@ class GroupCoordinator(val brokerId: Int,
 
     val knownStaticMember = group.get(newMemberId)
     group.updateMember(knownStaticMember, protocols, responseCallback)
+    val oldProtocols = knownStaticMember.supportedProtocols
 
     group.currentState match {
       case Stable =>
-        info(s"Static member joins during Stable stage will not trigger rebalance.")
-        group.maybeInvokeJoinCallback(member, JoinGroupResult(
-          members = List.empty,
-          memberId = newMemberId,
-          generationId = group.generationId,
-          protocolType = group.protocolType,
-          protocolName = group.protocolName,
-          // We want to avoid current leader performing trivial assignment while the group
-          // is in stable stage, because the new assignment in leader's next sync call
-          // won't be broadcast by a stable group. This could be guaranteed by
-          // always returning the old leader id so that the current leader won't assume itself
-          // as a leader based on the returned message, since the new member.id won't match
-          // returned leader id, therefore no assignment will be performed.
-          leaderId = currentLeader,
-          error = Errors.NONE))
+        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
+        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
+        val selectedProtocolOfNextGeneration = group.selectProtocol
+        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
+          groupManager.storeGroup(group, groupAssignment, error => {
+            if (error != Errors.NONE) {
+              warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+
+              // Failed to persist member.id of the given static member, revert the update of the static member in the group.
+              group.updateMember(knownStaticMember, oldProtocols, null)
+              val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
+              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+              responseCallback(JoinGroupResult(
+                List.empty,
+                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                generationId = group.generationId,
+                protocolType = group.protocolType,
+                protocolName = group.protocolName,
+                leaderId = currentLeader,
+                error = error
+              ))
+            } else {
+              group.maybeInvokeJoinCallback(member, JoinGroupResult(
+                members = List.empty,
+                memberId = newMemberId,
+                generationId = group.generationId,
+                protocolType = group.protocolType,
+                protocolName = group.protocolName,
+                // We want to avoid current leader performing trivial assignment while the group
+                // is in stable stage, because the new assignment in leader's next sync call
+                // won't be broadcast by a stable group. This could be guaranteed by
+                // always returning the old leader id so that the current leader won't assume itself
+                // as a leader based on the returned message, since the new member.id won't match
+                // returned leader id, therefore no assignment will be performed.
+                leaderId = currentLeader,
+                error = Errors.NONE))
+            }
+          })
+        } else {
+          maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
+        }
       case CompletingRebalance =>
         // if the group is in after-sync stage, upon getting a new join-group of a known static member
         // we should still trigger a new rebalance, since the old member may already be sent to the leader
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index ee72faa..95085da 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -941,7 +941,7 @@ class GroupCoordinatorTest {
     val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
 
     // A static leader rejoin with unknown id will not trigger rebalance, and no assignment will be returned.
-    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
 
     checkJoinGroupResult(joinGroupResult,
       Errors.NONE,
@@ -1033,13 +1033,69 @@ class GroupCoordinatorTest {
   }
 
   @Test
-  def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocol(): Unit = {
+  def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWithSelectedProtocolChanged(): Unit = {
     val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
 
-    // A static follower rejoin with protocol changing to leader protocol subset won't trigger rebalance.
+    // A static follower rejoin with protocol changed and also cause updated group's selectedProtocol changed
+    // should trigger rebalance.
+    val selectedProtocols = getGroup(groupId).selectProtocol
     val newProtocols = List(("roundrobin", metadata))
+    assert(!newProtocols.map(_._1).contains(selectedProtocols))
+    // Old leader hasn't joined in the meantime, triggering a re-election.
+    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = DefaultSessionTimeout + 1)
+
+    checkJoinGroupResult(joinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation + 1,
+      Set(leaderInstanceId, followerInstanceId),
+      groupId,
+      CompletingRebalance,
+      Some(protocolType))
+
+    assertTrue(getGroup(groupId).isLeader(joinGroupResult.memberId))
+    assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
+    assertEquals(joinGroupResult.protocolName, Some("roundrobin"))
+  }
+
+  @Test
+  def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure(): Unit = {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    val selectedProtocol = getGroup(groupId).selectProtocol
+    val newProtocols = List((selectedProtocol, metadata))
+    // Timeout old leader in the meantime.
+    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1, appendRecordError = Errors.MESSAGE_TOO_LARGE)
+
+    checkJoinGroupResult(joinGroupResult,
+      Errors.UNKNOWN_SERVER_ERROR,
+      rebalanceResult.generation,
+      Set.empty,
+      groupId,
+      Stable,
+      Some(protocolType))
+
+    EasyMock.reset(replicaManager)
+    // Join with old member id will not fail because the member id is not updated because of persistence failure
+    assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
+    val oldFollowerJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+    assertEquals(Errors.NONE, oldFollowerJoinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    // Sync with old member id will also not fail because the member id is not updated because of persistence failure
+    val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId, None, None, followerInstanceId)
+    assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
+  }
+
+  @Test
+  def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchanged(): Unit = {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    // A static follower rejoin with protocol changing to leader protocol subset won't trigger rebalance if updated
+    // group's selectProtocol remain unchanged.
+    val selectedProtocol = getGroup(groupId).selectProtocol
+    val newProtocols = List((selectedProtocol, metadata))
     // Timeout old leader in the meantime.
-    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
 
     checkJoinGroupResult(joinGroupResult,
       Errors.NONE,
@@ -1122,7 +1178,7 @@ class GroupCoordinatorTest {
     val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
 
     // A static follower rejoin with no protocol change will not trigger rebalance.
-    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
 
     // Old leader shouldn't be timed out.
     assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
@@ -1210,8 +1266,7 @@ class GroupCoordinatorTest {
     var lastMemberId = initialResult.leaderId
     for (_ <- 1 to 5) {
       EasyMock.reset(replicaManager)
-
-      val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
+      val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
         leaderInstanceId, protocolType, protocols, clockAdvance = timeAdvance)
       assertTrue(joinGroupResult.memberId.startsWith(leaderInstanceId.get))
       assertNotEquals(lastMemberId, joinGroupResult.memberId)
@@ -3789,6 +3844,41 @@ class GroupCoordinatorTest {
                             requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
     val (responseFuture, responseCallback) = setupJoinGroupCallback
 
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
+      requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
+    responseFuture
+  }
+
+  private def sendStaticJoinGroupWithPersistence(groupId: String,
+                                                 memberId: String,
+                                                 protocolType: String,
+                                                 protocols: List[(String, Array[Byte])],
+                                                 groupInstanceId: Option[String],
+                                                 sessionTimeout: Int,
+                                                 rebalanceTimeout: Int,
+                                                 appendRecordError: Errors,
+                                                 requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
+    val (responseFuture, responseCallback) = setupJoinGroupCallback
+
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+          new PartitionResponse(appendRecordError, 0L, RecordBatch.NO_TIMESTAMP, 0L)
+        )
+      )})
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
@@ -3882,6 +3972,22 @@ class GroupCoordinatorTest {
     Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
   }
 
+  private def staticJoinGroupWithPersistence(groupId: String,
+                                             memberId: String,
+                                             groupInstanceId: Option[String],
+                                             protocolType: String,
+                                             protocols: List[(String, Array[Byte])],
+                                             clockAdvance: Int,
+                                             sessionTimeout: Int = DefaultSessionTimeout,
+                                             rebalanceTimeout: Int = DefaultRebalanceTimeout,
+                                             appendRecordError: Errors = Errors.NONE): JoinGroupResult = {
+    val responseFuture = sendStaticJoinGroupWithPersistence(groupId, memberId, protocolType, protocols, groupInstanceId, sessionTimeout, rebalanceTimeout, appendRecordError)
+
+    timer.advanceClock(clockAdvance)
+    // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
+    Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+  }
+
   private def syncGroupFollower(groupId: String,
                                 generationId: Int,
                                 memberId: String,