You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "zou shengfu (Jira)" <ji...@apache.org> on 2023/03/22 08:47:00 UTC

[jira] [Created] (KAFKA-14832) Thread unsafe for GroupMetadata

zou shengfu created KAFKA-14832:
-----------------------------------

             Summary: Thread unsafe for GroupMetadata
                 Key: KAFKA-14832
                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.3.2
            Reporter: zou shengfu
            Assignee: zou shengfu


          groupManager.storeGroup(group, groupAssignment, error => {
            if (error != Errors.NONE) {
              warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

              // Failed to persist member.id of the given static member, revert the update of the static member in the group.
              group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
              val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)
              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
              responseCallback(JoinGroupResult(
                List.empty,
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = error
              ))
            } else if (supportSkippingAssignment) {
              // Starting from version 9 of the JoinGroup API, static members are able to
              // skip running the assignor based on the `SkipAssignment` field. We leverage
              // this to tell the leader that it is the leader of the group, while having it
              // skip running the assignor because the group is in the stable state.
              // Notes:
              // 1) This allows the leader to continue monitoring metadata changes for the
              // group. Note that any metadata changes happening while the static leader is
              // down won't be noticed.
              // 2) The assignors are neither idempotent nor free from side effects. This is why
              // we skip the assignment step entirely, as it could generate a different group
              // assignment which would be ignored by the group coordinator because the group
              // is in the stable state.
              val isLeader = group.isLeader(newMemberId)
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = if (isLeader) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                skipAssignment = isLeader,
                error = Errors.NONE
              ))
            } else {
              // Prior to version 9 of the JoinGroup API, we wanted to avoid the current leader
              // performing a trivial assignment while the group is in the stable state, because
              // the new assignment in the leader's next sync call won't be broadcast by a stable group.
              // This could be guaranteed by always returning the old leader id so that the current
              // leader won't assume itself as a leader based on the returned message, since the new
              // member.id won't match returned leader id, therefore no assignment will be performed.
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = List.empty,
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = Errors.NONE
              ))
            }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)