Posted to dev@kafka.apache.org by Boyang Chen <bc...@outlook.com> on 2019/05/15 23:32:22 UTC

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hey friends,

We have decided to introduce another protocol change that adds `group.instance.id` to the Sync/Heartbeat/OffsetCommit protocols. The reason is that we found this to be the safest and most consistent approach to fencing duplicate consumer clients. More details are in the updated KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
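
To illustrate the fencing idea, here is a minimal sketch of a coordinator-side check, assuming the coordinator keeps a `group.instance.id` -> `member.id` map and rejects requests whose `member.id` no longer matches. The class, method, and enum names are hypothetical and this is not the actual broker code; the KIP remains the reference for the real behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of coordinator-side fencing; names are hypothetical, not Kafka's implementation.
class FencingSketch {
    enum Result { OK, FENCED, UNKNOWN_MEMBER }

    // group.instance.id -> member.id currently registered for that static instance
    private final Map<String, String> staticMembers = new HashMap<>();

    // Records (or replaces) the member.id registered for a static instance.
    void register(String groupInstanceId, String memberId) {
        staticMembers.put(groupInstanceId, memberId);
    }

    // Validates a Sync/Heartbeat/OffsetCommit-style request that carries both ids.
    Result validate(String groupInstanceId, String memberId) {
        String current = staticMembers.get(groupInstanceId);
        if (current == null) {
            return Result.UNKNOWN_MEMBER;
        }
        return current.equals(memberId) ? Result.OK : Result.FENCED;
    }

    public static void main(String[] args) {
        FencingSketch coordinator = new FencingSketch();
        coordinator.register("gc1", "mc1");      // first client joins as gc1
        coordinator.register("gc1", "mc1-new");  // a duplicate client with the same instance id joins later
        System.out.println(coordinator.validate("gc1", "mc1"));      // request from the older client
        System.out.println(coordinator.validate("gc1", "mc1-new"));  // request from the newer client
    }
}
```

Carrying `group.instance.id` on every request is what lets this comparison happen on each call, not only at join time.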

Let me know if you have any concerns.

Best,
Boyang

________________________________
From: Boyang Chen <bc...@outlook.com>
Sent: Friday, April 26, 2019 11:16 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hey all,

There is a minor change to the stream-side logic for static membership<https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances>. Originally we chose to rely on the user supplying a unique `client.id` config, which we could use to construct the per-thread consumer `group.instance.id`. This approach has several drawbacks:

  1.  We already have functionality that relies on `client.id`, and users do not always want to configure it differently for each instance. For example, users can currently throttle requests that share the same `client.id`, which is a solid use case in which `client.id` should be duplicated across instances.
  2.  Existing Streams users may unknowingly trigger static membership if they already set `client.id` in their Streams apps. This could lead to unexpected fatal errors due to the `group.instance.id` fencing we are going to introduce.

In conclusion, it is not good practice to overload an existing config that users rely on unless doing so has no side effects. To provide a more fault-tolerant upgrade path, we have decided to let Streams users explicitly set `group.instance.id` if they want to enable static membership.
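
As a rough sketch of what the opt-in looks like on the configuration side, the snippet below builds plain properties in which `client.id` can stay shared while static membership is enabled only when `group.instance.id` is explicitly supplied. The helper name, the group name, and the example values are assumptions for illustration; how Streams derives per-thread ids from this setting is not shown here.

```java
import java.util.Properties;

// Illustrative only: plain key/value config, no Kafka client classes involved.
class StaticMembershipConfigSketch {
    // Static membership is opted into only when a group.instance.id is passed in.
    static Properties build(String clientId, String groupInstanceId) {
        Properties props = new Properties();
        props.setProperty("group.id", "example-group");  // hypothetical group name
        props.setProperty("client.id", clientId);         // may be shared across instances, e.g. for throttling
        if (groupInstanceId != null) {
            props.setProperty("group.instance.id", groupInstanceId);  // must be unique per instance
        }
        return props;
    }

    public static void main(String[] args) {
        Properties dynamicMember = build("shared-client", null);
        Properties staticMember = build("shared-client", "instance-1");
        System.out.println(dynamicMember.containsKey("group.instance.id"));
        System.out.println(staticMember.getProperty("group.instance.id"));
    }
}
```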

Thank you Guozhang and Matthias for the great discussions and enhancements for the KIP!

Best,
Boyang


________________________________
From: Boyang Chen <bc...@outlook.com>
Sent: Saturday, March 9, 2019 2:28 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hi Mike,

Yes, that's the plan!

________________________________
From: Mike Freyberger <mi...@xandr.com>
Sent: Saturday, March 9, 2019 10:04 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hi Boyang,

Is this work targeted for Kafka 2.3? I am eager to use this new feature.

Thanks,

Mike Freyberger

On 12/21/18, 1:21 PM, "Mayuresh Gharat" <gh...@gmail.com> wrote:

    Hi Boyang,

    Regarding "However, we shall still attempt to remove the member static info
    if the given `member.id` points to an existing `group.instance.id` upon
    LeaveGroupRequest, because I could think of the possibility that in long
    term we could want to add static membership leave group logic for more
    fine-grained use cases."

    I think there is some confusion here. I am probably not putting it
    right.

    I agree: if a static member sends a LeaveGroupRequest, it should be removed
    from the group.

    Now, getting back to the downgrade from static membership to dynamic
    membership, with the example described earlier (copying it again for ease
    of reading):

       1. Let's say we have 4 consumers: c1, c2, c3, c4 in the static group.
       2. The group.instance.id for each of them is as follows:
          - c1 -> gc1, c2 -> gc2, c3 -> gc3, c4 -> gc4
       3. The mapping on the GroupCoordinator would be:
          - gc1 -> mc1, gc2 -> mc2, gc3 -> mc3, gc4 -> mc4, where mc1, mc2,
          mc3, mc4 are the randomly generated memberIds for c1, c2, c3, c4
          respectively, by the GroupCoordinator.
       4. Now we do a restart to move the group to dynamic membership.
       5. We bounce c1 first and it rejoins with UNKNOWN_MEMBERID (since we
       don't persist the previously assigned memberId mc1 anywhere on c1).

    - We agree that there is no way to recognize that c1 was a part of the
      group, *earlier*. If so, the statement "The dynamic member rejoins
      the group without `group.instance.id`. It will be accepted since it is a
      known member." is not necessarily true, right?


    - Now I *agree* with "However, we shall still attempt to remove the
      member static info if the given `member.id` points to an existing
      `group.instance.id` upon LeaveGroupRequest, because I could think of the
      possibility that in long term we could want to add static membership leave
      group logic for more fine-grained use cases."

      But that would only happen if the GroupCoordinator allocates the same
      member.id (mc1) to consumer c1 when it rejoins the group in step 5
      above as a dynamic member, which is very rare since it is randomly
      generated, but possible.


    - This raises another question: suppose the GroupCoordinator assigns a
      new member.id (mc1~) to consumer c1 after step 5. c1 will join the
      group, a rebalance will follow, and the group will eventually become
      stable. However, the GroupCoordinator still maintains the mapping of
      "group.instance.id -> member.id" (gc1 -> mc1, gc2 -> mc2, gc3 -> mc3,
      gc4 -> mc4) internally, and after some time it realizes that it has not
      received a heartbeat from the consumer with "group.instance.id" = gc1.
      In that case, it will trigger another rebalance, assuming that a static
      member has left the group (when c1 has actually not left the group but
      moved to dynamic membership). This can result in multiple rebalances,
      as the same will happen for c2, c3, c4.

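
The downgrade scenario above can be sketched as a toy simulation. It assumes one rebalance for each dynamic join and one more when each stale static entry times out, and it uses counter-based member ids instead of random ones; all class and method names are hypothetical and this does not model the real GroupCoordinator.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the static -> dynamic downgrade; not the real GroupCoordinator.
class DowngradeSketch {
    final Map<String, String> staticMapping = new LinkedHashMap<>();  // group.instance.id -> member.id
    final Set<String> activeMembers = new LinkedHashSet<>();
    int rebalances = 0;   // counts only rebalances caused by the downgrade
    private int nextId = 0;

    // Initial static join; the initial rebalance is deliberately not counted.
    String joinStatic(String groupInstanceId) {
        String memberId = "m" + (nextId++);
        staticMapping.put(groupInstanceId, memberId);
        activeMembers.add(memberId);
        return memberId;
    }

    // A bounced consumer rejoins with no instance id and an unknown member id,
    // so it is treated as a brand-new member and gets a fresh member.id.
    String joinDynamic() {
        String memberId = "m" + (nextId++);
        activeMembers.add(memberId);
        rebalances++;
        return memberId;
    }

    // The stale static entry stops heartbeating and eventually times out.
    void expireStatic(String groupInstanceId) {
        String oldMemberId = staticMapping.remove(groupInstanceId);
        if (oldMemberId != null) {
            activeMembers.remove(oldMemberId);
            rebalances++;
        }
    }

    public static void main(String[] args) {
        DowngradeSketch group = new DowngradeSketch();
        List<String> ids = Arrays.asList("gc1", "gc2", "gc3", "gc4");
        for (String id : ids) {
            group.joinStatic(id);
        }
        for (String id : ids) {
            group.joinDynamic();     // bounced consumer returns without group.instance.id
            group.expireStatic(id);  // later, the stale static entry times out
        }
        System.out.println("rebalances in this toy model: " + group.rebalances);
    }
}
```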

    Thoughts?
    One thing I can think of right now is to run:
    removeMemberFromGroup(String groupId, list<String>
    groupInstanceIdsToRemove, RemoveMemberFromGroupOptions options)
    with groupInstanceIdsToRemove = <gc1, gc2, gc3, gc4> once we have bounced
    all the members in the group. This assumes that we will be able to complete
    the bounces before the GroupCoordinator realizes that it has not received a
    heartbeat for any of <gc1, gc2, gc3, gc4>. This is tricky and error-prone.
    Will have to think more on this.
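
A minimal sketch of the bulk-removal idea, assuming the coordinator state is just a `group.instance.id` -> `member.id` map. The helper below is hypothetical and only mirrors the shape of the proposed `removeMemberFromGroup` call; it also shows why timing matters, since entries that have already timed out are no longer there to remove.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not an existing admin API.
class RemoveStaticMembersSketch {
    // Removes the given instance ids from the mapping and returns the ids that
    // were actually still registered at the time of the call.
    static List<String> removeMembers(Map<String, String> staticMapping,
                                      List<String> groupInstanceIdsToRemove) {
        List<String> removed = new ArrayList<>();
        for (String id : groupInstanceIdsToRemove) {
            if (staticMapping.remove(id) != null) {
                removed.add(id);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("gc2", "mc2");
        mapping.put("gc3", "mc3");
        mapping.put("gc4", "mc4");
        // gc1 is absent: in this example it already timed out before the call was made.
        List<String> removed = removeMembers(mapping, Arrays.asList("gc1", "gc2", "gc3", "gc4"));
        System.out.println("removed: " + removed + ", remaining: " + mapping.keySet());
    }
}
```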

    Thanks,

    Mayuresh