Posted to jira@kafka.apache.org by "David Jacot (Jira)" <ji...@apache.org> on 2022/02/14 10:57:00 UTC
[jira] [Updated] (KAFKA-13435) Static membership protocol should let the leader skip assignment (KIP-814)
[ https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot updated KAFKA-13435:
--------------------------------
Summary: Static membership protocol should let the leader skip assignment (KIP-814) (was: Group won't consume partitions added after static member restart)
> Static membership protocol should let the leader skip assignment (KIP-814)
> --------------------------------------------------------------------------
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.0.0
> Reporter: Ryan Leslie
> Assignee: David Jacot
> Priority: Critical
> Labels: new-rebalance-should-fix
> Fix For: 3.2.0
>
>
> When using consumer groups with static membership, if the consumer marked as leader has restarted, metadata changes such as a partition increase do not trigger the expected rebalances.
> To reproduce this issue, simply:
> # Create a static consumer subscribed to a single topic
> # Close the consumer and create a new one with the same group instance id
> # Increase partitions for the topic
> # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to track metadata and trigger a rebalance if there are changes such as new partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
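> The guard above can be sketched in isolation as follows. This is a simplified, stdlib-only model, not Kafka's code. It assumes a snapshot is just a map from topic name to partition count; Kafka's real metadata snapshot tracks more than that. The class _SnapshotCheck_ and the method _shouldRejoin_ are hypothetical names used only for illustration.

```java
import java.util.Map;

// Hypothetical, simplified model of the leader-side metadata check.
// A "snapshot" here is only a topic -> partition-count map.
class SnapshotCheck {

    // Returns true if this member should request a rejoin (and thus a rebalance).
    // A null assignmentSnapshot means the member is not monitoring metadata,
    // mirroring the null guard in the excerpt above.
    static boolean shouldRejoin(Map<String, Integer> assignmentSnapshot,
                                Map<String, Integer> metadataSnapshot) {
        return assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot);
    }

    public static void main(String[] args) {
        Map<String, Integer> atAssignment = Map.of("topic-a", 3);
        Map<String, Integer> afterIncrease = Map.of("topic-a", 6);

        // A leader that recorded a snapshot notices the partition increase.
        System.out.println(shouldRejoin(atAssignment, afterIncrease)); // true
        // A member without a snapshot never reacts to the same change.
        System.out.println(shouldRejoin(null, afterIncrease));         // false
    }
}
```

> The case that matters for this issue is the null one. A member whose snapshot is null never requests a rejoin, however much the metadata changes.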
> Note that _assignmentSnapshot_ is currently only set if the consumer is the leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group should have _isLeader == true_ and be responsible for triggering rebalances on metadata changes.
> However, with static membership, if the leader has been restarted and has rejoined the group, the group effectively no longer has a current leader: _isLeader_ is false for all members. So even though the metadata changes are fetched, no rebalance is triggered.
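> The interaction between _isLeader_ and _assignmentSnapshot_ across a static restart can be modeled as below. This is a hypothetical sketch, not the real _ConsumerCoordinator_. The nested _Member_ class, _onJoinComplete_ and _anyMemberRejoins_ are illustrative names. The sketch assumes, as described above, that the restarted instance gets its previous assignment back without performing an assignment itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of leader-only metadata monitoring; names are illustrative.
class StaticRestartScenario {

    static final class Member {
        final String memberId;
        boolean isLeader;                        // true only if this member performed the assignment
        Map<String, Integer> assignmentSnapshot; // kept only while isLeader is true

        Member(String memberId) { this.memberId = memberId; }

        // Called when a join completes; only the member that performed the
        // assignment keeps a snapshot, everyone else drops it.
        void onJoinComplete(boolean performedAssignment, Map<String, Integer> metadata) {
            isLeader = performedAssignment;
            assignmentSnapshot = isLeader ? metadata : null;
        }

        boolean shouldRejoin(Map<String, Integer> currentMetadata) {
            return assignmentSnapshot != null && !assignmentSnapshot.equals(currentMetadata);
        }
    }

    // True if at least one member would trigger a rebalance for this metadata.
    static boolean anyMemberRejoins(List<Member> members, Map<String, Integer> metadata) {
        for (Member m : members) {
            if (m.shouldRejoin(metadata)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Integer> before = Map.of("topic-a", 3);
        Map<String, Integer> after = Map.of("topic-a", 6);

        // Normal case: the member that computed the assignment is the leader.
        Member leader = new Member("instance-1-old");
        leader.onJoinComplete(true, before);
        List<Member> group = new ArrayList<>(List.of(leader));
        System.out.println(anyMemberRejoins(group, after)); // true

        // Static restart: the replacement gets its old assignment back without
        // performing an assignment, so it is not the leader and keeps no snapshot.
        Member restarted = new Member("instance-1-new");
        restarted.onJoinComplete(false, before);
        group.set(0, restarted);
        System.out.println(anyMemberRejoins(group, after)); // false
    }
}
```

> After the restart, the same partition increase produces no rejoin from any member. This matches the symptom in the reproduction steps.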
> This issue persists until an actual group change causes a proper rebalance. To safely increase partitions when using static membership, consumers must either be stopped and allowed to time out, or be forcibly removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. Currently, when a static consumer that is the leader restarts, the coordinator does recognize the change.
> For example, here leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted:
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id, since this is not a new rebalance, and JOIN_GROUP continues to return the now-stale member id as leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[]){noformat}
> This means that it is not easy for a restarted member to recognize that it should consider itself the leader and handle metadata changes.
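> The client-side consequence of the JoinGroup response above can be illustrated with a small model. It is a hypothetical sketch based only on the log lines above, not the real _GroupCoordinator_. _StaticRejoinModel_, _JoinResponse_ and _staticRejoin_ are illustrative names, and the member ids are shortened. The sketch also assumes that a client considers itself leader only when the response's _leader_ equals its own _memberId_.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the JoinGroup exchange for a Stable static group,
// based only on the logs above; not the real GroupCoordinator logic.
class StaticRejoinModel {

    // Stand-in for the two JoinGroup response fields that matter here.
    static final class JoinResponse {
        private final String leader;
        private final String memberId;

        JoinResponse(String leader, String memberId) {
            this.leader = leader;
            this.memberId = memberId;
        }

        String leader() { return leader; }
        String memberId() { return memberId; }

        // Assumed client rule: it acts as leader only if the ids match.
        boolean clientThinksItIsLeader() { return leader.equals(memberId); }
    }

    private final Map<String, String> memberIdByInstanceId = new HashMap<>();
    private String leaderId;

    // Initial rebalance: the first member to join becomes leader.
    JoinResponse initialJoin(String instanceId, String memberId) {
        memberIdByInstanceId.put(instanceId, memberId);
        if (leaderId == null) {
            leaderId = memberId;
        }
        return new JoinResponse(leaderId, memberId);
    }

    // Static member rejoins with an unknown member id: its entry gets a new
    // member id, no rebalance runs, and the response still names the leader
    // id from before the replacement (as in the log). Whether the broker also
    // updates its internal leader id is not modeled here.
    JoinResponse staticRejoin(String instanceId, String newMemberId) {
        String leaderBeforeReplacement = leaderId;
        memberIdByInstanceId.put(instanceId, newMemberId);
        return new JoinResponse(leaderBeforeReplacement, newMemberId);
    }

    String memberIdOf(String instanceId) {
        return memberIdByInstanceId.get(instanceId);
    }

    public static void main(String[] args) {
        StaticRejoinModel group = new StaticRejoinModel();
        // Member ids shortened from the log above.
        JoinResponse first = group.initialJoin("instance-1", "instance-1-bbfcb930");
        System.out.println(first.clientThinksItIsLeader());  // true

        JoinResponse rejoin = group.staticRejoin("instance-1", "instance-1-af88ecf2");
        System.out.println(rejoin.leader());                  // instance-1-bbfcb930
        System.out.println(rejoin.clientThinksItIsLeader());  // false
    }
}
```

> Because the returned leader id never matches the new member id, the restarted instance treats itself as a follower. No other member holds the leader role either.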
> KAFKA-7728 refers to the difficulty of leader restarts, but its focus seemed to be mainly on avoiding needless rebalances for static members. That goal was accomplished, but this issue seems to be a side effect of both not rebalancing AND not having the rejoined member reclaim its leadership status.
> I also noticed that KAFKA-12759 has been opened; I have not verified whether it is strictly related or valid.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)