Posted to jira@kafka.apache.org by "Jose Santiago Silva Ramirez (JIRA)" <ji...@apache.org> on 2018/11/03 20:37:00 UTC

[jira] [Commented] (KAFKA-7018) persist memberId for consumer restart

    [ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674195#comment-16674195 ] 

Jose Santiago Silva Ramirez commented on KAFKA-7018:
----------------------------------------------------

[~shagito79] 

> persist memberId for consumer restart
> -------------------------------------
>
>                 Key: KAFKA-7018
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7018
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer, streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> In the group coordinator, there is logic that skips the rebalance for a JoinGroup request from an existing follower consumer whose metadata has not changed:
> {code:scala}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
>     // if the member id is unknown, register the member to the group
>     addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
>   } else {
>     val member = group.get(memberId)
>     if (group.isLeader(memberId) || !member.matches(protocols)) {
>       // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>       // The latter allows the leader to trigger rebalances for changes affecting assignment
>       // which do not affect the member metadata (such as topic metadata changes for the consumer)
>       updateMemberAndRebalance(group, member, protocols, responseCallback)
>     } else {
>       // for followers with no actual change to their metadata, just return group information
>       // for the current generation which will allow them to issue SyncGroup
>       responseCallback(JoinGroupResult(
>         members = Map.empty,
>         memberId = memberId,
>         generationId = group.generationId,
>         subProtocol = group.protocolOrNull,
>         leaderId = group.leaderOrNull,
>         error = Errors.NONE))
>     }
>   }
> {code}
> While looking at AbstractCoordinator, I found that the generation is hard-coded as NO_GENERATION on restart, which means the first JoinGroup request is sent with UNKNOWN_MEMBER_ID. The coordinator therefore treats the restarted consumer as a new member, so rebalancing continues until the session of the member from before the restart times out.
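To make the restart scenario concrete, here is a minimal Scala sketch of the decision in the branch above, assuming a heavily simplified group model. The names `ToyGroup`, `JoinOutcome` and `handleJoin` are hypothetical, and protocols are reduced to a list of names instead of the real (name, metadata) list. It shows that a join with UNKNOWN_MEMBER_ID always adds a member and rebalances, while a follower that rejoins with its old memberId and unchanged metadata just gets the current generation back.

```scala
// Toy model of the `case Empty | Stable` branch quoted above.
// All names here are hypothetical simplifications, not Kafka's real classes.

// Assumed to mirror JoinGroupRequest.UNKNOWN_MEMBER_ID (the empty string).
val UnknownMemberId = ""

sealed trait JoinOutcome
case object AddedNewMemberAndRebalanced extends JoinOutcome // ~ addMemberAndRebalance
case object UpdatedMemberAndRebalanced extends JoinOutcome  // ~ updateMemberAndRebalance
case object ReturnedCurrentGeneration extends JoinOutcome   // ~ responseCallback, no rebalance

// Protocol names stand in for the real (name, metadata) protocol list.
final case class ToyGroup(leaderId: String, members: Map[String, List[String]])

// Assumption: a non-empty memberId is already registered in the group;
// otherwise `members(memberId)` throws NoSuchElementException.
def handleJoin(group: ToyGroup, memberId: String, protocols: List[String]): JoinOutcome =
  if (memberId == UnknownMemberId) AddedNewMemberAndRebalanced
  else if (memberId == group.leaderId || group.members(memberId) != protocols)
    UpdatedMemberAndRebalanced
  else ReturnedCurrentGeneration

val group = ToyGroup(
  leaderId = "leader-1",
  members = Map("leader-1" -> List("range"), "follower-1" -> List("range")))

// Restart today: the follower has lost its id and sends UNKNOWN_MEMBER_ID.
println(handleJoin(group, UnknownMemberId, List("range"))) // AddedNewMemberAndRebalanced
// Rejoining with a persisted id and unchanged metadata avoids the rebalance.
println(handleJoin(group, "follower-1", List("range")))    // ReturnedCurrentGeneration
```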
> I'd like to clarify the following before we continue the discussion:
>  # Whether my understanding of the above logic is correct (I hope [~mjsax] can help me double-check).
>  # Whether it makes sense to persist the memberId from the last round for consumers. We currently only need this feature in Streams applications, but it would do no harm to use it for consumers in general. It would be a nice-to-have on consumer restart when the consumer is configured to load the previous memberId; if loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
>  # The behavior could also be changed on the broker side, but I suspect that is very risky. So far, a client-side change looks like the least effort. The end goal is to avoid excessive rebalances when the same consumer restarts, so if you think a server-side change could also help, we can discuss it further.
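The persistence idea in the second point could look roughly like the following minimal Scala sketch, assuming the previous memberId is stored in a local file and the feature sits behind a boolean switch. `saveMemberId`, `loadMemberId` and the `enabled` flag are hypothetical, not existing consumer APIs or configs. Any failure to load falls back to UNKNOWN_MEMBER_ID, which is today's behavior.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.util.Try

// Assumed to mirror JoinGroupRequest.UNKNOWN_MEMBER_ID (the empty string).
val UnknownMemberId = ""

// Hypothetical helper: remember the memberId from the last successful join.
def saveMemberId(file: Path, memberId: String): Unit = {
  Files.write(file, memberId.getBytes(StandardCharsets.UTF_8))
  ()
}

// Hypothetical helper: reuse the previous memberId if the feature is enabled.
// On any failure (e.g. missing or unreadable file) fall back to
// UNKNOWN_MEMBER_ID, so the consumer joins as a new member exactly as today.
def loadMemberId(file: Path, enabled: Boolean): String =
  if (!enabled) UnknownMemberId
  else {
    val loaded = Try(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim)
    loaded.getOrElse(UnknownMemberId)
  }

val idFile = Files.createTempDirectory("member-id-demo").resolve("member.id")
println(loadMemberId(idFile, enabled = true).isEmpty)  // true: nothing saved yet
saveMemberId(idFile, "consumer-1-example")             // hypothetical memberId
println(loadMemberId(idFile, enabled = true))          // consumer-1-example
println(loadMemberId(idFile, enabled = false).isEmpty) // true: feature disabled
```

A stored memberId may already have been removed by the coordinator (for example, after its session expired); the join would then be rejected with UNKNOWN_MEMBER_ID and the client would still need to retry with the unknown id. This sketch does not cover that path.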
> Thank you for helping out! [~mjsax] [~guozhang]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)