Posted to dev@kafka.apache.org by "Boyang Chen (JIRA)" <ji...@apache.org> on 2018/06/07 18:39:00 UTC

[jira] [Created] (KAFKA-7018) persist memberId for consumer restart

Boyang Chen created KAFKA-7018:
----------------------------------

             Summary: persist memberId for consumer restart
                 Key: KAFKA-7018
                 URL: https://issues.apache.org/jira/browse/KAFKA-7018
             Project: Kafka
          Issue Type: Improvement
          Components: consumer, streams
            Reporter: Boyang Chen
            Assignee: Boyang Chen


In the group coordinator, there is logic that skips the rebalance for a JoinGroup request from an existing follower consumer whose metadata has not changed:
{code:scala}
case Empty | Stable =>
  if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
    // if the member id is unknown, register the member to the group
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
  } else {
    val member = group.get(memberId)
    if (group.isLeader(memberId) || !member.matches(protocols)) {
      // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
      // The latter allows the leader to trigger rebalances for changes affecting assignment
      // which do not affect the member metadata (such as topic metadata changes for the consumer)
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    } else {
      // for followers with no actual change to their metadata, just return group information
      // for the current generation which will allow them to issue SyncGroup
      responseCallback(JoinGroupResult(
        members = Map.empty,
        memberId = memberId,
        generationId = group.generationId,
        subProtocol = group.protocolOrNull,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    }
  }
{code}
While looking at AbstractCoordinator, I found that the generation is hard-coded to NO_GENERATION on restart, which means we send UNKNOWN_MEMBER_ID in the first JoinGroup request. The coordinator therefore treats the restarted consumer as a new member, so a rebalance is triggered, and the stale previous member is only removed once its session times out.
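
To make the branch above easier to reason about, here is a simplified model of the decision, written as a sketch rather than as Kafka's actual code. The class JoinGroupModel, the Outcome enum and the onJoinGroup method are hypothetical names introduced for illustration, and the empty-string value for UNKNOWN_MEMBER_ID is an assumption.

```java
// Hypothetical, simplified model of the coordinator branch quoted above.
// It is NOT Kafka's implementation; all names here are illustrative.
final class JoinGroupModel {
    // Assumption: mirrors JoinGroupRequest.UNKNOWN_MEMBER_ID as an empty string.
    static final String UNKNOWN_MEMBER_ID = "";

    enum Outcome {
        ADD_MEMBER_AND_REBALANCE,     // unknown member id: register as a new member
        UPDATE_MEMBER_AND_REBALANCE,  // leader rejoined, or member metadata changed
        RETURN_CURRENT_GENERATION     // unchanged follower: no rebalance
    }

    static Outcome onJoinGroup(String memberId, boolean isLeader, boolean protocolsMatch) {
        if (memberId.equals(UNKNOWN_MEMBER_ID)) {
            return Outcome.ADD_MEMBER_AND_REBALANCE;
        }
        if (isLeader || !protocolsMatch) {
            return Outcome.UPDATE_MEMBER_AND_REBALANCE;
        }
        return Outcome.RETURN_CURRENT_GENERATION;
    }

    public static void main(String[] args) {
        // A restarted follower that lost its member id is treated as a new member.
        System.out.println(onJoinGroup(UNKNOWN_MEMBER_ID, false, true));
        // The same follower rejoining with its previous id avoids the rebalance.
        System.out.println(onJoinGroup("consumer-1-abc", false, true));
    }
}
```

In this model, the only way for a follower to reach RETURN_CURRENT_GENERATION is to present a known member id, which is the case the restart path currently never hits.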

I'm trying to clarify the following things before we extend the discussion:
 # Whether my understanding of the above logic is right (hopefully [~mjsax] can help me double-check).
 # Whether it makes sense to persist the memberId from the previous round for consumers. We currently only need this feature in Streams applications, but it would do no harm to support it for consumers in general. It would be a nice-to-have on consumer restart when a load-previous-memberId option is set to true. If loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
 # The behavior could also be changed on the broker side, but I suspect that is very risky. For now, a client-side change looks like the least effort. The end goal is to avoid excessive rebalances caused by restarts of the same consumer, so if you feel a server-side change could also help, we can discuss it further.
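
As a rough illustration of the client-side idea in the second point, the sketch below persists the last memberId to a local file and reloads it on restart, falling back to UNKNOWN_MEMBER_ID when the option is off or the file cannot be read. MemberIdStore, its methods and the enabled flag are hypothetical; nothing here is an existing Kafka API, and where the consumer would call it is left open.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Kafka: stores the last known member id locally.
final class MemberIdStore {
    // Assumption: mirrors JoinGroupRequest.UNKNOWN_MEMBER_ID as an empty string.
    static final String UNKNOWN_MEMBER_ID = "";

    private final Path file;
    private final boolean enabled; // stands in for the proposed load-previous-memberId option

    MemberIdStore(Path file, boolean enabled) {
        this.file = file;
        this.enabled = enabled;
    }

    // Called after a successful join, once the coordinator has assigned a member id.
    void save(String memberId) {
        try {
            Files.write(file, memberId.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called on restart, before the first JoinGroup request is built.
    String loadOrUnknown() {
        if (!enabled) {
            return UNKNOWN_MEMBER_ID;
        }
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            // Missing or unreadable file: behave exactly as today.
            return UNKNOWN_MEMBER_ID;
        }
    }
}
```

Falling back to UNKNOWN_MEMBER_ID on any read failure keeps the current behavior as the default, so in this sketch the feature can only remove rebalances, not add new failure modes on the client.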

Thank you for helping out! [~mjsax] [~guozhang]

 


