You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "McCaig, Rhys" <Rh...@comcast.com> on 2018/10/04 19:16:17 UTC

Re: [EXTERNAL] Incremental Cooperative Rebalancing

This is fantastic. Im really excited to see the work on this. 

> On Oct 2, 2018, at 4:22 PM, Konstantine Karantasis <ko...@confluent.io> wrote:
> 
> Hey everyone,
> 
> I'd like to bring to your attention a general design document that was just
> published in Apache Kafka's wiki space:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing%3A+Support+and+Policies
> 
> It deals with the subject of Rebalancing of groups in Kafka and proposes
> basic infrastructure to support improvements on the current rebalancing
> protocol as well as a set of policies that can be implemented to optimize
> rebalancing under a number of real-world scenarios.
> 
> Currently, this wiki page is meant to serve as a reference to the
> proposition of Incremental Cooperative Rebalancing overall. Specific KIPs
> will follow in order to describe in more detail - using the standard KIP
> format - the basic infrastructure and the first policies that will be
> proposed for implementation in components such as Connect, the Kafka
> Consumer and Streams.
> 
> Stay tuned!
> Konstantine


Re: [EXTERNAL] Incremental Cooperative Rebalancing

Posted by Ryanne Dolan <ry...@gmail.com>.
Konstantine, thanks for the explanation, makes sense.

Ryanne

On Tue, Oct 16, 2018, 1:51 PM Konstantine Karantasis <
konstantine@confluent.io> wrote:

> Matthias, Ryanne, Rhys, Guozhang, thank you all for your comments!
>
> Ryanne, to try to address your specific comments, let me start by saying
> that a key concept behind this proposal is the concept of overlapping
> 'communication' with 'computation', which is known for often reducing the
> overall cost (or latency if you prefer) of an operation that involves
> multiple processes compared to global barrier-type synchronization. Of
> course this does not rule out that there might be occasions where
> stop-the-world might incur smaller overall cost. But given that we'll
> always want to minimize communication and shuffling of resources and apply
> more sticky heuristics in assignments, I believe that such edge cases will
> be considerably fewer in practice.
>
> Specifically here, the goal is to exchange (by revoking and reassigning)
> only the necessary resources in the group and allow for the unaffected
> resources to continue being used. As mentioned in the motivation section,
> this is expected to have a positive effect in a number of use cases, for
> which stop-the-world is too strict. Given this key distinction between
> affected and unaffected resources (e.g. topic partitions, tasks in Kafka
> Connect etc) I anticipate that in most cases, even for resources that need
> to change hands, the overall rebalance phase will be faster (especially at
> larger scale) than it is today with all the processes participating in
> resource hand-off and re-assignment.
>
> -Konstantine
>
>
> On Fri, Oct 5, 2018 at 12:00 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Konstantine,
> >
> > Thanks for the great write-up! Here are a few quick comments I have about
> > the proposals:
> >
> > 1. For "Kubernetes process death" and "Rolling bounce" case, there is
> > another parallel work on KIP-345 [1] (cc'ed contributor) that is aimed to
> > mitigate these two issues, but it is relying on the fact that we can
> > disable sending "leave group" request immediately on shutting down.
> Ideally
> > if KIP-345 works well for these cases, then Simple Cooperative
> Rebalancing
> > itself along with KIP-345 should cover most of the scenarios we've
> > described in the wiki. In addition, Delayed / Incremental Imbalance
> > approach can be done incrementally on top of Simple approach, so
> execution
> > wise I'd suggest we start with the Simple approach and observe how well
> it
> > works in practice (especially with K8s etc frameworks) before deciding if
> > we should go further and implemented the more complicated ones.
> >
> > 2. For the "events" section, I think it may worth mentioning if there are
> > any new client / coordinator failure events that need to be addressed
> with
> > the new protocol, as we listed in the original design [2] [3]. For
> example,
> > what if the leader received different client or resource listings during
> > two consecutive rebalances?
> >
> > 3. It's worth mentioning what are the key ideas in the updated protocol:
> >
> > 3.a) In the original protocol we require every member to revoke every
> > resource before joining the group, which can then be used as the
> > "synchronization barrier" and hence it does not matter for clients to
> > receive assignment at different point in time; in the new protocol we do
> > not require members to revoke everything, but instead leveraging on the
> > leader who has the "global picture" to make sure that there are no
> > conflicts between those shared resources, a.k.a as the synchronization
> > barrier.
> > 3.b) The new fields in the Assigned / RevokedPartitions fields in the
> > responses are now "deltas" instead of "overwrites" to the consumers. Any
> > modules relying on it, e.g. Streams who relies on ConsumerCoordinator,
> > needs to adjust their code (PartitionAssignor) correspondingly to
> > incorporate this semantic changes.
> >
> > 4. I've added a child page under yours for illustrating the implications
> > for Streams on rebalance cost reduction [4], since for Streams one key
> > characteristics is that standby tasks exist to help with rebalance
> incurred
> > unavailability, and hence need to be considered upfront how Streams
> should
> > leverage on the new protocol along with standby tasks to achieve the
> better
> > operational goals during rebalances.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Reduce+multiple+consumer+rebalances+by+specifying+member+id
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-CoordinatorStateMachine
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Interestingscenariostoconsider
> > [4]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing+for+Streams
> >
> >
> > On Thu, Oct 4, 2018 at 12:16 PM, McCaig, Rhys <Rh...@comcast.com>
> > wrote:
> >
> > > This is fantastic. Im really excited to see the work on this.
> > >
> > > > On Oct 2, 2018, at 4:22 PM, Konstantine Karantasis <
> > > konstantine@confluent.io> wrote:
> > > >
> > > > Hey everyone,
> > > >
> > > > I'd like to bring to your attention a general design document that
> was
> > > just
> > > > published in Apache Kafka's wiki space:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Incrementa
> > > l+Cooperative+Rebalancing%3A+Support+and+Policies
> > > >
> > > > It deals with the subject of Rebalancing of groups in Kafka and
> > proposes
> > > > basic infrastructure to support improvements on the current
> rebalancing
> > > > protocol as well as a set of policies that can be implemented to
> > optimize
> > > > rebalancing under a number of real-world scenarios.
> > > >
> > > > Currently, this wiki page is meant to serve as a reference to the
> > > > proposition of Incremental Cooperative Rebalancing overall. Specific
> > KIPs
> > > > will follow in order to describe in more detail - using the standard
> > KIP
> > > > format - the basic infrastructure and the first policies that will be
> > > > proposed for implementation in components such as Connect, the Kafka
> > > > Consumer and Streams.
> > > >
> > > > Stay tuned!
> > > > Konstantine
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: [EXTERNAL] Incremental Cooperative Rebalancing

Posted by Konstantine Karantasis <ko...@confluent.io>.
Matthias, Ryanne, Rhys, Guozhang, thank you all for your comments!

Ryanne, to try to address your specific comments, let me start by saying
that a key concept behind this proposal is the concept of overlapping
'communication' with 'computation', which is known for often reducing the
overall cost (or latency if you prefer) of an operation that involves
multiple processes compared to global barrier-type synchronization. Of
course this does not rule out that there might be occasions where
stop-the-world might incur smaller overall cost. But given that we'll
always want to minimize communication and shuffling of resources and apply
more sticky heuristics in assignments, I believe that such edge cases will
be considerably fewer in practice.

Specifically here, the goal is to exchange (by revoking and reassigning)
only the necessary resources in the group and allow for the unaffected
resources to continue being used. As mentioned in the motivation section,
this is expected to have a positive effect in a number of use cases, for
which stop-the-world is too strict. Given this key distinction between
affected and unaffected resources (e.g. topic partitions, tasks in Kafka
Connect etc) I anticipate that in most cases, even for resources that need
to change hands, the overall rebalance phase will be faster (especially at
larger scale) than it is today with all the processes participating in
resource hand-off and re-assignment.

-Konstantine


On Fri, Oct 5, 2018 at 12:00 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Konstantine,
>
> Thanks for the great write-up! Here are a few quick comments I have about
> the proposals:
>
> 1. For "Kubernetes process death" and "Rolling bounce" case, there is
> another parallel work on KIP-345 [1] (cc'ed contributor) that is aimed to
> mitigate these two issues, but it is relying on the fact that we can
> disable sending "leave group" request immediately on shutting down. Ideally
> if KIP-345 works well for these cases, then Simple Cooperative Rebalancing
> itself along with KIP-345 should cover most of the scenarios we've
> described in the wiki. In addition, Delayed / Incremental Imbalance
> approach can be done incrementally on top of Simple approach, so execution
> wise I'd suggest we start with the Simple approach and observe how well it
> works in practice (especially with K8s etc frameworks) before deciding if
> we should go further and implemented the more complicated ones.
>
> 2. For the "events" section, I think it may worth mentioning if there are
> any new client / coordinator failure events that need to be addressed with
> the new protocol, as we listed in the original design [2] [3]. For example,
> what if the leader received different client or resource listings during
> two consecutive rebalances?
>
> 3. It's worth mentioning what are the key ideas in the updated protocol:
>
> 3.a) In the original protocol we require every member to revoke every
> resource before joining the group, which can then be used as the
> "synchronization barrier" and hence it does not matter for clients to
> receive assignment at different point in time; in the new protocol we do
> not require members to revoke everything, but instead leveraging on the
> leader who has the "global picture" to make sure that there are no
> conflicts between those shared resources, a.k.a as the synchronization
> barrier.
> 3.b) The new fields in the Assigned / RevokedPartitions fields in the
> responses are now "deltas" instead of "overwrites" to the consumers. Any
> modules relying on it, e.g. Streams who relies on ConsumerCoordinator,
> needs to adjust their code (PartitionAssignor) correspondingly to
> incorporate this semantic changes.
>
> 4. I've added a child page under yours for illustrating the implications
> for Streams on rebalance cost reduction [4], since for Streams one key
> characteristics is that standby tasks exist to help with rebalance incurred
> unavailability, and hence need to be considered upfront how Streams should
> leverage on the new protocol along with standby tasks to achieve the better
> operational goals during rebalances.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Reduce+multiple+consumer+rebalances+by+specifying+member+id
> [2]
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-CoordinatorStateMachine
> [3]
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Interestingscenariostoconsider
> [4]
>
> https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing+for+Streams
>
>
> On Thu, Oct 4, 2018 at 12:16 PM, McCaig, Rhys <Rh...@comcast.com>
> wrote:
>
> > This is fantastic. Im really excited to see the work on this.
> >
> > > On Oct 2, 2018, at 4:22 PM, Konstantine Karantasis <
> > konstantine@confluent.io> wrote:
> > >
> > > Hey everyone,
> > >
> > > I'd like to bring to your attention a general design document that was
> > just
> > > published in Apache Kafka's wiki space:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Incrementa
> > l+Cooperative+Rebalancing%3A+Support+and+Policies
> > >
> > > It deals with the subject of Rebalancing of groups in Kafka and
> proposes
> > > basic infrastructure to support improvements on the current rebalancing
> > > protocol as well as a set of policies that can be implemented to
> optimize
> > > rebalancing under a number of real-world scenarios.
> > >
> > > Currently, this wiki page is meant to serve as a reference to the
> > > proposition of Incremental Cooperative Rebalancing overall. Specific
> KIPs
> > > will follow in order to describe in more detail - using the standard
> KIP
> > > format - the basic infrastructure and the first policies that will be
> > > proposed for implementation in components such as Connect, the Kafka
> > > Consumer and Streams.
> > >
> > > Stay tuned!
> > > Konstantine
> >
> >
>
>
> --
> -- Guozhang
>

Re: [EXTERNAL] Incremental Cooperative Rebalancing

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Konstantine,

Thanks for the great write-up! Here are a few quick comments I have about
the proposals:

1. For "Kubernetes process death" and "Rolling bounce" case, there is
another parallel work on KIP-345 [1] (cc'ed contributor) that is aimed to
mitigate these two issues, but it is relying on the fact that we can
disable sending "leave group" request immediately on shutting down. Ideally
if KIP-345 works well for these cases, then Simple Cooperative Rebalancing
itself along with KIP-345 should cover most of the scenarios we've
described in the wiki. In addition, Delayed / Incremental Imbalance
approach can be done incrementally on top of Simple approach, so execution
wise I'd suggest we start with the Simple approach and observe how well it
works in practice (especially with K8s etc frameworks) before deciding if
we should go further and implemented the more complicated ones.

2. For the "events" section, I think it may worth mentioning if there are
any new client / coordinator failure events that need to be addressed with
the new protocol, as we listed in the original design [2] [3]. For example,
what if the leader received different client or resource listings during
two consecutive rebalances?

3. It's worth mentioning what are the key ideas in the updated protocol:

3.a) In the original protocol we require every member to revoke every
resource before joining the group, which can then be used as the
"synchronization barrier" and hence it does not matter for clients to
receive assignment at different point in time; in the new protocol we do
not require members to revoke everything, but instead leveraging on the
leader who has the "global picture" to make sure that there are no
conflicts between those shared resources, a.k.a as the synchronization
barrier.
3.b) The new fields in the Assigned / RevokedPartitions fields in the
responses are now "deltas" instead of "overwrites" to the consumers. Any
modules relying on it, e.g. Streams who relies on ConsumerCoordinator,
needs to adjust their code (PartitionAssignor) correspondingly to
incorporate this semantic changes.

4. I've added a child page under yours for illustrating the implications
for Streams on rebalance cost reduction [4], since for Streams one key
characteristics is that standby tasks exist to help with rebalance incurred
unavailability, and hence need to be considered upfront how Streams should
leverage on the new protocol along with standby tasks to achieve the better
operational goals during rebalances.


[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Reduce+multiple+consumer+rebalances+by+specifying+member+id
[2]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-CoordinatorStateMachine
[3]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Interestingscenariostoconsider
[4]
https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing+for+Streams


On Thu, Oct 4, 2018 at 12:16 PM, McCaig, Rhys <Rh...@comcast.com>
wrote:

> This is fantastic. Im really excited to see the work on this.
>
> > On Oct 2, 2018, at 4:22 PM, Konstantine Karantasis <
> konstantine@confluent.io> wrote:
> >
> > Hey everyone,
> >
> > I'd like to bring to your attention a general design document that was
> just
> > published in Apache Kafka's wiki space:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Incrementa
> l+Cooperative+Rebalancing%3A+Support+and+Policies
> >
> > It deals with the subject of Rebalancing of groups in Kafka and proposes
> > basic infrastructure to support improvements on the current rebalancing
> > protocol as well as a set of policies that can be implemented to optimize
> > rebalancing under a number of real-world scenarios.
> >
> > Currently, this wiki page is meant to serve as a reference to the
> > proposition of Incremental Cooperative Rebalancing overall. Specific KIPs
> > will follow in order to describe in more detail - using the standard KIP
> > format - the basic infrastructure and the first policies that will be
> > proposed for implementation in components such as Connect, the Kafka
> > Consumer and Streams.
> >
> > Stay tuned!
> > Konstantine
>
>


-- 
-- Guozhang