You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/22 14:56:45 UTC
[kafka] branch trunk updated: KAFKA-10284: Group membership update
due to static member rejoin should be persisted (#9270)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7ca299b KAFKA-10284: Group membership update due to static member rejoin should be persisted (#9270)
7ca299b is described below
commit 7ca299b8c0f2f3256c40b694078e422350c20d19
Author: feyman2016 <fe...@aliyun.com>
AuthorDate: Thu Oct 22 22:55:52 2020 +0800
KAFKA-10284: Group membership update due to static member rejoin should be persisted (#9270)
Reviewers: Boyang Chen <bo...@apache.org>, John Roesler <vv...@apache.org>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 59 +++++++---
.../coordinator/group/GroupCoordinatorTest.scala | 120 +++++++++++++++++++--
2 files changed, 157 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index c407368..cd8a414 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1045,24 +1045,54 @@ class GroupCoordinator(val brokerId: Int,
val knownStaticMember = group.get(newMemberId)
+ // Read before updateMember so oldProtocols holds the pre-join protocols, which are restored if persisting fails.
+ val oldProtocols = knownStaticMember.supportedProtocols
group.updateMember(knownStaticMember, protocols, responseCallback)
group.currentState match {
case Stable =>
- info(s"Static member joins during Stable stage will not trigger rebalance.")
- group.maybeInvokeJoinCallback(member, JoinGroupResult(
- members = List.empty,
- memberId = newMemberId,
- generationId = group.generationId,
- protocolType = group.protocolType,
- protocolName = group.protocolName,
- // We want to avoid current leader performing trivial assignment while the group
- // is in stable stage, because the new assignment in leader's next sync call
- // won't be broadcast by a stable group. This could be guaranteed by
- // always returning the old leader id so that the current leader won't assume itself
- // as a leader based on the returned message, since the new member.id won't match
- // returned leader id, therefore no assignment will be performed.
- leaderId = currentLeader,
- error = Errors.NONE))
+ // Check whether the group's selectedProtocol for the next generation would change. If not, simply store the group to
+ // persist the updated static member; if so, trigger a rebalance to keep the group's assignment and selectProtocol consistent.
+ val selectedProtocolOfNextGeneration = group.selectProtocol
+ if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+ info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
+ val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(memberMetadata => memberMetadata.memberId -> memberMetadata.assignment).toMap
+ groupManager.storeGroup(group, groupAssignment, error => {
+ if (error != Errors.NONE) {
+ warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
+
+ // Failed to persist member.id of the given static member, revert the update of the static member in the group.
+ group.updateMember(knownStaticMember, oldProtocols, null)
+ val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
+ completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+ responseCallback(JoinGroupResult(
+ List.empty,
+ memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ generationId = group.generationId,
+ protocolType = group.protocolType,
+ protocolName = group.protocolName,
+ leaderId = currentLeader,
+ error = error
+ ))
+ } else {
+ group.maybeInvokeJoinCallback(member, JoinGroupResult(
+ members = List.empty,
+ memberId = newMemberId,
+ generationId = group.generationId,
+ protocolType = group.protocolType,
+ protocolName = group.protocolName,
+ // We want to avoid current leader performing trivial assignment while the group
+ // is in stable stage, because the new assignment in leader's next sync call
+ // won't be broadcast by a stable group. This could be guaranteed by
+ // always returning the old leader id so that the current leader won't assume itself
+ // as a leader based on the returned message, since the new member.id won't match
+ // returned leader id, therefore no assignment will be performed.
+ leaderId = currentLeader,
+ error = Errors.NONE))
+ }
+ })
+ } else {
+ maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
+ }
case CompletingRebalance =>
// if the group is in after-sync stage, upon getting a new join-group of a known static member
// we should still trigger a new rebalance, since the old member may already be sent to the leader
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index ee72faa..95085da 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -941,7 +941,7 @@ class GroupCoordinatorTest {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
// A static leader rejoin with unknown id will not trigger rebalance, and no assignment will be returned.
- val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+ val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
checkJoinGroupResult(joinGroupResult,
Errors.NONE,
@@ -1033,13 +1033,69 @@ class GroupCoordinatorTest {
}
@Test
- def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocol(): Unit = {
+ def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWithSelectedProtocolChanged(): Unit = {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
- // A static follower rejoin with protocol changing to leader protocol subset won't trigger rebalance.
+ // A static follower that rejoins with changed protocols which also change the group's selectedProtocol
+ // should trigger a rebalance.
+ val selectedProtocols = getGroup(groupId).selectProtocol
val newProtocols = List(("roundrobin", metadata))
+ assert(!newProtocols.map(_._1).contains(selectedProtocols))
+ // Old leader hasn't joined in the meantime, triggering a re-election.
+ val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = DefaultSessionTimeout + 1)
+
+ checkJoinGroupResult(joinGroupResult,
+ Errors.NONE,
+ rebalanceResult.generation + 1,
+ Set(leaderInstanceId, followerInstanceId),
+ groupId,
+ CompletingRebalance,
+ Some(protocolType))
+
+ assertTrue(getGroup(groupId).isLeader(joinGroupResult.memberId))
+ assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
+ assertEquals(Some("roundrobin"), joinGroupResult.protocolName)
+ }
+
+ @Test
+ def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure(): Unit = {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ val selectedProtocol = getGroup(groupId).selectProtocol
+ val newProtocols = List((selectedProtocol, metadata))
+ // Fail the append to the group metadata topic so the static member update cannot be persisted.
+ val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1, appendRecordError = Errors.MESSAGE_TOO_LARGE)
+
+ checkJoinGroupResult(joinGroupResult,
+ Errors.UNKNOWN_SERVER_ERROR,
+ rebalanceResult.generation,
+ Set.empty,
+ groupId,
+ Stable,
+ Some(protocolType))
+
+ EasyMock.reset(replicaManager)
+ // The failed rejoin is handed no member id; the update is reverted, so the old member id stays valid for joining.
+ assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, joinGroupResult.memberId)
+ val oldFollowerJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+ assertEquals(Errors.NONE, oldFollowerJoinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ // Syncing with the old member id also succeeds, because the persistence failure reverted the member id.
+ val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId, None, None, followerInstanceId)
+ assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
+ }
+
+ @Test
+ def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchanged(): Unit = {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ // A static follower rejoining with protocols changed to a subset of the leader's won't trigger a rebalance
+ // as long as the group's selectProtocol remains unchanged.
+ val selectedProtocol = getGroup(groupId).selectProtocol
+ val newProtocols = List((selectedProtocol, metadata))
// Timeout old leader in the meantime.
- val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+ val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
checkJoinGroupResult(joinGroupResult,
Errors.NONE,
@@ -1122,7 +1178,7 @@ class GroupCoordinatorTest {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
// A static follower rejoin with no protocol change will not trigger rebalance.
- val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+ val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
// Old leader shouldn't be timed out.
assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
@@ -1210,8 +1266,7 @@ class GroupCoordinatorTest {
var lastMemberId = initialResult.leaderId
for (_ <- 1 to 5) {
EasyMock.reset(replicaManager)
-
- val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
leaderInstanceId, protocolType, protocols, clockAdvance = timeAdvance)
assertTrue(joinGroupResult.memberId.startsWith(leaderInstanceId.get))
assertNotEquals(lastMemberId, joinGroupResult.memberId)
@@ -3789,6 +3844,41 @@ class GroupCoordinatorTest {
requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
val (responseFuture, responseCallback) = setupJoinGroupCallback
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+ EasyMock.replay(replicaManager)
+
+ groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
+ requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
+ responseFuture
+ }
+
+ private def sendStaticJoinGroupWithPersistence(groupId: String,
+ memberId: String,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ groupInstanceId: Option[String],
+ sessionTimeout: Int,
+ rebalanceTimeout: Int,
+ appendRecordError: Errors,
+ requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
+ val (responseFuture, responseCallback) = setupJoinGroupCallback
+
+ val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+
+ EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+ EasyMock.anyShort(),
+ internalTopicsAllowed = EasyMock.eq(true),
+ origin = EasyMock.eq(AppendOrigin.Coordinator),
+ EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
+ EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
+ override def answer = capturedArgument.getValue.apply(
+ Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+ new PartitionResponse(appendRecordError, 0L, RecordBatch.NO_TIMESTAMP, 0L)
+ )
+ )})
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
@@ -3882,6 +3972,22 @@ class GroupCoordinatorTest {
Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
}
+ private def staticJoinGroupWithPersistence(groupId: String,
+ memberId: String,
+ groupInstanceId: Option[String],
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ clockAdvance: Int,
+ sessionTimeout: Int = DefaultSessionTimeout,
+ rebalanceTimeout: Int = DefaultRebalanceTimeout,
+ appendRecordError: Errors = Errors.NONE): JoinGroupResult = {
+ val responseFuture = sendStaticJoinGroupWithPersistence(groupId, memberId, protocolType, protocols, groupInstanceId, sessionTimeout, rebalanceTimeout, appendRecordError)
+
+ timer.advanceClock(clockAdvance)
+ // wait up to the rebalance timeout, allowing some extra time in case of an unexpected delay
+ Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+ }
+
private def syncGroupFollower(groupId: String,
generationId: Int,
memberId: String,