Posted to jira@kafka.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/10/24 00:20:00 UTC

[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state

    [ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958425#comment-16958425 ] 

ASF GitHub Bot commented on KAFKA-8972:
---------------------------------------

ableegoldman commented on pull request #7589: KAFKA-8972: need to flush state even on unclean close
URL: https://github.com/apache/kafka/pull/7589


> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8972
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Blocker
>             Fix For: 2.4.0
>
>
> The current implementation of {{KafkaConsumer.unsubscribe}} performs the following steps, in this order:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> Inside {{onLeavePrepare}} we look at the current assignment, revoke its partitions, notify users via {{RebalanceListener#onPartitionsRevoked}}, and then clear the assignment.
> However, the subscription's assignment has already been cleared by {{this.subscriptions.unsubscribe();}}, which means the user's rebalance listener is never triggered. In other words, from the consumer client's point of view nothing is owned after unsubscribe, but from the caller's point of view the partitions have not been revoked yet. For callers like Kafka Streams, which rely on the rebalance listener to maintain their internal state, this leads to inconsistent state management and failure cases.
> Before KIP-429 this issue was hidden, because every time the consumer re-joined the group later it would revoke everything anyway, regardless of the parameters passed to the rebalance listener; with KIP-429 it is easier to reproduce.
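
The ordering problem described above can be reproduced with a small toy model. This is only a sketch: {{ToySubscriptions}}, {{ToyCoordinator}} and {{unsubscribeInCurrentOrder}} are hypothetical stand-ins invented for illustration, not the real Kafka classes, and the listener is reduced to a list that records what it would have been told.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of the ordering described above; none of these types are the real Kafka classes. */
final class UnsubscribeBugSketch {

    /** Hypothetical stand-in for the consumer's subscription state. */
    static final class ToySubscriptions {
        final Set<String> assigned = new TreeSet<>();

        void unsubscribe() {
            assigned.clear(); // the assignment is dropped immediately
        }
    }

    /** Hypothetical stand-in for the coordinator; records what the user's listener is told. */
    static final class ToyCoordinator {
        final ToySubscriptions subscriptions;
        final List<String> revokedSeenByUser = new ArrayList<>();

        ToyCoordinator(ToySubscriptions subscriptions) {
            this.subscriptions = subscriptions;
        }

        void onLeavePrepare() {
            if (!subscriptions.assigned.isEmpty()) {
                // Models the call to the user's onPartitionsRevoked callback.
                revokedSeenByUser.addAll(subscriptions.assigned);
            }
            subscriptions.assigned.clear();
        }
    }

    /** Runs the steps in the order quoted in the description and returns what the listener saw. */
    static List<String> unsubscribeInCurrentOrder(Set<String> owned) {
        ToySubscriptions subscriptions = new ToySubscriptions();
        subscriptions.assigned.addAll(owned);
        ToyCoordinator coordinator = new ToyCoordinator(subscriptions);

        subscriptions.unsubscribe();  // step 1: assignment is cleared
        coordinator.onLeavePrepare(); // step 2: nothing is left to revoke
        return coordinator.revokedSeenByUser;
    }

    public static void main(String[] args) {
        List<String> seen = unsubscribeInCurrentOrder(Set.of("topic-0", "topic-1"));
        System.out.println("partitions revoked via listener: " + seen);
    }
}
```

In this model the listener records nothing even though two partitions were owned, which mirrors the mismatch between the client's view and the caller's view.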
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then `subscription.unsubscribe`. This way we are guaranteed that the Streams tasks are all closed as revoked by then.
> • [Optimization] If the generation is reset due to a fatal error from a join / heartbeat response etc., then we know that all partitions are lost, and we should not trigger `onPartitionsRevoked`, but instead just `onPartitionsLost` inside `onLeavePrepare`.
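
The two points of the proposed fix can be sketched with a toy model. This is an illustration under stated assumptions, not the actual patch: {{ToyConsumer}}, {{RecordingListener}}, the {{generationReset}} flag and {{run}} are hypothetical names, and the leave-group step is omitted because it does not affect which callback fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of the proposed fix; none of these types are the real Kafka classes. */
final class UnsubscribeFixSketch {

    /** Hypothetical listener that only records which callback fired and with which partitions. */
    static final class RecordingListener {
        final List<String> events = new ArrayList<>();

        void onPartitionsRevoked(Set<String> partitions) {
            events.add("revoked " + partitions);
        }

        void onPartitionsLost(Set<String> partitions) {
            events.add("lost " + partitions);
        }
    }

    /** Hypothetical consumer that folds subscription state and coordinator into one class. */
    static final class ToyConsumer {
        final Set<String> assigned = new TreeSet<>();
        final RecordingListener listener = new RecordingListener();
        // Assumed flag: true once a fatal join/heartbeat error has reset the generation.
        boolean generationReset = false;

        void onLeavePrepare() {
            if (assigned.isEmpty()) {
                return;
            }
            Set<String> snapshot = new TreeSet<>(assigned);
            if (generationReset) {
                listener.onPartitionsLost(snapshot);    // partitions are already gone
            } else {
                listener.onPartitionsRevoked(snapshot); // normal, clean revocation
            }
            assigned.clear();
        }

        void unsubscribe() {
            onLeavePrepare(); // notify the listener while the assignment is still known
            // The leave-group request would be sent here; omitted in this sketch.
            assigned.clear(); // stands in for clearing the subscription state last
        }
    }

    /** Runs unsubscribe on a consumer owning two partitions and returns the recorded callbacks. */
    static List<String> run(boolean generationReset) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.assigned.addAll(Set.of("topic-0", "topic-1"));
        consumer.generationReset = generationReset;
        consumer.unsubscribe();
        return consumer.listener.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [revoked [topic-0, topic-1]]
        System.out.println(run(true));  // [lost [topic-0, topic-1]]
    }
}
```

With the listener notified before the assignment is cleared, the caller always hears about every owned partition exactly once, through whichever callback matches the consumer's state.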


