Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2022/02/11 07:24:00 UTC

[jira] [Updated] (KAFKA-13435) Group won't consume partitions added after static member restart

     [ https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-13435:
----------------------------------
    Labels: new-rebalance-should-fix  (was: )

> Group won't consume partitions added after static member restart
> ----------------------------------------------------------------
>
>                 Key: KAFKA-13435
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13435
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: Ryan Leslie
>            Assignee: David Jacot
>            Priority: Critical
>              Labels: new-rebalance-should-fix
>
> When using consumer groups with static membership, if the consumer marked as leader has restarted, then metadata changes such as a partition increase do not trigger the expected rebalance.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to track metadata and trigger a rebalance if there are changes such as new partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
>     ...
>     requestRejoinIfNecessary(reason);
>     return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
> if (!isLeader)
>     assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group should have _isLeader == True_ and be responsible for triggering rebalances on metadata changes.
> However, in the case of static membership, if the leader has been restarted and rejoined the group, the group essentially no longer has a current leader. Even though the metadata changes are fetched, no rebalance will be triggered. That is, _isLeader_ will be false for all members.
> This issue does not resolve until an actual group change causes a proper rebalance. To safely increase partitions when using static membership, consumers must be stopped and allowed to time out, or be forcibly removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. Currently, when a static consumer that is leader is restarted, the coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new rebalance, and JOIN_GROUP will continue returning the now stale member id as leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[]){noformat}
> This means that it's not easy for any particular restarted member to identify that it should consider itself leader and handle metadata changes.
> There is a reference to the difficulty of leader restarts in KAFKA-7728, but the focus there seemed to be mainly on avoiding needless rebalances for static members. That goal was accomplished, but this issue seems to be a side effect of both not rebalancing AND not having the rejoined member reclaim its leadership status.
> Also, I have not verified whether it is strictly related or valid, but I noticed that this ticket has been opened too: KAFKA-12759.
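
The interaction described in the analysis above can be illustrated with a small, self-contained toy model. This is a sketch under stated assumptions, not Kafka code: ToyCoordinator, ToyMember, and all of their methods are hypothetical stand-ins for the broker-side group coordinator and the client-side ConsumerCoordinator, and the model assumes a single-member group in which a member treats itself as leader only when the leader id in the join response equals its own member id.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the stale-leader condition; none of these types exist in Kafka. */
public class StaleLeaderModel {

    /** Hypothetical broker side: tracks static members and the leader's member id. */
    static class ToyCoordinator {
        final Map<String, String> staticMembers = new HashMap<>();
        String leaderId;
        int nextSuffix = 0;

        /** Join that goes through a rebalance: the (only) member becomes leader. */
        String joinWithRebalance(String instanceId) {
            String memberId = instanceId + "-" + (nextSuffix++);
            staticMembers.put(instanceId, memberId);
            leaderId = memberId;
            return memberId;
        }

        /** Static member rejoin after a restart: new member id, no rebalance. */
        String rejoinStatic(String instanceId) {
            String memberId = instanceId + "-" + (nextSuffix++);
            staticMembers.put(instanceId, memberId);
            // Assumption mirroring the ticket: leaderId is NOT updated here,
            // so it still names the old, now removed, member id.
            return memberId;
        }
    }

    /** Hypothetical client side: only the leader keeps a metadata snapshot. */
    static class ToyMember {
        boolean isLeader;
        Integer assignmentSnapshot; // partition count at assignment time, or null

        void onJoinResponse(String memberId, String leaderId, int partitionCount) {
            isLeader = memberId.equals(leaderId);
            assignmentSnapshot = isLeader ? Integer.valueOf(partitionCount) : null;
        }

        /** Same shape as the check quoted above: a null snapshot never triggers a rejoin. */
        boolean rejoinNeeded(int currentPartitionCount) {
            return assignmentSnapshot != null
                    && assignmentSnapshot.intValue() != currentPartitionCount;
        }
    }

    /** Returns whether a partition increase triggers a rejoin in this model. */
    public static boolean partitionIncreaseTriggersRejoin(boolean restartLeaderFirst) {
        ToyCoordinator coordinator = new ToyCoordinator();
        int partitions = 1;

        ToyMember member = new ToyMember();
        String memberId = coordinator.joinWithRebalance("instance-1");
        member.onJoinResponse(memberId, coordinator.leaderId, partitions);

        if (restartLeaderFirst) {
            // The restarted process starts with fresh state and the same instance id.
            member = new ToyMember();
            String newMemberId = coordinator.rejoinStatic("instance-1");
            member.onJoinResponse(newMemberId, coordinator.leaderId, partitions);
        }

        partitions++; // the topic gains a partition
        return member.rejoinNeeded(partitions);
    }

    public static void main(String[] args) {
        System.out.println("without restart: " + partitionIncreaseTriggersRejoin(false));
        System.out.println("after restart:   " + partitionIncreaseTriggersRejoin(true));
    }
}
```

In this model the first call reports true and the second reports false: after the restart, the join response still names the old member id as leader, so the restarted member never records a snapshot and the partition increase goes unnoticed. The model deliberately ignores generations, heartbeats, and multi-member groups.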



--
This message was sent by Atlassian Jira
(v8.20.1#820001)