You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "zou shengfu (Jira)" <ji...@apache.org> on 2023/03/22 08:47:00 UTC
[jira] [Created] (KAFKA-14832) Thread unsafe for GroupMetadata
zou shengfu created KAFKA-14832:
-----------------------------------
Summary: Thread unsafe for GroupMetadata
Key: KAFKA-14832
URL: https://issues.apache.org/jira/browse/KAFKA-14832
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 3.3.2
Reporter: zou shengfu
Assignee: zou shengfu
groupManager.storeGroup(group, groupAssignment, error => {
if (error != Errors.NONE) {
warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
// Failed to persist member.id of the given static member, revert the update of the static member in the group.
group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)
completeAndScheduleNextHeartbeatExpiration(group, oldMember)
responseCallback(JoinGroupResult(
List.empty,
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = error
))
} else if (supportSkippingAssignment) {
// Starting from version 9 of the JoinGroup API, static members are able to
// skip running the assignor based on the `SkipAssignment` field. We leverage
// this to tell the leader that it is the leader of the group but by skipping
// running the assignor while the group is in stable state.
// Notes:
// 1) This allows the leader to continue monitoring metadata changes for the
// group. Note that any metadata changes happening while the static leader is
// down won't be noticed.
// 2) The assignors are not idempotent nor free from side effects. This is why
// we skip entirely the assignment step as it could generate a different group
// assignment which would be ignored by the group coordinator because the group
// is the stable state.
val isLeader = group.isLeader(newMemberId)
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = if (isLeader) {
group.currentMemberMetadata
} else {
List.empty
},
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
skipAssignment = isLeader,
error = Errors.NONE
))
} else {
// Prior to version 9 of the JoinGroup API, we wanted to avoid current leader
// performing trivial assignment while the group is in stable stage, because
// the new assignment in leader's next sync call won't be broadcast by a stable group.
// This could be guaranteed by always returning the old leader id so that the current
// leader won't assume itself as a leader based on the returned message, since the new
// member.id won't match returned leader id, therefore no assignment will be performed.
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = List.empty,
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = Errors.NONE
))
}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)