Posted to dev@kafka.apache.org by Boyang Chen <bc...@outlook.com> on 2019/06/13 21:36:35 UTC

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hey all,

We decided to push two minor changes to improve the usability of static membership.

  1.  Add a `group.instance.id` field to the `DescribeGroupResponse` API. This lets client users check which static members are registered in the current group.
  2.  Add a getGroupInstanceId() function to the `Subscription` class. This helps utilize static member information to generate a more stable resource assignment when the group membership remains the same during a rolling bounce.
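
To illustrate the second point, here is a minimal sketch of how an assignor could use the group instance id to keep an assignment stable across a rolling bounce. It is not the actual Kafka assignor code: the `Member` class, `stableKey()` and `assign()` are hypothetical names invented for this example, and the round-robin strategy is an assumption chosen for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class StableAssignmentSketch {

    /** Hypothetical stand-in for one member's subscription metadata. */
    static final class Member {
        final String memberId;                   // coordinator-generated, changes after a restart
        final Optional<String> groupInstanceId;  // user-configured, survives a restart

        Member(String memberId, Optional<String> groupInstanceId) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }

        /** Prefer the static id as the ordering key; fall back to the member id. */
        String stableKey() {
            return groupInstanceId.orElse(memberId);
        }
    }

    /** Round-robin partitions over members ordered by stable key. Returns stableKey -> partitions. */
    static Map<String, List<Integer>> assign(List<Member> members, int numPartitions) {
        List<Member> ordered = new ArrayList<>(members);
        ordered.sort(Comparator.comparing(Member::stableKey));
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (Member m : ordered) {
            assignment.put(m.stableKey(), new ArrayList<>());
        }
        if (ordered.isEmpty()) {
            return assignment; // nothing to assign to
        }
        for (int p = 0; p < numPartitions; p++) {
            Member owner = ordered.get(p % ordered.size());
            assignment.get(owner.stableKey()).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Same static ids, different member ids, as after a rolling bounce.
        List<Member> afterBounce = Arrays.asList(
                new Member("m-zzz2", Optional.of("gc1")),
                new Member("m-aaa2", Optional.of("gc2")));
        System.out.println(assign(afterBounce, 4));
    }
}
```

Because the ordering key is the group instance id rather than the regenerated member id, the same static member ends up with the same partitions before and after the bounce in this sketch.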

The KIP has been updated with these changes. No compatibility issues are anticipated. Let me know if you have any concerns.

Best,
Boyang

________________________________
From: Boyang Chen <bc...@outlook.com>
Sent: Friday, April 26, 2019 11:16 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hey all,

There is a minor change to the stream-side logic for static membership<https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances>. Originally we chose to piggy-back on the user supplying a unique `client.id` config, which we would use to construct a per-thread consumer `group.instance.id`. This approach has several drawbacks:

  1.  We already have functionality that relies on `client.id`, and users do not always want to configure it differently for individual instances. For example, users can currently throttle requests under the same `client.id`, which is a solid use case where the `client.id` should be duplicated.
  2.  Existing stream users may unknowingly trigger static membership if they already set `client.id` in their Streams apps. This could include unexpected fatal errors due to the `group.instance.id` fencing we are going to introduce.

In conclusion, it is not good practice to overload an existing config that users rely on unless there is no side effect. To make the upgrade path more fault tolerant, we decided to let stream users choose to set `group.instance.id` if they want to enable static membership.
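
The opt-in behavior described above can be sketched as follows. This is only an illustration using `java.util.Properties`; the class and method names are hypothetical, and the `-<threadIndex>` suffix used to derive a per-thread id is an assumption made for the example, not something stated in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

public class StaticMembershipOptIn {

    static final String GROUP_INSTANCE_ID = "group.instance.id";
    static final String CLIENT_ID = "client.id";

    /** Static membership is enabled only when the user explicitly sets group.instance.id. */
    static Optional<String> groupInstanceId(Properties userConfig) {
        String id = userConfig.getProperty(GROUP_INSTANCE_ID);
        return (id == null || id.isEmpty()) ? Optional.empty() : Optional.of(id);
    }

    /** Hypothetical per-thread derivation; the "-<threadIndex>" suffix is an assumption. */
    static List<Optional<String>> perThreadIds(Properties userConfig, int numThreads) {
        Optional<String> base = groupInstanceId(userConfig);
        List<Optional<String>> ids = new ArrayList<>();
        for (int t = 1; t <= numThreads; t++) {
            final int thread = t;
            ids.add(base.map(b -> b + "-" + thread));
        }
        return ids;
    }

    public static void main(String[] args) {
        Properties legacy = new Properties();
        legacy.setProperty(CLIENT_ID, "shared-client"); // client.id alone does not opt in
        System.out.println(perThreadIds(legacy, 2));

        Properties optedIn = new Properties();
        optedIn.setProperty(CLIENT_ID, "shared-client");
        optedIn.setProperty(GROUP_INSTANCE_ID, "app-1");
        System.out.println(perThreadIds(optedIn, 2));
    }
}
```

The point of the design is that an existing `client.id` never changes membership behavior on its own, so instances can keep sharing one `client.id` for throttling.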

Thank you Guozhang and Matthias for the great discussions and enhancements for the KIP!

Best,
Boyang


________________________________
From: Boyang Chen <bc...@outlook.com>
Sent: Saturday, March 9, 2019 2:28 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hi Mike,

Yes that's the plan!

________________________________
From: Mike Freyberger <mi...@xandr.com>
Sent: Saturday, March 9, 2019 10:04 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hi Boyang,

Is this work targeted for Kafka 2.3? I am eager to use this new feature.

Thanks,

Mike Freyberger

On 12/21/18, 1:21 PM, "Mayuresh Gharat" <gh...@gmail.com> wrote:

    Hi Boyang,

    Regarding "However, we shall still attempt to remove the member static info
    if the given `member.id` points to an existing `group.instance.id` upon
    LeaveGroupRequest, because I could think of the possibility that in long
    term we could want to add static membership leave group logic for more
    fine-grained use cases."

    I think there is some confusion here; I am probably not putting it right.
    I agree: if a static member sends a LeaveGroupRequest, it should be removed
    from the group.

    Now, getting back to the downgrade from static membership to dynamic
    membership, with the example described earlier (copying it again for ease
    of reading):

       1. Let's say we have 4 consumers: c1, c2, c3, c4 in the static group.
       2. The group.instance.id for each of these is as follows:
          - c1 -> gc1, c2 -> gc2, c3 -> gc3, c4 -> gc4
       3. The mapping on the GroupCoordinator would be:
          - gc1 -> mc1, gc2 -> mc2, gc3 -> mc3, gc4 -> mc4, where mc1, mc2,
          mc3, mc4 are the randomly generated memberIds for c1, c2, c3, c4
          respectively, by the GroupCoordinator.
       4. Now we do a restart to move the group to dynamic membership.
       5. We bounce c1 first and it rejoins with UNKNOWN_MEMBERID (since we
          don't persist the previously assigned memberId mc1 anywhere on c1).

    - We agree that there is no way to recognize that c1 was part of the
      group *earlier*. If so, the statement "The dynamic member rejoins
      the group without `group.instance.id`. It will be accepted since it is a
      known member." is not necessarily true, right?

    - Now I *agree* with "However, we shall still attempt to remove the
      member static info if the given `member.id` points to an existing
      `group.instance.id` upon LeaveGroupRequest, because I could think of the
      possibility that in long term we could want to add static membership leave
      group logic for more fine-grained use cases."
      But that would only happen if the GroupCoordinator allocates the same
      member.id (mc1) to the consumer c1 when it rejoins the group in step 5
      above as a dynamic member, which is very rare as it is randomly generated,
      but possible.

    - This raises another question. Suppose the GroupCoordinator assigns a new
      member.id (mc1~) to consumer c1 after step 5. It will join the group,
      trigger a rebalance, and the group will eventually become stable. Now the
      GroupCoordinator still maintains the mapping "group.instance.id ->
      member.id" (gc1 -> mc1, gc2 -> mc2, gc3 -> mc3, gc4 -> mc4) internally and,
      after some time, it realizes that it has not received a heartbeat from the
      consumer with "group.instance.id" = gc1. In that case, it will trigger
      another rebalance, assuming that a static member has left the group (when
      actually c1 has not left the group but moved to dynamic membership).
      This can result in multiple rebalances, as the same will happen for c2, c3,
      c4.
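
The extra rebalances described in the last point can be counted with a toy model. This is a rough sketch, not the real GroupCoordinator: `ToyCoordinator`, `simulateDowngrade` and the timing values are all hypothetical, and the model assumes every join and every session-timeout expiry triggers exactly one rebalance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DowngradeRebalanceSketch {

    /** Toy coordinator: tracks static ids and heartbeats, counts rebalances. */
    static final class ToyCoordinator {
        final Map<String, String> staticMembers = new LinkedHashMap<>(); // group.instance.id -> member.id
        final Map<String, Long> lastHeartbeatMs = new HashMap<>();       // member.id -> last heartbeat
        int rebalances = 0;
        private int nextId = 0;

        /** Join with a fresh member id; pass null to join as a dynamic member. */
        String join(String groupInstanceIdOrNull, long nowMs) {
            String memberId = "mc" + (nextId++);
            if (groupInstanceIdOrNull != null) {
                staticMembers.put(groupInstanceIdOrNull, memberId);
            }
            lastHeartbeatMs.put(memberId, nowMs);
            rebalances++;
            return memberId;
        }

        void heartbeat(String memberId, long nowMs) {
            lastHeartbeatMs.put(memberId, nowMs);
        }

        /** Expire members that missed the session timeout; each expiry counts as a rebalance. */
        void expire(long nowMs, long sessionTimeoutMs) {
            List<String> expired = new ArrayList<>();
            for (Map.Entry<String, Long> e : lastHeartbeatMs.entrySet()) {
                if (nowMs - e.getValue() > sessionTimeoutMs) {
                    expired.add(e.getKey());
                }
            }
            for (String memberId : expired) {
                lastHeartbeatMs.remove(memberId);
                staticMembers.values().remove(memberId);
                rebalances++;
            }
        }
    }

    /** Bounce each static member into dynamic membership and count the resulting rebalances. */
    static int simulateDowngrade(int members, long sessionTimeoutMs) {
        ToyCoordinator gc = new ToyCoordinator();
        List<String> live = new ArrayList<>();
        for (int i = 1; i <= members; i++) {
            live.add(gc.join("gc" + i, 0L));
        }
        gc.rebalances = 0; // count only rebalances caused by the downgrade
        long now = 0L;
        for (int i = 0; i < members; i++) {
            now += 1;
            for (String id : live) {
                gc.heartbeat(id, now);
            }
            // Bounce: the old member id is forgotten, the consumer rejoins without group.instance.id.
            live.set(i, gc.join(null, now));
            now += sessionTimeoutMs + 1;
            for (String id : live) {
                gc.heartbeat(id, now);
            }
            gc.expire(now, sessionTimeoutMs); // the stale static entry times out here
        }
        return gc.rebalances;
    }

    public static void main(String[] args) {
        System.out.println(simulateDowngrade(4, 10_000L));
    }
}
```

Under these assumptions each bounced member costs two rebalances: one for the dynamic rejoin and one when its stale static entry expires.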

    Thoughts?
    One thing I can think of right now is to run:
    removeMemberFromGroup(String groupId, List<String>
    groupInstanceIdsToRemove, RemoveMemberFromGroupOptions options)
    with groupInstanceIdsToRemove = <gc1, gc2, gc3, gc4> once we have bounced
    all the members in the group. This assumes that we will be able to complete
    the bounces before the GroupCoordinator realizes that it has not received a
    heartbeat for any of <gc1, gc2, gc3, gc4>. This is tricky and error-prone.
    I will have to think more on this.
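
The batch-removal idea above might look like the following toy sketch. `ToyGroup` is hypothetical, the `groupId` and `options` parameters of the proposed call are omitted for brevity, and the assumption that one batch removal costs at most one rebalance is mine, not something the thread confirms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchRemovalSketch {

    /** Toy view of one group's static membership table. */
    static final class ToyGroup {
        final Map<String, String> staticMembers = new LinkedHashMap<>(); // group.instance.id -> member.id
        int rebalances = 0;

        void register(String groupInstanceId, String memberId) {
            staticMembers.put(groupInstanceId, memberId);
        }

        /**
         * Remove the given static ids in one call and return those actually removed.
         * Assumption: the whole batch triggers at most one rebalance.
         */
        List<String> removeMemberFromGroup(List<String> groupInstanceIdsToRemove) {
            List<String> removed = new ArrayList<>();
            for (String id : groupInstanceIdsToRemove) {
                if (staticMembers.remove(id) != null) {
                    removed.add(id);
                }
            }
            if (!removed.isEmpty()) {
                rebalances++;
            }
            return removed;
        }
    }

    public static void main(String[] args) {
        ToyGroup group = new ToyGroup();
        for (int i = 1; i <= 4; i++) {
            group.register("gc" + i, "mc" + i);
        }
        // Must run after all bounces finish and before any stale entry hits the session timeout.
        System.out.println(group.removeMemberFromGroup(Arrays.asList("gc1", "gc2", "gc3", "gc4")));
    }
}
```

The sketch does not model the race with the session timeout, which is the part the message calls tricky.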

    Thanks,

    Mayuresh



Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Posted by Boyang Chen <bc...@outlook.com>.
Hey Mike,

Thanks for asking! We have already shipped 85% of the features in KIP-345; the remaining work is just tooling support: https://issues.apache.org/jira/browse/KAFKA-7018

You should be able to use it in 2.3 🙂

Boyang

________________________________
From: Mike Freyberger <mi...@xandr.com>
Sent: Friday, June 14, 2019 8:42 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hi Boyang,

Thanks for the update. To what extent will KIP-345 be available in 2.3.0?

Mike


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Posted by Mike Freyberger <mi...@xandr.com>.
Hi Boyang,

Thanks for the update. To what extent will KIP-345 be available in 2.3.0?

Mike
