Posted to dev@kafka.apache.org by "Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2019/10/30 04:40:00 UTC

[jira] [Resolved] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state

     [ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sophie Blee-Goldman resolved KAFKA-8972.
----------------------------------------
    Resolution: Fixed

Nice work guys!

> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8972
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Blocker
>             Fix For: 2.4.0
>
>
> The current implementation of {{KafkaConsumer.unsubscribe}} performs its steps in the following order:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> Inside {{onLeavePrepare}} we look at the current assignment, revoke those partitions and notify the user via {{RebalanceListener#onPartitionsRevoked}}, and then clear the assignment.
> However, the subscription's assignment has already been cleared by {{this.subscriptions.unsubscribe();}}, which means the user's rebalance listener is never triggered. In other words, from the consumer client's point of view nothing is owned after unsubscribe, but from the caller's point of view the partitions have not been revoked yet. For callers like Kafka Streams, which rely on the rebalance listener to maintain their internal state, this leads to inconsistent state management and failure cases.
> Before KIP-429 this issue was hidden, because whenever the consumer re-joined the group later it would revoke everything anyway, regardless of the parameters passed to the rebalance listener; with KIP-429 it is easier to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then `subscription.unsubscribe`. This way we are guaranteed that the Streams tasks are all closed as revoked by then.
> • [Optimization] If the generation is reset due to a fatal error from a join / heartbeat response etc., then we know that all partitions are lost, and we should not trigger `onPartitionsRevoked`, but instead just `onPartitionsLost` inside `onLeavePrepare`.
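
The ordering problem and the two-part fix described above can be illustrated with a small toy model. This is only a sketch: the classes RecordingListener, Subscriptions and Coordinator below are hypothetical stand-ins written for this example, not Kafka's real internals, and the generationReset flag is an assumed simplification of how a fatal join / heartbeat error might be tracked.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: every class here is a hypothetical stand-in, not Kafka code.
class UnsubscribeOrderingSketch {

    /** Stand-in for the user's rebalance listener; records each callback it receives. */
    static class RecordingListener {
        final List<String> events = new ArrayList<>();
        void onPartitionsRevoked(Set<String> partitions) { events.add("revoked:" + partitions); }
        void onPartitionsLost(Set<String> partitions) { events.add("lost:" + partitions); }
    }

    /** Stand-in for the consumer's subscription state. */
    static class Subscriptions {
        final Set<String> assigned = new LinkedHashSet<>();
        void unsubscribe() { assigned.clear(); }
    }

    /** Stand-in for the coordinator. */
    static class Coordinator {
        final Subscriptions subscriptions;
        final RecordingListener listener;
        boolean generationReset; // assumed flag: true after a fatal join/heartbeat error
        boolean leftGroup;

        Coordinator(Subscriptions subscriptions, RecordingListener listener, boolean generationReset) {
            this.subscriptions = subscriptions;
            this.listener = listener;
            this.generationReset = generationReset;
        }

        void onLeavePrepare() {
            Set<String> owned = new LinkedHashSet<>(subscriptions.assigned);
            if (owned.isEmpty()) {
                return; // nothing is owned, so there is nothing to tell the user
            }
            if (generationReset) {
                listener.onPartitionsLost(owned);    // optimization from the second bullet
            } else {
                listener.onPartitionsRevoked(owned);
            }
            subscriptions.assigned.clear();
        }

        void maybeLeaveGroup(String reason) { leftGroup = true; }
    }

    private static Coordinator newCoordinator(boolean generationReset) {
        Subscriptions subscriptions = new Subscriptions();
        subscriptions.assigned.add("t-0");
        subscriptions.assigned.add("t-1");
        return new Coordinator(subscriptions, new RecordingListener(), generationReset);
    }

    /** Ordering from the report: the assignment is cleared before the listener can see it. */
    static List<String> unsubscribeOldOrder(boolean generationReset) {
        Coordinator coordinator = newCoordinator(generationReset);
        coordinator.subscriptions.unsubscribe();
        coordinator.onLeavePrepare();
        coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
        return coordinator.listener.events;
    }

    /** Proposed ordering: notify and leave first, clear the subscription last. */
    static List<String> unsubscribeFixedOrder(boolean generationReset) {
        Coordinator coordinator = newCoordinator(generationReset);
        coordinator.onLeavePrepare();
        coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
        coordinator.subscriptions.unsubscribe();
        return coordinator.listener.events;
    }

    public static void main(String[] args) {
        System.out.println("old order:          " + unsubscribeOldOrder(false));
        System.out.println("fixed order:        " + unsubscribeFixedOrder(false));
        System.out.println("fixed, after reset: " + unsubscribeFixedOrder(true));
    }
}
```

In this model the old ordering delivers no callback at all, while the fixed ordering delivers exactly one: a revoke in the normal case, or a lost notification when the generation has been reset.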



--
This message was sent by Atlassian Jira
(v8.3.4#803005)