Posted to dev@kafka.apache.org by Vahid S Hashemian <va...@us.ibm.com> on 2016/04/14 20:05:35 UTC
[DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi all,
I have started a new KIP under
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
The corresponding JIRA is at
https://issues.apache.org/jira/browse/KAFKA-2273
The corresponding PR is at https://github.com/apache/kafka/pull/1020
Your feedback is much appreciated.
Regards,
Vahid Hashemian
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Guozhang,
Thanks for the pointer. I'll try to take a closer look and get a better
understanding and see if there is anything that can be leveraged for
KIP-54 implementation.
Regards,
Vahid Hashemian
From: Guozhang Wang <wa...@gmail.com>
To: "dev@kafka.apache.org" <de...@kafka.apache.org>
Date: 05/02/2016 10:34 AM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
some sort of sticky partitioning mechanism. This is done through the
userData field though; i.e. all group members send their currently "assigned
partitions" in their join group request, which will be grouped and sent to
the leader; the leader then does best-effort sticky partitioning.
Guozhang
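The leader-side flow described above might look roughly like the following sketch. All class and method names here are illustrative assumptions for the discussion, not Kafka's actual API: each member reports its previously owned partitions, the leader keeps a partition with its old owner whenever that owner is still alive, and only orphaned partitions are redistributed.

```java
import java.util.*;

// Hypothetical sketch of best-effort sticky assignment on the leader side.
public class BestEffortSticky {
    public static Map<String, List<Integer>> assign(
            List<Integer> partitions,
            Map<String, List<Integer>> previousOwnership,
            Set<String> liveMembers) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String member : liveMembers) assignment.put(member, new ArrayList<>());
        Set<Integer> unassigned = new TreeSet<>(partitions);
        // First pass: keep partitions with their previous owners (stickiness).
        for (Map.Entry<String, List<Integer>> e : previousOwnership.entrySet()) {
            if (!liveMembers.contains(e.getKey())) continue;
            for (Integer p : e.getValue()) {
                if (unassigned.remove(p)) assignment.get(e.getKey()).add(p);
            }
        }
        // Second pass: hand each orphaned partition to the least loaded member.
        for (Integer p : unassigned) {
            String target = null;
            for (String member : assignment.keySet()) {
                if (target == null
                        || assignment.get(member).size() < assignment.get(target).size()) {
                    target = member;
                }
            }
            assignment.get(target).add(p);
        }
        return assignment;
    }
}
```

Note how a dead member's partitions drift to the survivors, which is exactly the behavior questioned later in the thread when a consumer rejoins.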
On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:
> I think I'm unclear how we leverage the
> onPartitionsRevoked/onPartitionsAssigned here in any way that's different
> from our normal usage -- certainly you can use them to generate a diff, but
> you still need to commit when partitions are revoked and that has a
> non-trivial cost. Are we just saying that you might be able to save some
> overhead, e.g. closing/reopening some other resources by doing a flush but
> not a close() or something? You still need to flush any output and commit
> offsets before returning from onPartitionsRevoked, right? Otherwise you
> couldn't guarantee clean handoff of partitions.
>
> In terms of the rebalancing, the basic requirements in the KIP seem sound.
> Passing previous assignment data via UserData also seems reasonable, since
> it avoids redistributing all assignment data to all members and doesn't
> rely on the next generation leader being a member of the current
> generation. Hopefully this shouldn't be surprising since I think I
> discussed this w/ Jason before he updated the relevant wiki pages :)
>
> -Ewen
>
>
> On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your feedback.
> >
> > I believe your suggestion on how to take advantage of this assignor is
> > valid. We can leverage the onPartitionsRevoked() and onPartitionsAssigned()
> > callbacks, compare the assigned partitions before and after the
> > rebalance, and do the cleanup only if there is a change (e.g., if some
> > previously assigned partition is not in the new assignment).
> >
> > On your second question, a number of tests that I ran show that the old
> > assignments are preserved in the current implementation, except when
> > the consumer group leader is killed, in which case a fresh assignment is
> > performed. This is something that needs to be fixed. I tried to use your
> > pointers to find out where the best place is to preserve the old
> > assignment in such circumstances but have not been able to pinpoint it. If
> > you have any suggestions on this, please share. Thanks.
> >
> > Regards,
> > Vahid Hashemian
> >
> >
> >
> >
> > From: Jason Gustafson <ja...@confluent.io>
> > To: dev@kafka.apache.org
> > Date: 04/14/2016 11:37 AM
> > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> >
> >
> >
> > Hi Vahid,
> >
> > Thanks for the proposal. I think one of the advantages of having sticky
> > assignment would be reducing the need to clean up local partition state
> > between rebalances. Do you have any thoughts on how the user would take
> > advantage of this assignor in the consumer to do this? Maybe one approach
> > is to delay cleanup until you detect a change from the previous assignment
> > in the onPartitionsAssigned() callback?
> >
> > Also, can you provide some detail on how the sticky assignor works at the
> > group protocol level? For example, do you pass old assignments through the
> > "UserData" field in the consumer's JoinGroup?
> >
> > Thanks,
> > Jason
> >
> > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi all,
> > >
> > > I have started a new KIP under
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > >
> > > The corresponding JIRA is at
> > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > >
> > > Your feedback is much appreciated.
> > >
> > > Regards,
> > > Vahid Hashemian
>
>
> --
> Thanks,
> Ewen
>
--
-- Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,
Sorry about my misunderstanding, and thanks for sending the reference.
The grammar you sent is correct; that is how the current assignments are
preserved in the current implementation.
I understand your point about limiting the policies provided with the
Kafka release, and the value of providing sticky assignment out of the box.
I'm okay with what the community decides in terms of which of these
options should go into Kafka.
I'll try to document these alternatives in the KIP.
Regards,
--Vahid
From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 06/06/2016 08:14 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Vahid,
The only thing I added was the specification of the UserData field. The
rest comes from here:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
See the section on the JoinGroup request.
Generally speaking, I think having fewer assignment strategies included
with Kafka is probably better. One of the advantages of the client-side
assignment approach is that there's no actual need to bundle them into the
release. Applications can use them by depending on a separate library. That
said, sticky assignment seems like a generally good idea and a common need,
so it may be helpful for a lot of users to make it easily available in the
release. If it also addresses the issues raised in KIP-49, then so much the
better.

As for whether we should include both, there I'm not too sure. Most users
probably wouldn't have a strong reason to choose the "fair" assignment over
the "sticky" assignment, since they both seem to have the same properties in
terms of balancing the group's partitions. The overhead is a concern for
large groups with many topic subscriptions though, so if people think that
the "fair" approach brings a lot of benefit over round-robin, then it may
be worth including also.
-Jason
On Mon, Jun 6, 2016 at 5:17 PM, Vahid S Hashemian <vahidhashemian@us.ibm.com>
wrote:
> Hi Jason,
>
> Thanks for reviewing the KIP.
> I will add the details you requested, but to summarize:
>
> Regarding the structure of the user data:
>
> Right now the user data will have the current assignments only, which is a
> mapping of consumers to their assigned topic partitions. Is this mapping
> what you're also suggesting with the CurrentAssignment field?
> I see how adding a version (as the sticky assignor version) will be useful,
> and also how having a protocol name would be useful, perhaps for validation.
> But could you clarify the "Subscription" field and how you think it'll
> come into play?
>
> Regarding the algorithm:
>
> There could be similarities between how this KIP is implemented and how
> KIP-49 is handling the fairness. But since we had to take stickiness into
> consideration, we started fresh and did not adopt from KIP-49.
> The sticky assignor implementation is comprehensive and guarantees the
> fairest possible assignment with the highest stickiness. I even have a unit
> test that randomly generates an assignment problem and verifies that a
> fair and sticky assignment is calculated.
> KIP-54 gives priority to fairness over stickiness (which makes the
> implementation more complex). We could have another strategy that gives
> priority to stickiness over fairness (which supposedly will have better
> performance).
> The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates
> the assignment without considering the previous assignments (fairness
> only), whereas for KIP-54 previous assignments play a big role (fairness
> and stickiness).
> I believe if there is a situation where the stickiness requirements do not
> exist, it would make sense to use a fair-only assignment without the
> overhead of sticky assignment, as you mentioned.
> So, I could see three different strategies that could enrich the assignment
> policy options.
> It would be great to have some feedback from the community about the
> best way to move forward with these two KIPs.
>
> In the meantime, I'll add some more details in the KIP about the approach
> for calculating assignments.
>
> Thanks again.
>
> Regards,
> --Vahid
>
>
>
>
> From: Jason Gustafson <ja...@confluent.io>
> To: dev@kafka.apache.org
> Date: 06/06/2016 01:26 PM
> > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Hi Vahid,
>
> Can you add some detail to the KIP on the structure of the user data? I'm
> guessing it would be something like this:
>
> ProtocolName => "sticky"
>
> ProtocolMetadata => Version Subscription UserData
> Version => int16
> Subscription => [Topic]
> Topic => string
> UserData => CurrentAssignment
> CurrentAssignment => [Topic [Partition]]
> Topic => string
> Partiton => int32
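A rough sketch of how the CurrentAssignment user data in the grammar above could be laid out on the wire. The exact framing here (count-prefixed maps, int16 string lengths, int32 partitions) is an assumption for illustration following the grammar's types; Kafka's real serialization goes through its protocol schema classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;

// Illustrative encoding of CurrentAssignment => [Topic [Partition]]:
// [topicCount][len][topicName][partitionCount][partitions...] per topic.
public class StickyUserData {
    public static ByteBuffer encode(Map<String, List<Integer>> assignment) {
        int size = 4;
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet())
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length
                    + 4 + 4 * e.getValue().size();
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(assignment.size());
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);      // Topic => string (int16 length)
            buf.put(topic);
            buf.putInt(e.getValue().size());
            for (int p : e.getValue()) buf.putInt(p); // Partition => int32
        }
        buf.flip();
        return buf;
    }

    public static Map<String, List<Integer>> decode(ByteBuffer buf) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int topicCount = buf.getInt();
        for (int i = 0; i < topicCount; i++) {
            byte[] topic = new byte[buf.getShort()];
            buf.get(topic);
            int partitionCount = buf.getInt();
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < partitionCount; j++) partitions.add(buf.getInt());
            assignment.put(new String(topic, StandardCharsets.UTF_8), partitions);
        }
        return assignment;
    }
}
```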
>
> It would also be helpful to include a little more detail on the algorithm.
> From what I can tell, it looks like you're adopting some of the strategies
> from KIP-49 to handle differing subscriptions better. If so, then I wonder
> if it makes sense to combine the two KIPs? Or do you think there would be
> an advantage to having the "fair" assignment strategy without the overhead
> of the sticky assignor?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Sorry for being late on this thread.
> >
> > The assign() function is auto-triggered during the rebalance by one of the
> > consumers when it receives all subscription information collected from the
> > server-side coordinator.
> >
> > More details can be found here:
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> >
> > As for Kafka Streams, the way it does "stickiness" is to 1) let all
> > consumers put their currently assigned topic-partitions and server ids into
> > the "metadata" field of the JoinGroupRequest, and 2) when the selected
> > consumer triggers assign() along with all the subscriptions as well as their
> > metadata, it can parse the metadata to learn about the existing assignment
> > map; hence, when making the new assignment it will try to assign
> > partitions to their current owners with best effort.
> >
> > Hope this helps.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I was looking at the implementation of StreamsPartitionAssignor through
> > > its unit tests and expected to find some tests that
> > > - verify stickiness by making at least two calls to the assign() method
> > > (so we check that the second assign() call output preserves the assignments
> > > coming from the first assign() call output); or
> > > - start off with a preset assignment, call assign() after some subscription
> > > change, and verify the previous assignments are preserved.
> > > But none of the methods seem to do these. Did I overlook them, or is
> > > stickiness being tested in some other fashion?
> > >
> > > Also, if there is a high-level write-up about how this assignor works,
> > > could you please point me to it? Thanks.
> > >
> > > Regards.
> > > --Vahid
> > >
> > >
> > >
> > >
> > > From: Guozhang Wang <wa...@gmail.com>
> > > To: "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > Date: 05/02/2016 10:34 AM
> > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> > >
> > >
> > >
> > > Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
> > > some sort of sticky partitioning mechanism. This is done through the
> > > userData field though; i.e. all group members send their currently
> > > "assigned partitions" in their join group request, which will be grouped
> > > and sent to the leader; the leader then does best-effort sticky
> > > partitioning.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> > >
> > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Thanks Andrew for your feedback and interest on this feature.
If there is no further feedback on this KIP (and no objection) I'll start
the voting process soon.
Thanks.
--Vahid
From: Andrew Coates <bi...@gmail.com>
To: dev@kafka.apache.org
Date: 08/10/2016 12:38 AM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
I'm still very interested in seeing this KIP progress ...
On Tue, 2 Aug 2016 at 20:09, Vahid S Hashemian <va...@us.ibm.com>
wrote:
> I would like to revive this thread and ask for additional feedback on this
> KIP.
>
> There has already been some feedback, mostly in favor, plus some concern
> about the value gained considering the complexity and the semantics; i.e.,
> how the eventually revoked assignments need to be processed in the
> onPartitionsAssigned() callback, and not in onPartitionsRevoked().
>
> If it helps, I could also send a note to the users mailing list about this
> KIP and ask for their feedback.
> I could also put the KIP up for a vote if that is expected at this point.
>
> Thanks.
> --Vahid
>
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Andrew Coates <bi...@gmail.com>.
I'm still very interested in seeing this KIP progress ...
On Tue, 2 Aug 2016 at 20:09, Vahid S Hashemian <va...@us.ibm.com>
wrote:
> I would like to revive this thread and ask for additional feedback on this
> KIP.
>
> There has already been some feedback, mostly in favor, plus some concern
> about the value gained considering the complexity and the semantics; i.e.,
> how the eventually revoked assignments need to be processed in the
> onPartitionsAssigned() callback, and not in onPartitionsRevoked().
>
> If it helps, I could also send a note to users mailing list about this KIP
> and ask for their feedback.
> I could also put the KIP up for a vote if that is expected at this point.
>
> Thanks.
> --Vahid
>
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
I would like to revive this thread and ask for additional feedback on this
KIP.
There has already been some feedback, mostly in favor, plus some concern
about the value gained considering the complexity and the semantics; i.e.,
how the eventually revoked assignments need to be processed in the
onPartitionsAssigned() callback, and not in onPartitionsRevoked().
If it helps, I could also send a note to the users mailing list about this KIP
and ask for their feedback.
I could also put the KIP up for a vote if that is expected at this point.
Thanks.
--Vahid
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Onur,
Your understanding is correct.
If a consumer dies and later comes back, with the current proposal, there
is no guarantee that it would reclaim its previous assignment.
Regards,
--Vahid
From: Onur Karaman <ok...@linkedin.com.INVALID>
To: dev@kafka.apache.org
Date: 06/23/2016 01:03 AM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
From what I understood, it seems that stickiness is preserved only for the
remaining live consumers.
Say a consumer owns some partitions and then dies. Those partitions will
get redistributed to the rest of the group.
Now if the consumer comes back up, based on the algorithm described with
the concept of "reassignable partitions", then the consumer may get
different partitions than what it had before. Is my understanding right?
Put another way: once coming back up, can the consumer load its UserData
with the assignment it had before dying?
On Wed, Jun 22, 2016 at 4:41 PM, Jason Gustafson <ja...@confluent.io> wrote:
> Hey Vahid,
>
> Thanks for the updates. I think the lack of comments on this KIP suggests
> that the motivation might need a little work. Here are the two main
> benefits of this assignor as I see them:
>
> 1. It can give a more balanced assignment when subscriptions do not match
> in a group (this is the same problem solved by KIP-49).
> 2. It potentially allows applications to avoid cleaning up partition
> state when rebalancing, since partitions are more likely to stay assigned to
> the same consumer.
>
> Does that seem right to you?
>
> I think it's unclear how serious the first problem is. Providing better
> balance when subscriptions differ is nice, but are rolling updates the only
> scenario where this is encountered? Or are there more general use cases
> where differing subscriptions could persist for a longer duration? I'm also
> wondering if this assignor addresses the problem found in KAFKA-2019. It
> would be useful to confirm whether this problem still exists with the new
> consumer's round robin strategy and how (whether?) it is addressed by this
> assignor.
>
> The major selling point seems to be the second point. This is definitely
> nice to have, but would you expect a lot of value in practice, since
> consumer groups are usually assumed to be stable? It might help to describe
> some specific use cases to help motivate the proposal. One of the downsides
> is that it requires users to restructure their code to get any benefit from
> it. In particular, they need to move partition cleanup out of the
> onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
> little awkward and will probably make explaining the consumer more
> difficult. It's probably worth including a discussion of this point in the
> proposal with an example.
>
> Thanks,
> Jason
>
>
>
> On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi Jason,
> >
> > I updated the KIP and added some details about the user data, the
> > assignment algorithm, and the alternative strategies to consider.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > Please let me know if I missed to add something. Thank you.
> >
> > Regards,
> > --Vahid
> >
> >
> >
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Onur Karaman <ok...@linkedin.com.INVALID>.
From what I understood, it seems that stickiness is preserved only for the
remaining live consumers.
Say a consumer owns some partitions and then dies. Those partitions will
get redistributed to the rest of the group.
Now if the consumer comes back up, based on the algorithm described with
the concept of "reassignable partitions", then the consumer may get
different partitions than what it had before. Is my understanding right?
Put another way: once coming back up, can the consumer load its UserData
with the assignment it had before dying?
On Wed, Jun 22, 2016 at 4:41 PM, Jason Gustafson <ja...@confluent.io> wrote:
> Hey Vahid,
>
> Thanks for the updates. I think the lack of comments on this KIP suggests
> that the motivation might need a little work. Here are the two main
> benefits of this assignor as I see them:
>
> 1. It can give a more balanced assignment when subscriptions do not match
> in a group (this is the same problem solved by KIP-49).
> 2. It potentially allows applications to avoid cleaning up partition
> state when rebalancing, since partitions are more likely to stay assigned to
> the same consumer.
>
> Does that seem right to you?
>
> I think it's unclear how serious the first problem is. Providing better
> balance when subscriptions differ is nice, but are rolling updates the only
> scenario where this is encountered? Or are there more general use cases
> where differing subscriptions could persist for a longer duration? I'm also
> wondering if this assignor addresses the problem found in KAFKA-2019. It
> would be useful to confirm whether this problem still exists with the new
> consumer's round robin strategy and how (whether?) it is addressed by this
> assignor.
>
> The major selling point seems to be the second point. This is definitely
> nice to have, but would you expect a lot of value in practice since
> consumer groups are usually assumed to be stable? It might help to describe
> some specific use cases to help motivate the proposal. One of the downsides
> is that it requires users to restructure their code to get any benefit from
> it. In particular, they need to move partition cleanup out of the
> onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
> little awkward and will probably make explaining the consumer more
> difficult. It's probably worth including a discussion of this point in the
> proposal with an example.
>
> Thanks,
> Jason
>
>
>
> On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi Jason,
> >
> > I updated the KIP and added some details about the user data, the
> > assignment algorithm, and the alternative strategies to consider.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > Please let me know if I missed to add something. Thank you.
> >
> > Regards,
> > --Vahid
> >
> >
> >
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,
Thanks for the thoughtful comments.
Please see my response below.
BTW, I have been trying to update the KIP with some of the recent
discussions on the mailing list.
Regards,
--Vahid
From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 06/27/2016 12:53 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hey Vahid,
Comments below:
> I'm not very clear on the first part of this paragraph. You could clarify
> it for me, but in general balancing out the partitions across consumers in
> a group as much as possible would normally mean balancing the load within
> the cluster, and that's something a user would want to have compared to
> cases where the assignments and therefore the load could be quite
> unbalanced depending on the subscriptions.
I'm just wondering what kind of use cases require differing subscriptions
in a steady state. Usually we expect all consumers in the group to have the
same subscription, in which case the balance provided by round robin should
be even (in terms of the number of assigned partitions). The only case that
comes to mind is a rolling upgrade scenario in which the consumers in the
group are restarted one by one with an updated subscription. It would be
ideal to provide better balance in this situation, but once the upgrade
finishes, the assignment should be balanced again, so it's unclear to me
how significant the gain is. On the other hand, if there are cases which
require differing subscriptions in a long-term state, it would make this
feature more compelling.
I agree that if we care only about a balanced assignment with the same
subscriptions, the round robin assignor is a good choice. But if we bring
stickiness into the mix, it won't be guaranteed by the round robin
assignor. An example (as Andrew mentioned in his earlier note) is elastic
consumers that come and go automatically depending on the load and how
much they lag behind. If these consumers maintain state for the partitions
they consume from, it would be reasonable to want them to stick to their
assigned partitions, rather than having to repeat partition cleanup every
time the number of consumers changes due to an increase or decrease in
load.
I'll also think about it and let you know if I come up with a use case
with differing subscriptions. If differing subscriptions turn out not to
be a common use case, the design and implementation of the sticky assignor
could be modified to a far less complex setting so that
fairness/stickiness can be guaranteed for same subscriptions. As I
mentioned before, the current design/implementation is comprehensive and
can be tweaked towards a less complex solution if further assumptions can
be made.
> Since the new consumer is single threaded there is no such problem in its
> round robin strategy. It simply considers consumers one by one for each
> partition assignment, and when one consumer is assigned a partition, the
> next assignment starts with considering the next consumer in the list (and
> not the same consumer that was just assigned). This removes the
> possibility of the issue reported in KAFKA-2019 surfacing in the new
> consumer. In the sticky strategy we do not have this issue either, since
> every time an assignment is about to happen we start with the consumer
> with the least number of assignments. So we will not have a scenario where
> a consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that
> consumer is lagging behind other consumers in the number of partitions
> assigned).
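The round-robin pass described above can be sketched as follows. The types here are illustrative (plain strings instead of TopicPartition and consumer metadata); the point is only that the cursor advances after every assignment, so no consumer is assigned twice in a row, which is the KAFKA-2019 failure mode.

```java
import java.util.*;

// Illustrative single-threaded round-robin assignment.
public class RoundRobinSketch {
    public static Map<String, List<String>> assign(List<String> consumers,
                                                   List<String> partitions) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        int i = 0;
        for (String p : partitions) {
            // Advance to the next consumer after every assignment, rather
            // than restarting from the same consumer each time.
            assignment.get(consumers.get(i % consumers.size())).add(p);
            i++;
        }
        return assignment;
    }
}
```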
Thanks for checking into this. I think the other factor is that the round
robin assignor sorts the consumers using the id given to them by the
coordinator, which at the moment looks like this: "{clientId}-{uuid}". So
if the group uses a common clientId, then it shouldn't usually be the case
that two consumers on the same host get ordered together. We could actually
change the order of these fields in a compatible way if we didn't like the
dependence on the clientId. It seems anyway that the sticky assignor is not
needed to deal with this problem.
That's correct, and thanks for going into the issue in more detail.
> Even though consumer groups are usually stable, it might be the case that
> consumers do not initially join the group at the same time. The sticky
> strategy in that situation lets those who joined earlier stick to their
> partitions to some extent (assuming fairness takes precedence over
> stickiness). In terms of specific use cases, Andrew touched on examples of
> how Kafka can benefit from a sticky assignor. I could add those to the KIP
> if you also think they help build the case in favor of the sticky assignor.
> I agree with you about the downside and I'll make sure I add that to the
> KIP as you suggested.
Yep, I agree that it helps in some situations, but I think the impact is
amortized over the life of the group. It also takes a bit more work to
explain this to users and may require them to change their usage pattern a
little bit. I think we expect users to do something like the following in
their rebalance listener:
class MyRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }
}
This is fairly intuitive, but if you use this pattern, then sticky
assignment doesn't give you anything because you always clean up state
prior to the rebalance. Instead you need to do something like this:
class MyRebalanceListener {
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    for (TopicPartition partition : difference(lastAssignment, assignment)) {
      cleanupState(partition);
    }
    for (TopicPartition partition : difference(assignment, lastAssignment)) {
      initializeState(partition);
    }
    for (TopicPartition partition : assignment) {
      initializeOffset(partition);
    }
    this.lastAssignment = assignment;
  }
}
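The difference helper used above is not part of the consumer API; a minimal sketch of it (plain set subtraction, generic over the element type) might be:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

public class AssignmentDiff {
    // Returns the elements of 'a' that are not present in 'b':
    // difference(old, new) gives the partitions lost in the rebalance,
    // difference(new, old) gives the partitions newly gained.
    public static <T> List<T> difference(Collection<T> a, Collection<T> b) {
        HashSet<T> exclude = new HashSet<>(b);
        List<T> result = new ArrayList<>();
        for (T item : a)
            if (!exclude.contains(item))
                result.add(item);
        return result;
    }

    public static void main(String[] args) {
        List<String> last = Arrays.asList("t0p0", "t0p1", "t1p0");
        List<String> next = Arrays.asList("t0p1", "t1p1");
        // Partitions lost in the rebalance -> clean up their state.
        System.out.println(difference(last, next)); // [t0p0, t1p0]
        // Partitions gained -> initialize state for them.
        System.out.println(difference(next, last)); // [t1p1]
    }
}
```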
This seems harder to explain and probably is the reason why Andy was
suggesting that it would be more ideal if we could simply skip the call to
onRevoked() if the partitions remain assigned to the consumer after the
rebalance. Unfortunately, the need to commit offsets prior to rebalancing
makes this tricky. The other option suggested by Andy would be to
introduce
a third method in the rebalance listener (e.g.
doOffsetCommit(partitions)).
Then the consumer would call doOffsetCommit() prior to every rebalance,
but
only invoke onPartitionsRevoked() when partitions have actually been
assigned to another consumer following the rebalance. Either way, we're
making the API more complex, which would be nice to avoid unless really
necessary.
Thanks for the code snippets. They look good and understandable given the
current callback listeners design.
I agree that with an additional callback as Andy suggested things would be
easier to justify and explain. As you mentioned, it's a matter of whether
we want the additional complexity that comes with it.
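To make the trade-off concrete, here is a rough sketch of the three-callback variant; the Listener interface and the rebalance driver below are hypothetical illustrations of the idea, not part of any existing Kafka API, and use plain strings in place of TopicPartition:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

public class ThreeStageRebalance {
    // Hypothetical listener along the lines Andy suggested.
    public interface Listener {
        void doOffsetCommit(Collection<String> partitions);      // before every rebalance
        void onPartitionsRevoked(Collection<String> partitions); // only partitions truly lost
        void onPartitionsAssigned(Collection<String> partitions);
    }

    public static final Listener NOOP = new Listener() {
        public void doOffsetCommit(Collection<String> partitions) { }
        public void onPartitionsRevoked(Collection<String> partitions) { }
        public void onPartitionsAssigned(Collection<String> partitions) { }
    };

    // Simulates how the consumer might drive the callbacks: the commit hook
    // fires before every rebalance, but revocation is reported only for
    // partitions that actually moved away. Returns the revoked set.
    public static List<String> rebalance(Listener listener,
                                         Collection<String> oldAssignment,
                                         Collection<String> newAssignment) {
        listener.doOffsetCommit(oldAssignment);
        HashSet<String> kept = new HashSet<>(newAssignment);
        List<String> revoked = new ArrayList<>();
        for (String partition : oldAssignment)
            if (!kept.contains(partition))
                revoked.add(partition);
        if (!revoked.isEmpty())
            listener.onPartitionsRevoked(revoked);
        listener.onPartitionsAssigned(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        // Sticky rebalance: t0p0 stays with this consumer, t0p1 moves away,
        // t1p0 is newly assigned. Only t0p1 is reported as revoked.
        List<String> revoked = rebalance(NOOP,
                Arrays.asList("t0p0", "t0p1"),
                Arrays.asList("t0p0", "t1p0"));
        System.out.println("revoked: " + revoked); // revoked: [t0p1]
    }
}
```

With this shape, the simple cleanup-on-revoke pattern from the first snippet would work unchanged with a sticky assignor, at the cost of a wider listener interface.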
Overall, I think my feeling at the moment is that the sticky assignor is a
nice improvement over the currently available assignors, but the gain
seems
a little marginal and maybe not worth the cost of the complexity mentioned
above. It's not a strong feeling though and it would be nice to hear what
others think. The other thing worth mentioning is that we've talked a few
times in the past about the concept of "partial rebalancing," which would
allow the group to reassign only a subset of the partitions it was
consuming. This would let part of the group continue consuming while the
group is rebalancing. We don't have any proposals ready to support this,
but if we want to have this long term, then it might reduce some of the
benefit provided by the sticky assignor.
Understood, and thanks for sharing your concerns and feedback. I hope we
can get more feedback from the community on whether a sticky partition
assignment strategy in any form is beneficial to Kafka.
Thanks,
Jason
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,
Comments below:
I'm not very clear on the first part of this paragraph. You could clarify
> it for me, but in general balancing out the partitions across consumers in
> a group as much as possible would normally mean balancing the load within
> the cluster, and that's something a user would want to have compared to
> cases where the assignments and therefore the load could be quite
> unbalanced depending on the subscriptions.
I'm just wondering what kind of use cases require differing subscriptions
in a steady state. Usually we expect all consumers in the group to have the
same subscription, in which case the balance provided by round robin should
be even (in terms of the number of assigned partitions). The only case that
comes to mind is a rolling upgrade scenario in which the consumers in the
group are restarted one by one with an updated subscription. It would be
ideal to provide better balance in this situation, but once the upgrade
finishes, the assignment should be balanced again, so it's unclear to me
how significant the gain is. On the other hand, if there are cases which
require differing subscriptions in a long term state, it would make this
feature more compelling.
Since the new consumer is single threaded there is no such problem in its
> round robin strategy. It simply considers consumers one by one for each
> partition assignment, and when one consumer is assigned a partition, the
> next assignment starts with considering the next consumer in the list (and
> not the same consumer that was just assigned). This removes the
> possibility of the issue reported in KAFKA-2019 surfacing in the new
> consumer. In the sticky strategy we do not have this issue either, since
> every time an assignment is about to happen we start with the consumer
> with the least number of assignments. So we will not have a scenario where a
> consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that
> consumer is lagging behind other consumers on the number of partitions
> assigned).
Thanks for checking into this. I think the other factor is that the round
robin assignor sorts the consumers using the id given to them by the
coordinator, which at the moment looks like this: "{clientId}-{uuid}". So
if the group uses a common clientId, then it shouldn't usually be the case
that two consumers on the same host get ordered together. We could actually
change the order of these fields in a compatible way if we didn't like the
dependence on the clientId. It seems anyway that the sticky assignor is not
needed to deal with this problem.
Even though consumer groups are usually stable, it might be the case that
> consumers do not initially join the group at the same time. The sticky
> strategy in that situation lets those who joined earlier stick to their
> partitions to some extent (assuming fairness takes precedence over
> stickiness). In terms of specific use cases, Andrew touched on examples of
> how Kafka can benefit from a sticky assignor. I could add those to the KIP
> if you also think they help build the case in favor of the sticky assignor.
> I agree with you about the downside and I'll make sure I add that to the
> KIP as you suggested.
Yep, I agree that it helps in some situations, but I think the impact is
amortized over the life of the group. It also takes a bit more work to
explain this to users and may require them to change their usage pattern a
little bit. I think we expect users to do something like the following in
their rebalance listener:
class MyRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }
}
This is fairly intuitive, but if you use this pattern, then sticky
assignment doesn't give you anything because you always clean up state
prior to the rebalance. Instead you need to do something like this:
class MyRebalanceListener {
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    for (TopicPartition partition : difference(lastAssignment, assignment)) {
      cleanupState(partition);
    }
    for (TopicPartition partition : difference(assignment, lastAssignment)) {
      initializeState(partition);
    }
    for (TopicPartition partition : assignment) {
      initializeOffset(partition);
    }
    this.lastAssignment = assignment;
  }
}
This seems harder to explain and probably is the reason why Andy was
suggesting that it would be more ideal if we could simply skip the call to
onRevoked() if the partitions remain assigned to the consumer after the
rebalance. Unfortunately, the need to commit offsets prior to rebalancing
makes this tricky. The other option suggested by Andy would be to introduce
a third method in the rebalance listener (e.g. doOffsetCommit(partitions)).
Then the consumer would call doOffsetCommit() prior to every rebalance, but
only invoke onPartitionsRevoked() when partitions have actually been
assigned to another consumer following the rebalance. Either way, we're
making the API more complex, which would be nice to avoid unless really
necessary.
Overall, I think my feeling at the moment is that the sticky assignor is a
nice improvement over the currently available assignors, but the gain seems
a little marginal and maybe not worth the cost of the complexity mentioned
above. It's not a strong feeling though and it would be nice to hear what
others think. The other thing worth mentioning is that we've talked a few
times in the past about the concept of "partial rebalancing," which would
allow the group to reassign only a subset of the partitions it was
consuming. This would let part of the group continue consuming while the
group is rebalancing. We don't have any proposals ready to support this,
but if we want to have this long term, then it might reduce some of the
benefit provided by the sticky assignor.
Thanks,
Jason
On Thu, Jun 23, 2016 at 5:04 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:
> Thank you Andy for your feedback on the KIP.
>
> I agree with Jason on the responses he provided below.
>
> If we give precedence to fairness over stickiness there is no assumption
> that can be made about which assignment would remain and which would be
> revoked.
> If we give precedence to stickiness over fairness, we can be sure that all
> existing valid assignments (those with their topic partition still valid)
> would remain.
>
> I'll add your example to the KIP, but this is how it should work with
> sticky assignor:
>
> We have two consumers C0, C1 and two topics t0, t1 each with 2 partitions.
> Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Let's assume the two
> consumers are subscribed to both t0 and t1.
> The assignment using the sticky assignor will be:
> * C0: [t0p0, t1p0]
> * C1: [t0p1, t1p1]
>
> Now if we add C2 (subscribed to both topics), this is what we get:
> * C0: [t1p0]
> * C1: [t0p1, t1p1]
> * C2: [t0p0]
>
> I think both range and round robin assignors would produce this:
> * C0: [t0p0, t1p1]
> * C1: [t0p1]
> * C2: [t1p0]
>
> Regards,
> --Vahid
>
>
>
>
> From: Jason Gustafson <ja...@confluent.io>
> To: dev@kafka.apache.org
> Date: 06/23/2016 10:06 AM
> Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Hey Andy,
>
> Thanks for jumping in. A couple comments:
>
> In addition, I think it is important that during a rebalance consumers do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> same!), set reassigned. This is less than intuitive and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment I.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
>
>
> The problem is that the revocation callback is called before you know what
> the assignment for the next generation will be. This is necessary for the
> consumer to be able to commit offsets for its assigned partitions. Once
> the
> consumer has a new assignment, it is no longer safe to commit offsets from
> the previous generation. Unless sticky assignment can give us some
> guarantee on which partitions will remain after the rebalance, all of them
> must be included in the revocation callback.
>
>
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
>
>
> This would be true for range assignment, but with 4 partitions total,
> round-robin assignment would give one partition to each of the 4 consumers
> (assuming subscriptions match).
>
> Thanks,
> Jason
>
>
> On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I think sticky assignment is immensely important / useful in many
> > situations. Apps that use Kafka are many and varied. Any app that stores
> > any state, either in the form of data from incoming messages, cached
> > results from previous out-of-process calls or expensive operations, (and
> > let's face it, that's most!), can see a big negative impact from
> partition
> > movement.
> >
> > The main issue partition movement brings is that it makes building
> elastic
> > services very hard. Consider: you've got an app consuming from Kafka
> that
> > locally caches data to improve performance. You want the app to auto
> scale
> as the throughput to the topic(s) increases. Currently, when one or more
> new instances are added and the group rebalances, all existing instances
> > have all partitions revoked, and then a new, potentially quite
> different,
> > set assigned. An intuitive pattern is to evict partition state, I.e. the
> > cached data, when a partition is revoked. So in this case all apps flush
> > their entire cache causing throughput to drop massively, right when you
> > want to increase it!
> >
> > Even if the app is not flushing partition state when partitions are
> > revoked, the lack of a 'sticky' strategy means that a proportion of the
> > cached state is now useless, and instances have partitions assigned for
> which they have no cached state, again negatively impacting throughput.
> >
> > With a 'sticky' strategy throughput can be maintained and indeed
> increased,
> > as intended.
> >
> > The same is also true in the presence of failure. An instance failing,
> > (maybe due to high load), can invalidate the caching of existing
> instances,
> negatively impacting throughput of the remaining instances, (possibly at
> a
> > time the system needs throughput the most!)
> >
> > My question would be 'why move partitions if you don't have to?'. I will
> > certainly be setting the 'sticky' assignment strategy as the default
> once
> > it's released, and I have a feeling it will become the default in the
> community's 'best-practice' guides.
> >
> > In addition, I think it is important that during a rebalance consumers
> do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> same!), set reassigned. This is less than intuitive and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment I.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
> >
> > I think the change to how the client listener is called should be part
> of
> > this work.
> >
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
> >
> > Thanks,
> >
> >
> > Andy
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
> >
> > > Hey Vahid,
> > >
> > > Thanks for the updates. I think the lack of comments on this KIP
> suggests
> > > that the motivation might need a little work. Here are the two main
> > > benefits of this assignor as I see them:
> > >
> > > 1. It can give a more balanced assignment when subscriptions do not
> match
> > > in a group (this is the same problem solved by KIP-49).
> > > 2. It potentially allows applications to save the need to cleanup
> > partition
> > > state when rebalancing since partitions are more likely to stay
> assigned
> > to
> > > the same consumer.
> > >
> > > Does that seem right to you?
> > >
> > > I think it's unclear how serious the first problem is. Providing
> better
> > > balance when subscriptions differ is nice, but are rolling updates the
> > only
> > > scenario where this is encountered? Or are there more general use
> cases
> > > where differing subscriptions could persist for a longer duration? I'm
> > also
> > > wondering if this assignor addresses the problem found in KAFKA-2019.
> It
> > > would be useful to confirm whether this problem still exists with the
> new
> > > consumer's round robin strategy and how (whether?) it is addressed by
> > this
> > > assignor.
> > >
> > > The major selling point seems to be the second point. This is
> definitely
> > > nice to have, but would you expect a lot of value in practice since
> > > consumer groups are usually assumed to be stable? It might help to
> > describe
> > > some specific use cases to help motivate the proposal. One of the
> > downsides
> > > is that it requires users to restructure their code to get any benefit
> > from
> > > it. In particular, they need to move partition cleanup out of the
> > > onPartitionsRevoked() callback and into onPartitionsAssigned(). This
> is a
> > > little awkward and will probably make explaining the consumer more
> > > difficult. It's probably worth including a discussion of this point in
> > the
> > > proposal with an example.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com
> > > > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > I updated the KIP and added some details about the user data, the
> > > > assignment algorithm, and the alternative strategies to consider.
> > > >
> > > >
> > >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> > > >
> > > > Please let me know if I missed anything. Thank you.
> > > >
> > > > Regards,
> > > > --Vahid
> > > >
> > > >
> > > >
> > >
> >
>
>
>
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Thank you Andy for your feedback on the KIP.
I agree with Jason on the responses he provided below.
If we give precedence to fairness over stickiness there is no assumption
that can be made about which assignment would remain and which would be
revoked.
If we give precedence to stickiness over fairness, we can be sure that all
existing valid assignments (those with their topic partition still valid)
would remain.
I'll add your example to the KIP, but this is how it should work with
sticky assignor:
We have two consumers C0, C1 and two topics t0, t1 each with 2 partitions.
Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Let's assume the two
consumers are subscribed to both t0 and t1.
The assignment using the sticky assignor will be:
* C0: [t0p0, t1p0]
* C1: [t0p1, t1p1]
Now if we add C2 (subscribed to both topics), this is what we get:
* C0: [t1p0]
* C1: [t0p1, t1p1]
* C2: [t0p0]
I think both range and round robin assignors would produce this:
* C0: [t0p0, t1p1]
* C1: [t0p1]
* C2: [t1p0]
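One way to see the difference between the two outcomes above is to count how many partitions change owners when C2 joins. The small sketch below hardcodes the assignments from the example; with the sticky assignor one partition moves, while with range/round robin two move:

```java
import java.util.HashMap;
import java.util.Map;

public class MovementCount {
    // Counts partitions whose owning consumer differs between two assignments.
    public static int moved(Map<String, String> before, Map<String, String> after) {
        int n = 0;
        for (Map.Entry<String, String> e : after.entrySet())
            if (!e.getValue().equals(before.get(e.getKey())))
                n++;
        return n;
    }

    public static void main(String[] args) {
        // partition -> owner, before C2 joins
        Map<String, String> before = new HashMap<>();
        before.put("t0p0", "C0"); before.put("t1p0", "C0");
        before.put("t0p1", "C1"); before.put("t1p1", "C1");

        // Sticky result from the example above
        Map<String, String> sticky = new HashMap<>();
        sticky.put("t1p0", "C0");
        sticky.put("t0p1", "C1"); sticky.put("t1p1", "C1");
        sticky.put("t0p0", "C2");

        // Range / round robin result from the example above
        Map<String, String> rr = new HashMap<>();
        rr.put("t0p0", "C0"); rr.put("t1p1", "C0");
        rr.put("t0p1", "C1");
        rr.put("t1p0", "C2");

        System.out.println("sticky moves: " + moved(before, sticky));    // sticky moves: 1
        System.out.println("round robin moves: " + moved(before, rr));   // round robin moves: 2
    }
}
```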
Regards,
--Vahid
From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 06/23/2016 10:06 AM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hey Andy,
Thanks for jumping in. A couple comments:
In addition, I think it is important that during a rebalance consumers do
> not first have all partitions revoked, only to have a very similar, (or
the
> same!), set reassigned. This is less than intuitive and complicates
client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only
be
> called for true changes in assignment I.e. any new partitions assigned
and
> any existing ones revoked, when comparing the new assignment to the
> previous one.
The problem is that the revocation callback is called before you know what
the assignment for the next generation will be. This is necessary for the
consumer to be able to commit offsets for its assigned partitions. Once
the
consumer has a new assignment, it is no longer safe to commit offsets from
the previous generation. Unless sticky assignment can give us some
guarantee on which partitions will remain after the rebalance, all of them
must be included in the revocation callback.
> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each
topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you
can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This
should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.
This would be true for range assignment, but with 4 partitions total,
round-robin assignment would give one partition to each of the 4 consumers
(assuming subscriptions match).
Thanks,
Jason
On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
wrote:
> Hi all,
>
> I think sticky assignment is immensely important / useful in many
> situations. Apps that use Kafka are many and varied. Any app that stores
> any state, either in the form of data from incoming messages, cached
> results from previous out-of-process calls or expensive operations, (and
> let's face it, that's most!), can see a big negative impact from
partition
> movement.
>
> The main issue partition movement brings is that it makes building
elastic
> services very hard. Consider: you've got an app consuming from Kafka
that
> locally caches data to improve performance. You want the app to auto
scale
> as the throughput to the topic(s) increases. Currently, when one or more
> new instances are added and the group rebalances, all existing instances
> have all partitions revoked, and then a new, potentially quite
different,
> set assigned. An intuitive pattern is to evict partition state, I.e. the
> cached data, when a partition is revoked. So in this case all apps flush
> their entire cache causing throughput to drop massively, right when you
> want to increase it!
>
> Even if the app is not flushing partition state when partitions are
> revoked, the lack of a 'sticky' strategy means that a proportion of the
> cached state is now useless, and instances have partitions assigned for
> which they have no cached state, again negatively impacting throughput.
>
> With a 'sticky' strategy throughput can be maintained and indeed
increased,
> as intended.
>
> The same is also true in the presence of failure. An instance failing,
> (maybe due to high load), can invalidate the caching of existing
instances,
> negatively impacting throughput of the remaining instances, (possibly at
a
> time the system needs throughput the most!)
>
> My question would be 'why move partitions if you don't have to?'. I will
> certainly be setting the 'sticky' assignment strategy as the default
once
> it's released, and I have a feeling it will become the default in the
> community's 'best-practice' guides.
>
> In addition, I think it is important that during a rebalance consumers
do
> not first have all partitions revoked, only to have a very similar, (or
the
> same!), set reassigned. This is less than intuitive and complicates
client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only
be
> called for true changes in assignment I.e. any new partitions assigned
and
> any existing ones revoked, when comparing the new assignment to the
> previous one.
>
> I think the change to how the client listener is called should be part
of
> this work.
>
> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each
topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you
can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This
should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.
>
> Thanks,
>
>
> Andy
>
>
>
>
>
>
>
>
> On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Vahid,
> >
> > Thanks for the updates. I think the lack of comments on this KIP
suggests
> > that the motivation might need a little work. Here are the two main
> > benefits of this assignor as I see them:
> >
> > 1. It can give a more balanced assignment when subscriptions do not
match
> > in a group (this is the same problem solved by KIP-49).
> > 2. It potentially allows applications to save the need to cleanup
> partition
> > state when rebalancing since partitions are more likely to stay
assigned
> to
> > the same consumer.
> >
> > Does that seem right to you?
> >
> > I think it's unclear how serious the first problem is. Providing
better
> > balance when subscriptions differ is nice, but are rolling updates the
> only
> > scenario where this is encountered? Or are there more general use
cases
> > where differing subscriptions could persist for a longer duration? I'm
> also
> > wondering if this assignor addresses the problem found in KAFKA-2019.
It
> > would be useful to confirm whether this problem still exists with the
new
> > consumer's round robin strategy and how (whether?) it is addressed by
> this
> > assignor.
> >
> > The major selling point seems to be the second point. This is
definitely
> > nice to have, but would you expect a lot of value in practice since
> > consumer groups are usually assumed to be stable? It might help to
> describe
> > some specific use cases to help motivate the proposal. One of the
> downsides
> > is that it requires users to restructure their code to get any benefit
> from
> > it. In particular, they need to move partition cleanup out of the
> > onPartitionsRevoked() callback and into onPartitionsAssigned(). This
is a
> > little awkward and will probably make explaining the consumer more
> > difficult. It's probably worth including a discussion of this point in
> the
> > proposal with an example.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com
> > > wrote:
> >
> > > Hi Jason,
> > >
> > > I updated the KIP and added some details about the user data, the
> > > assignment algorithm, and the alternative strategies to consider.
> > >
> > >
> >
>
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > >
> > > Please let me know if I missed anything. Thank you.
> > >
> > > Regards,
> > > --Vahid
> > >
> > >
> > >
> >
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Guozhang,
Thanks for the reference.
A similar question was asked earlier about whether, with the sticky
assignor, consumers stick to their previous partitions if they die and
come back later.
Currently the sticky assignor does not support that because it only
preserves the last assignment before the rebalance.
If a consumer dies and comes back during different rebalance intervals,
there is no guarantee it would get its previous partitions.
If the community sees this as an important requirement for the sticky
assignor we can definitely include it in the KIP.
Regards,
-----------------------------------------------------------------
Vahid Hashemian, Ph.D.
Advisory Software Engineer, IBM Cloud
Email: vahidhashemian@us.ibm.com
Phone: 1-408-463-2380
IBM Silicon Valley Lab
555 Bailey Ave.
San Jose, CA 95141
From: Guozhang Wang <wa...@gmail.com>
To: "dev@kafka.apache.org" <de...@kafka.apache.org>
Date: 06/23/2016 03:28 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Just adding some related reference here:
Henry Cai is contributing an advanced feature in Kafka Streams regarding
static assignment: https://github.com/apache/kafka/pull/1543
The main motivation is that when you do rolling bounce for upgrading your
Kafka Streams code, for example, you would prefer to not move assigned
partitions of the current bouncing instance to others, and today it is
worked around by increasing the session.timeout; but what is more tricky
is
that when the bouncing instance comes back, it will still trigger a
rebalance. The idea is that as long as we can encode the previous
iteration's assignment map, and we can check that the list of partitions /
members does not change with regard to their previous assigned partitions,
we keep the assignment as is.
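A rough sketch of how a member might serialize its previously owned partitions into the userData bytes of its subscription. The length-prefixed layout below is hypothetical, chosen for illustration; it is not the schema Kafka Streams actually uses:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UserDataCodec {
    // Encodes a list of "topic-partition" strings into a length-prefixed
    // byte buffer, the kind of payload a member could place in the
    // userData field of its join-group subscription.
    public static ByteBuffer encode(List<String> partitions) {
        int size = 4; // leading count
        List<byte[]> encoded = new ArrayList<>();
        for (String p : partitions) {
            byte[] bytes = p.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            size += 4 + bytes.length; // per-entry length prefix + payload
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(encoded.size());
        for (byte[] bytes : encoded) {
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
        buf.flip();
        return buf;
    }

    // The leader would decode each member's buffer to recover its
    // previous assignment before computing the new one.
    public static List<String> decode(ByteBuffer buf) {
        List<String> out = new ArrayList<>();
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            out.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> owned = Arrays.asList("t0-0", "t1-2");
        System.out.println(decode(encode(owned))); // [t0-0, t1-2]
    }
}
```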
Guozhang
On Thu, Jun 23, 2016 at 10:24 AM, Andrew Coates
<bi...@gmail.com>
wrote:
> Hey Jason,
>
> Good to know on the round robin assignment. I'll look into that.
>
> The issue I have with the current rebalance listener is that it's not
> intuitive and unnecessarily exposes the inner workings of rebalance
logic.
> When the onPartitionsRevoked method is called it's not really saying the
> partitions were revoked. It's really saying a rebalance is happening and
> you need to deal with any in-flight partitions & commit offsets. So
maybe
> the method name is wrong! Maybe it should be 'onRebalance' or
> 'commitOffsets'..? Then the interface could also have an
> onPartitionsRevoked method that is only called when partitions have been
> revoked and given to someone else to handle, rather than just kind of
> paused while we rebalance... maybe the new method could be
> onPausePartitions?
>
> Andy
>
> On Thu, 23 Jun 2016, 18:06 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Andy,
> >
> > Thanks for jumping in. A couple comments:
> >
> > In addition, I think it is important that during a rebalance consumers
do
> > > not first have all partitions revoked, only to have a very similar,
(or
> > the
> > > same!), set reassigned. This is less than intuitive and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment I.e. any new partitions
assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> >
> >
> > The problem is that the revocation callback is called before you know
> what
> > the assignment for the next generation will be. This is necessary for
the
> > consumer to be able to commit offsets for its assigned partitions.
Once
> the
> > consumer has a new assignment, it is no longer safe to commit offsets
> from
> > the previous generation. Unless sticky assignment can give us some
> > guarantee on which partitions will remain after the rebalance, all of
> them
> > must be included in the revocation callback.
> >
> >
> > > There is one last scenario I'd like to highlight that I think the
KIP
> > > should describe: say you have a group consuming from two topics,
each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers
you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer will not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> >
> >
> > This would be true for range assignment, but with 4 partitions total,
> > round-robin assignment would give one partition to each of the 4
> consumers
> > (assuming subscriptions match).
> >
> > Thanks,
> > Jason
> >
> >
> > On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <
> big.andy.coates@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I think sticky assignment is immensely important / useful in many
> > > situations. Apps that use Kafka are many and varied. Any app that
> stores
> > > any state, either in the form of data from incoming messages, cached
> > > results from previous out-of-process calls or expensive operations,
> (and
> > > let's face it, that's most!), can see a big negative impact from
> > partition
> > > movement.
> > >
> > > The main issue partition movement brings is that it makes building
> > elastic
> > > services very hard. Consider: you've got an app consuming from Kafka
> that
> > > locally caches data to improve performance. You want the app to auto
> > scale
> > > as the throughput to the topic(s) increases. Currently, when one
or
> > more
> > > new instances are added and the group rebalances, all existing
instances
> > > have all partitions revoked, and then a new, potentially quite
> different,
> > > set assigned. An intuitive pattern is to evict partition state, i.e.
> the
> > > cached data, when a partition is revoked. So in this case all apps
> flush
> > > their entire cache causing throughput to drop massively, right when
you
> > > want to increase it!
> > >
> > > Even if the app is not flushing partition state when partitions are
> > > revoked, the lack of a 'sticky' strategy means that a proportion of
the
> > > cached state is now useless, and instances have partitions assigned
for
> > > which they have no cached state, again negatively impacting
throughput.
> > >
> > > With a 'sticky' strategy throughput can be maintained and indeed
> > increased,
> > > as intended.
> > >
> > > The same is also true in the presence of failure. An instance
failing,
> > > (maybe due to high load), can invalidate the caching of existing
> > instances,
> > > negatively impacting throughput of the remaining instances,
(possibly
> at
> > a
> > > time the system needs throughput the most!)
> > >
> > > My question would be 'why move partitions if you don't have to?'. I
> will
> > > certainly be setting the 'sticky' assignment strategy as the default
> once
> > > it's released, and I have a feeling it will become the default in
the
> > > community's 'best-practice' guides.
> > >
> > > In addition, I think it is important that during a rebalance
consumers
> do
> > > not first have all partitions revoked, only to have a very similar,
(or
> > the
> > > same!), set reassigned. This is less than intuitive and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment, i.e. any new partitions
assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> > >
> > > I think the change to how the client listener is called should be
part
> of
> > > this work.
> > >
> > > There is one last scenario I'd like to highlight that I think the
KIP
> > > should describe: say you have a group consuming from two topics,
each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers
you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer will not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> > >
> > > Thanks,
> > >
> > >
> > > Andy
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io>
> wrote:
> > >
> > > > Hey Vahid,
> > > >
> > > > Thanks for the updates. I think the lack of comments on this KIP
> > suggests
> > > > that the motivation might need a little work. Here are the two
main
> > > > benefits of this assignor as I see them:
> > > >
> > > > 1. It can give a more balanced assignment when subscriptions do
not
> > match
> > > > in a group (this is the same problem solved by KIP-49).
> > > > 2. It potentially allows applications to save the need to cleanup
> > > partition
> > > > state when rebalancing since partitions are more likely to stay
> > assigned
> > > to
> > > > the same consumer.
> > > >
> > > > Does that seem right to you?
> > > >
> > > > I think it's unclear how serious the first problem is. Providing
> better
> > > > balance when subscriptions differ is nice, but are rolling updates
> the
> > > only
> > > > scenario where this is encountered? Or are there more general use
> cases
> > > > where differing subscriptions could persist for a longer duration?
> I'm
> > > also
> > > > wondering if this assignor addresses the problem found in
KAFKA-2019.
> > It
> > > > would be useful to confirm whether this problem still exists with
the
> > new
> > > > consumer's round robin strategy and how (whether?) it is addressed
by
> > > this
> > > > assignor.
> > > >
> > > > The major selling point seems to be the second point. This is
> > definitely
> > > > nice to have, but would you expect a lot of value in practice
since
> > > > consumer groups are usually assumed to be stable? It might help to
> > > describe
> > > > some specific use cases to help motivate the proposal. One of the
> > > downsides
> > > > is that it requires users to restructure their code to get any
> benefit
> > > from
> > > > it. In particular, they need to move partition cleanup out of the
> > > > onPartitionsRevoked() callback and into onPartitionsAssigned().
This
> > is a
> > > > little awkward and will probably make explaining the consumer more
> > > > difficult. It's probably worth including a discussion of this
point
> in
> > > the
> > > > proposal with an example.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com
> > > > > wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > I updated the KIP and added some details about the user data,
the
> > > > > assignment algorithm, and the alternative strategies to
consider.
> > > > >
> > > > >
> > > >
> > >
> >
>
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > > >
> > > > > Please let me know if I missed anything. Thank you.
> > > > >
> > > > > Regards,
> > > > > --Vahid
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
--
-- Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Guozhang Wang <wa...@gmail.com>.
Just adding some related reference here:
Henry Cai is contributing an advanced feature in Kafka Streams regarding
static assignment: https://github.com/apache/kafka/pull/1543
The main motivation is that when you do a rolling bounce to upgrade your
Kafka Streams code, for example, you would prefer to not move assigned
partitions of the current bouncing instance to others, and today it is
worked around by increasing the session.timeout; but what is more tricky is
that when the bouncing instance comes back, it will still trigger a
rebalance. The idea is that as long as we encode the previous iteration's
assignment map, we can check whether the list of partitions / members has
changed with respect to their previously assigned partitions, and if not,
keep the assignment as is.
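The check Guozhang describes can be sketched in plain Java. This is only an illustrative model with hypothetical names (`StickySketch`, `assign`), not Kafka's actual assignor interface: it ignores per-member subscriptions and the userData serialization, and just shows the sticky-then-balance idea of re-granting surviving partitions to their previous owners before balancing the remainder:

```java
import java.util.*;

public class StickySketch {

    // Hypothetical best-effort sticky assignment: each surviving member
    // keeps the partitions it previously owned, unowned partitions go to
    // the least-loaded members, and partitions are then shifted from the
    // most- to the least-loaded member until sizes differ by at most one.
    static Map<String, Set<String>> assign(Set<String> members,
                                           Set<String> partitions,
                                           Map<String, Set<String>> previous) {
        Comparator<Map.Entry<String, Set<String>>> bySize =
                Comparator.comparingInt(e -> e.getValue().size());
        Map<String, Set<String>> out = new TreeMap<>();
        for (String m : members) out.put(m, new TreeSet<>());

        // Stickiness: re-grant surviving partitions to surviving owners.
        Set<String> unowned = new TreeSet<>(partitions);
        previous.forEach((m, owned) -> {
            if (!out.containsKey(m)) return;          // member left the group
            for (String p : owned)
                if (partitions.contains(p)) {
                    out.get(m).add(p);
                    unowned.remove(p);
                }
        });

        // Fairness, part 1: unowned partitions go to the least-loaded member.
        for (String p : unowned)
            Collections.min(out.entrySet(), bySize).getValue().add(p);

        // Fairness, part 2: shift partitions until the assignment is balanced.
        while (true) {
            Set<String> most = Collections.max(out.entrySet(), bySize).getValue();
            Set<String> least = Collections.min(out.entrySet(), bySize).getValue();
            if (most.size() - least.size() <= 1) break;
            String moved = most.iterator().next();
            most.remove(moved);
            least.add(moved);
        }
        return out;
    }
}
```

If the member and partition lists are unchanged from the previous generation and the previous assignment was balanced, the assignment is kept exactly as is; a newly joined member only pulls away as many partitions as balance requires.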
Guozhang
On Thu, Jun 23, 2016 at 10:24 AM, Andrew Coates <bi...@gmail.com>
wrote:
> Hey Jason,
>
> Good to know on the round robin assignment. I'll look into that.
>
> The issue I have with the current rebalance listener is that it's not
> intuitive and unnecessarily exposes the inner workings of rebalance logic.
> When the onPartitionsRevoked method is called it's not really saying the
> partitions were revoked. It's really saying a rebalance is happening and
> you need to deal with any in-flight partitions & commit offsets. So maybe
> the method name is wrong! Maybe it should be 'onRebalance' or
> 'commitOffsets'..? Then the interface could also have an
> onPartitionsRevoked method that is only called when partitions have been
> revoked and given to someone else to handle, rather than just kind of
> paused while we rebalance... maybe the new method could be
> onPausePartitions?
>
> Andy
>
> On Thu, 23 Jun 2016, 18:06 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Andy,
> >
> > Thanks for jumping in. A couple comments:
> >
> > In addition, I think it is important that during a rebalance consumers do
> > > not first have all partitions revoked, only to have a very similar, (or
> > the
> > > same!), set reassigned. This is less than intuitive and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment, i.e. any new partitions assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> >
> >
> > The problem is that the revocation callback is called before you know
> what
> > the assignment for the next generation will be. This is necessary for the
> > consumer to be able to commit offsets for its assigned partitions. Once
> the
> > consumer has a new assignment, it is no longer safe to commit offsets
> from
> > the previous generation. Unless sticky assignment can give us some
> > guarantee on which partitions will remain after the rebalance, all of
> them
> > must be included in the revocation callback.
> >
> >
> > > There is one last scenario I'd like to highlight that I think the KIP
> > > should describe: say you have a group consuming from two topics, each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer will not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> >
> >
> > This would be true for range assignment, but with 4 partitions total,
> > round-robin assignment would give one partition to each of the 4
> consumers
> > (assuming subscriptions match).
> >
> > Thanks,
> > Jason
> >
> >
> > On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <
> big.andy.coates@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I think sticky assignment is immensely important / useful in many
> > > situations. Apps that use Kafka are many and varied. Any app that
> stores
> > > any state, either in the form of data from incoming messages, cached
> > > results from previous out-of-process calls or expensive operations,
> (and
> > > let's face it, that's most!), can see a big negative impact from
> > partition
> > > movement.
> > >
> > > The main issue partition movement brings is that it makes building
> > elastic
> > > services very hard. Consider: you've got an app consuming from Kafka
> that
> > > locally caches data to improve performance. You want the app to auto
> > scale
> > > as the throughput to the topic(s) increases. Currently, when one or
> > more
> > > new instances are added and the group rebalances, all existing instances
> > > have all partitions revoked, and then a new, potentially quite
> different,
> > > set assigned. An intuitive pattern is to evict partition state, i.e.
> the
> > > cached data, when a partition is revoked. So in this case all apps
> flush
> > > their entire cache causing throughput to drop massively, right when you
> > > want to increase it!
> > >
> > > Even if the app is not flushing partition state when partitions are
> > > revoked, the lack of a 'sticky' strategy means that a proportion of the
> > > cached state is now useless, and instances have partitions assigned for
> > > which they have no cached state, again negatively impacting throughput.
> > >
> > > With a 'sticky' strategy throughput can be maintained and indeed
> > increased,
> > > as intended.
> > >
> > > The same is also true in the presence of failure. An instance failing,
> > > (maybe due to high load), can invalidate the caching of existing
> > instances,
> > > negatively impacting throughput of the remaining instances, (possibly
> at
> > a
> > > time the system needs throughput the most!)
> > >
> > > My question would be 'why move partitions if you don't have to?'. I
> will
> > > certainly be setting the 'sticky' assignment strategy as the default
> once
> > > it's released, and I have a feeling it will become the default in the
> > > community's 'best-practice' guides.
> > >
> > > In addition, I think it is important that during a rebalance consumers
> do
> > > not first have all partitions revoked, only to have a very similar, (or
> > the
> > > same!), set reassigned. This is less than intuitive and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment, i.e. any new partitions assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> > >
> > > I think the change to how the client listener is called should be part
> of
> > > this work.
> > >
> > > There is one last scenario I'd like to highlight that I think the KIP
> > > should describe: say you have a group consuming from two topics, each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer will not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> > >
> > > Thanks,
> > >
> > >
> > > Andy
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io>
> wrote:
> > >
> > > > Hey Vahid,
> > > >
> > > > Thanks for the updates. I think the lack of comments on this KIP
> > suggests
> > > > that the motivation might need a little work. Here are the two main
> > > > benefits of this assignor as I see them:
> > > >
> > > > 1. It can give a more balanced assignment when subscriptions do not
> > match
> > > > in a group (this is the same problem solved by KIP-49).
> > > > 2. It potentially allows applications to save the need to cleanup
> > > partition
> > > > state when rebalancing since partitions are more likely to stay
> > assigned
> > > to
> > > > the same consumer.
> > > >
> > > > Does that seem right to you?
> > > >
> > > > I think it's unclear how serious the first problem is. Providing
> better
> > > > balance when subscriptions differ is nice, but are rolling updates
> the
> > > only
> > > > scenario where this is encountered? Or are there more general use
> cases
> > > > where differing subscriptions could persist for a longer duration?
> I'm
> > > also
> > > > wondering if this assignor addresses the problem found in KAFKA-2019.
> > It
> > > > would be useful to confirm whether this problem still exists with the
> > new
> > > > consumer's round robin strategy and how (whether?) it is addressed by
> > > this
> > > > assignor.
> > > >
> > > > The major selling point seems to be the second point. This is
> > definitely
> > > > nice to have, but would you expect a lot of value in practice since
> > > > consumer groups are usually assumed to be stable? It might help to
> > > describe
> > > > some specific use cases to help motivate the proposal. One of the
> > > downsides
> > > > is that it requires users to restructure their code to get any
> benefit
> > > from
> > > > it. In particular, they need to move partition cleanup out of the
> > > > onPartitionsRevoked() callback and into onPartitionsAssigned(). This
> > is a
> > > > little awkward and will probably make explaining the consumer more
> > > > difficult. It's probably worth including a discussion of this point
> in
> > > the
> > > > proposal with an example.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com
> > > > > wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > I updated the KIP and added some details about the user data, the
> > > > > assignment algorithm, and the alternative strategies to consider.
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > > >
> > > > > Please let me know if I missed anything. Thank you.
> > > > >
> > > > > Regards,
> > > > > --Vahid
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
--
-- Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Andrew Coates <bi...@gmail.com>.
Hey Jason,
Good to know on the round robin assignment. I'll look into that.
The issue I have with the current rebalance listener is that it's not
intuitive and unnecessarily exposes the inner workings of rebalance logic.
When the onPartitionsRevoked method is called it's not really saying the
partitions were revoked. It's really saying a rebalance is happening and
you need to deal with any in-flight partitions & commit offsets. So maybe
the method name is wrong! Maybe it should be 'onRebalance' or
'commitOffsets'..? Then the interface could also have an
onPartitionsRevoked method that is only called when partitions have been
revoked and given to someone else to handle, rather than just kind of
paused while we rebalance... maybe the new method could be
onPausePartitions?
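The diff Andy is proposing amounts to two set differences between the assignment held before and after a rebalance. A minimal sketch in plain Java, with hypothetical names (the real consumer callbacks deal in `Collection<TopicPartition>`, not strings):

```java
import java.util.*;

public class RebalanceDiff {

    // Partitions owned before the rebalance but not after: these are the
    // "truly revoked" ones, now handled by someone else.
    static Set<String> trulyRevoked(Set<String> before, Set<String> after) {
        Set<String> revoked = new TreeSet<>(before);
        revoked.removeAll(after);
        return revoked;
    }

    // Partitions owned after the rebalance but not before: the only ones a
    // diff-based listener would report as newly assigned.
    static Set<String> newlyAssigned(Set<String> before, Set<String> after) {
        Set<String> added = new TreeSet<>(after);
        added.removeAll(before);
        return added;
    }
}
```

A partition present in both sets sits in the intersection and would trigger no callback at all under this scheme, which is exactly the behavior the proposal asks for.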
Andy
On Thu, 23 Jun 2016, 18:06 Jason Gustafson, <ja...@confluent.io> wrote:
> Hey Andy,
>
> Thanks for jumping in. A couple comments:
>
> In addition, I think it is important that during a rebalance consumers do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> > same!), set reassigned. This is less than intuitive and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment, i.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
>
>
> The problem is that the revocation callback is called before you know what
> the assignment for the next generation will be. This is necessary for the
> consumer to be able to commit offsets for its assigned partitions. Once the
> consumer has a new assignment, it is no longer safe to commit offsets from
> the previous generation. Unless sticky assignment can give us some
> guarantee on which partitions will remain after the rebalance, all of them
> must be included in the revocation callback.
>
>
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> > topic. A third consumer will not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
>
>
> This would be true for range assignment, but with 4 partitions total,
> round-robin assignment would give one partition to each of the 4 consumers
> (assuming subscriptions match).
>
> Thanks,
> Jason
>
>
> On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I think sticky assignment is immensely important / useful in many
> > situations. Apps that use Kafka are many and varied. Any app that stores
> > any state, either in the form of data from incoming messages, cached
> > results from previous out-of-process calls or expensive operations, (and
> > let's face it, that's most!), can see a big negative impact from
> partition
> > movement.
> >
> > The main issue partition movement brings is that it makes building
> elastic
> > services very hard. Consider: you've got an app consuming from Kafka that
> > locally caches data to improve performance. You want the app to auto
> scale
> > as the throughput to the topic(s) increases. Currently, when one or
> more
> > new instances are added and the group rebalances, all existing instances
> > have all partitions revoked, and then a new, potentially quite different,
> > set assigned. An intuitive pattern is to evict partition state, i.e. the
> > cached data, when a partition is revoked. So in this case all apps flush
> > their entire cache causing throughput to drop massively, right when you
> > want to increase it!
> >
> > Even if the app is not flushing partition state when partitions are
> > revoked, the lack of a 'sticky' strategy means that a proportion of the
> > cached state is now useless, and instances have partitions assigned for
> > which they have no cached state, again negatively impacting throughput.
> >
> > With a 'sticky' strategy throughput can be maintained and indeed
> increased,
> > as intended.
> >
> > The same is also true in the presence of failure. An instance failing,
> > (maybe due to high load), can invalidate the caching of existing
> instances,
> > negatively impacting throughput of the remaining instances, (possibly at
> a
> > time the system needs throughput the most!)
> >
> > My question would be 'why move partitions if you don't have to?'. I will
> > certainly be setting the 'sticky' assignment strategy as the default once
> > it's released, and I have a feeling it will become the default in the
> > community's 'best-practice' guides.
> >
> > In addition, I think it is important that during a rebalance consumers do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> > same!), set reassigned. This is less than intuitive and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment, i.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
> >
> > I think the change to how the client listener is called should be part of
> > this work.
> >
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> > topic. A third consumer will not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
> >
> > Thanks,
> >
> >
> > Andy
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
> >
> > > Hey Vahid,
> > >
> > > Thanks for the updates. I think the lack of comments on this KIP
> suggests
> > > that the motivation might need a little work. Here are the two main
> > > benefits of this assignor as I see them:
> > >
> > > 1. It can give a more balanced assignment when subscriptions do not
> match
> > > in a group (this is the same problem solved by KIP-49).
> > > 2. It potentially allows applications to save the need to cleanup
> > partition
> > > state when rebalancing since partitions are more likely to stay
> assigned
> > to
> > > the same consumer.
> > >
> > > Does that seem right to you?
> > >
> > > I think it's unclear how serious the first problem is. Providing better
> > > balance when subscriptions differ is nice, but are rolling updates the
> > only
> > > scenario where this is encountered? Or are there more general use cases
> > > where differing subscriptions could persist for a longer duration? I'm
> > also
> > > wondering if this assignor addresses the problem found in KAFKA-2019.
> It
> > > would be useful to confirm whether this problem still exists with the
> new
> > > consumer's round robin strategy and how (whether?) it is addressed by
> > this
> > > assignor.
> > >
> > > The major selling point seems to be the second point. This is
> definitely
> > > nice to have, but would you expect a lot of value in practice since
> > > consumer groups are usually assumed to be stable? It might help to
> > describe
> > > some specific use cases to help motivate the proposal. One of the
> > downsides
> > > is that it requires users to restructure their code to get any benefit
> > from
> > > it. In particular, they need to move partition cleanup out of the
> > > onPartitionsRevoked() callback and into onPartitionsAssigned(). This
> is a
> > > little awkward and will probably make explaining the consumer more
> > > difficult. It's probably worth including a discussion of this point in
> > the
> > > proposal with an example.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com
> > > > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > I updated the KIP and added some details about the user data, the
> > > > assignment algorithm, and the alternative strategies to consider.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > >
> > > > Please let me know if I missed anything. Thank you.
> > > >
> > > > Regards,
> > > > --Vahid
> > > >
> > > >
> > > >
> > >
> >
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Jason Gustafson <ja...@confluent.io>.
Hey Andy,
Thanks for jumping in. A couple comments:
In addition, I think it is important that during a rebalance consumers do
> not first have all partitions revoked, only to have a very similar, (or the
> same!), set reassigned. This is less than intuitive and complicates client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only be
> called for true changes in assignment, i.e. any new partitions assigned and
> any existing ones revoked, when comparing the new assignment to the
> previous one.
The problem is that the revocation callback is called before you know what
the assignment for the next generation will be. This is necessary for the
consumer to be able to commit offsets for its assigned partitions. Once the
consumer has a new assignment, it is no longer safe to commit offsets from
the previous generation. Unless sticky assignment can give us some
guarantee on which partitions will remain after the rebalance, all of them
must be included in the revocation callback.
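The generation fencing Jason refers to can be illustrated with a toy model. This is not the real coordinator protocol, just a sketch of why a commit carrying a stale generation id has to be refused (compare the broker's ILLEGAL_GENERATION error), and hence why offsets are committed in the revocation callback, before the next generation's assignment exists:

```java
import java.util.*;

public class GenerationCheck {
    // Toy coordinator: every rebalance bumps the group's generation id, and
    // an offset commit tagged with a stale generation is rejected.
    private int generation = 1;
    private final Map<String, Long> committed = new HashMap<>();

    int rebalance() { return ++generation; }   // starts a new generation

    boolean tryCommit(int memberGeneration, String partition, long offset) {
        if (memberGeneration != generation)
            return false;                      // stale member: commit refused
        committed.put(partition, offset);
        return true;
    }

    Long committedOffset(String partition) { return committed.get(partition); }
}
```

A member that waits until after the rebalance to commit is already in the old generation and its commit is lost, which is why the consumer surfaces all owned partitions in onPartitionsRevoked up front.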
> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.
This would be true for range assignment, but with 4 partitions total,
round-robin assignment would give one partition to each of the 4 consumers
(assuming subscriptions match).
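Jason's point can be checked with a small simulation of the two strategies. This is a simplified model with hypothetical names, assuming every consumer subscribes to every topic; it is not the actual RangeAssignor/RoundRobinAssignor code:

```java
import java.util.*;

public class AssignmentDemo {

    // Range-style assignment: each topic's partitions are split across the
    // consumers independently, in consumer order, with leftover partitions
    // going to the first consumers.
    static Map<String, List<String>> range(List<String> consumers,
                                           Map<String, Integer> topics) {
        Map<String, List<String>> out = new TreeMap<>();
        for (String c : consumers) out.put(c, new ArrayList<>());
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            int per = t.getValue() / consumers.size();
            int extra = t.getValue() % consumers.size();
            int p = 0;
            for (int i = 0; i < consumers.size(); i++)
                for (int k = 0, take = per + (i < extra ? 1 : 0); k < take; k++)
                    out.get(consumers.get(i)).add(t.getKey() + "-" + p++);
        }
        return out;
    }

    // Round-robin assignment: all topic-partitions are flattened into one
    // sorted list and dealt out across the consumers one at a time.
    static Map<String, List<String>> roundRobin(List<String> consumers,
                                                Map<String, Integer> topics) {
        Map<String, List<String>> out = new TreeMap<>();
        for (String c : consumers) out.put(c, new ArrayList<>());
        List<String> parts = new ArrayList<>();
        for (Map.Entry<String, Integer> t : new TreeMap<>(topics).entrySet())
            for (int p = 0; p < t.getValue(); p++)
                parts.add(t.getKey() + "-" + p);
        for (int i = 0; i < parts.size(); i++)
            out.get(consumers.get(i % consumers.size())).add(parts.get(i));
        return out;
    }
}
```

With two 2-partition topics and four consumers, `range` leaves the last two consumers idle (each topic's two partitions go to the first two consumers), while `roundRobin` gives every consumer exactly one partition, matching the observation above.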
Thanks,
Jason
On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
wrote:
> Hi all,
>
> I think sticky assignment is immensely important / useful in many
> situations. Apps that use Kafka are many and varied. Any app that stores
> any state, either in the form of data from incoming messages, cached
> results from previous out-of-process calls or expensive operations, (and
> let's face it, that's most!), can see a big negative impact from partition
> movement.
>
> The main issue partition movement brings is that it makes building elastic
> services very hard. Consider: you've got an app consuming from Kafka that
> locally caches data to improve performance. You want the app to auto scale
> as the throughput to the topic(s) increases. Currently, when one or more
> new instances are added and the group rebalances, all existing instances
> have all partitions revoked, and then a new, potentially quite different,
> set assigned. An intuitive pattern is to evict partition state, i.e. the
> cached data, when a partition is revoked. So in this case all apps flush
> their entire cache causing throughput to drop massively, right when you
> want to increase it!
>
> Even if the app is not flushing partition state when partitions are
> revoked, the lack of a 'sticky' strategy means that a proportion of the
> cached state is now useless, and instances have partitions assigned for
> which they have no cached state, again negatively impacting throughput.
>
> With a 'sticky' strategy throughput can be maintained and indeed increased,
> as intended.
>
> The same is also true in the presence of failure. An instance failing,
> (maybe due to high load), can invalidate the caching of existing instances,
> negatively impacting throughput of the remaining instances, (possibly at a
> time the system needs throughput the most!)
>
> My question would be 'why move partitions if you don't have to?'. I will
> certainly be setting the 'sticky' assignment strategy as the default once
> it's released, and I have a feeling it will become the default in the
> community's 'best-practice' guides.
>
> In addition, I think it is important that during a rebalance consumers do
> not first have all partitions revoked, only to have a very similar, (or the
> same!), set reassigned. This is less than intuitive and complicates client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only be
> called for true changes in assignment, i.e. any new partitions assigned and
> any existing ones revoked, when comparing the new assignment to the
> previous one.
>
> I think the change to how the client listener is called should be part of
> this work.
>
> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.
>
> Thanks,
>
>
> Andy
>
>
>
>
>
>
>
>
> On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Vahid,
> >
> > Thanks for the updates. I think the lack of comments on this KIP suggests
> > that the motivation might need a little work. Here are the two main
> > benefits of this assignor as I see them:
> >
> > 1. It can give a more balanced assignment when subscriptions do not match
> > in a group (this is the same problem solved by KIP-49).
> > 2. It potentially allows applications to save the need to cleanup
> partition
> > state when rebalancing since partitions are more likely to stay assigned
> to
> > the same consumer.
> >
> > Does that seem right to you?
> >
> > I think it's unclear how serious the first problem is. Providing better
> > balance when subscriptions differ is nice, but are rolling updates the
> only
> > scenario where this is encountered? Or are there more general use cases
> > where differing subscriptions could persist for a longer duration? I'm
> also
> > wondering if this assignor addresses the problem found in KAFKA-2019. It
> > would be useful to confirm whether this problem still exists with the new
> > consumer's round robin strategy and how (whether?) it is addressed by
> this
> > assignor.
> >
> > The major selling point seems to be the second point. This is definitely
> > nice to have, but would you expect a lot of value in practice since
> > consumer groups are usually assumed to be stable? It might help to
> describe
> > some specific use cases to help motivate the proposal. One of the
> downsides
> > is that it requires users to restructure their code to get any benefit
> from
> > it. In particular, they need to move partition cleanup out of the
> > onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
> > little awkward and will probably make explaining the consumer more
> > difficult. It's probably worth including a discussion of this point in
> the
> > proposal with an example.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com
> > > wrote:
> >
> > > Hi Jason,
> > >
> > > I updated the KIP and added some details about the user data, the
> > > assignment algorithm, and the alternative strategies to consider.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > >
> > > Please let me know if I missed adding anything. Thank you.
> > >
> > > Regards,
> > > --Vahid
> > >
> > >
> > >
> >
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Andrew Coates <bi...@gmail.com>.
Hi all,
I think sticky assignment is immensely important / useful in many
situations. Apps that use Kafka are many and varied. Any app that stores
any state, either in the form of data from incoming messages, cached
results from previous out-of-process calls or expensive operations, (and
let's face it, that's most!), can see a big negative impact from partition
movement.
The main issue partition movement brings is that it makes building elastic
services very hard. Consider: you've got an app consuming from Kafka that
locally caches data to improve performance. You want the app to auto scale
as the throughput to the topic(s) increases. Currently, when one or more
new instances are added and the group rebalances, all existing instances
have all partitions revoked, and then a new, potentially quite different,
set assigned. An intuitive pattern is to evict partition state, i.e. the
cached data, when a partition is revoked. So in this case all apps flush
their entire cache, causing throughput to drop massively, right when you
want to increase it!
Even if the app is not flushing partition state when partitions are
revoked, the lack of a 'sticky' strategy means that a proportion of the
cached state is now useless, and instances have partitions assigned for
which they have no cached state, again negatively impacting throughput.
With a 'sticky' strategy throughput can be maintained and indeed increased,
as intended.
The same is also true in the presence of failure. An instance failing,
(maybe due to high load), can invalidate the caching of existing instances,
negatively impacting throughput of the remaining instances (possibly at a
time the system needs throughput the most!)
My question would be 'why move partitions if you don't have to?'. I will
certainly be setting the 'sticky' assignment strategy as the default once
it's released, and I have a feeling it will become the default in the
community's 'best-practice' guides.
In addition, I think it is important that during a rebalance consumers do
not first have all partitions revoked, only to have a very similar, (or the
same!), set reassigned. This is less than intuitive and complicates client
code unnecessarily. Instead, the `ConsumerPartitionListener` should only be
called for true changes in assignment, i.e. any new partitions assigned and
any existing ones revoked, when comparing the new assignment to the
previous one.
I think the change to how the client listener is called should be part of
this work.
There is one last scenario I'd like to highlight that I think the KIP
should describe: say you have a group consuming from two topics, each topic
with two partitions. As of 0.9.0.1 the maximum number of consumers you can
have is 2, not 4. With 2 consumers each will get one partition from each
topic. A third consumer will not have any partitions assigned. This should
be fixed by the 'fair' part of the strategy, but it would be good to see
this covered explicitly in the KIP.
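To make the scenario concrete, here is a small sketch (plain Python for illustration, not Kafka code; topic and consumer names are made up) contrasting a per-topic "range"-style split, which caps the useful group size at 2, with a fair assignment over the union of all partitions:

```python
def range_assign(partitions_per_topic, consumers):
    """Per-topic split: each topic's partitions are divided among the
    consumers independently (mirrors the classic 'range' behavior)."""
    assignment = {c: [] for c in consumers}
    for topic, num_parts in partitions_per_topic.items():
        per = num_parts // len(consumers)
        extra = num_parts % len(consumers)
        p = 0
        for i, c in enumerate(consumers):
            n = per + (1 if i < extra else 0)
            assignment[c] += [(topic, q) for q in range(p, p + n)]
            p += n
    return assignment

def fair_assign(partitions_per_topic, consumers):
    """Round-robin over the union of all partitions, so load spreads
    across consumers regardless of topic boundaries."""
    all_parts = [(t, p) for t, n in partitions_per_topic.items() for p in range(n)]
    assignment = {c: [] for c in consumers}
    for i, tp in enumerate(sorted(all_parts)):
        assignment[consumers[i % len(consumers)]].append(tp)
    return assignment

topics = {"t1": 2, "t2": 2}          # two topics, two partitions each
consumers = ["C1", "C2", "C3", "C4"]
by_range = range_assign(topics, consumers)
by_fair = fair_assign(topics, consumers)
print({c: len(ps) for c, ps in by_range.items()})  # C3 and C4 get nothing
print({c: len(ps) for c, ps in by_fair.items()})   # one partition each
```

With the fair strategy each of the four consumers ends up with exactly one partition, whereas the per-topic split leaves two consumers idle.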
Thanks,
Andy
On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
> Hey Vahid,
>
> Thanks for the updates. I think the lack of comments on this KIP suggests
> that the motivation might need a little work. Here are the two main
> benefits of this assignor as I see them:
>
> 1. It can give a more balanced assignment when subscriptions do not match
> in a group (this is the same problem solved by KIP-49).
> 2. It potentially allows applications to save the need to cleanup partition
> state when rebalancing since partitions are more likely to stay assigned to
> the same consumer.
>
> Does that seem right to you?
>
> I think it's unclear how serious the first problem is. Providing better
> balance when subscriptions differ is nice, but are rolling updates the only
> scenario where this is encountered? Or are there more general use cases
> where differing subscriptions could persist for a longer duration? I'm also
> wondering if this assignor addresses the problem found in KAFKA-2019. It
> would be useful to confirm whether this problem still exists with the new
> consumer's round robin strategy and how (whether?) it is addressed by this
> assignor.
>
> The major selling point seems to be the second point. This is definitely
> nice to have, but would you expect a lot of value in practice since
> consumer groups are usually assumed to be stable? It might help to describe
> some specific use cases to help motivate the proposal. One of the downsides
> is that it requires users to restructure their code to get any benefit from
> it. In particular, they need to move partition cleanup out of the
> onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
> little awkward and will probably make explaining the consumer more
> difficult. It's probably worth including a discussion of this point in the
> proposal with an example.
>
> Thanks,
> Jason
>
>
>
> On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com
> > wrote:
>
> > Hi Jason,
> >
> > I updated the KIP and added some details about the user data, the
> > assignment algorithm, and the alternative strategies to consider.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > Please let me know if I missed adding anything. Thank you.
> >
> > Regards,
> > --Vahid
> >
> >
> >
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,
I appreciate your feedback.
Please see my comments below, and advise if you have further suggestions.
Thanks.
Regards,
--Vahid
From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 06/22/2016 04:41 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hey Vahid,
Thanks for the updates. I think the lack of comments on this KIP suggests
that the motivation might need a little work. Here are the two main
benefits of this assignor as I see them:
1. It can give a more balanced assignment when subscriptions do not match
in a group (this is the same problem solved by KIP-49).
2. It potentially allows applications to save the need to cleanup
partition
state when rebalancing since partitions are more likely to stay assigned
to
the same consumer.
Does that seem right to you?
Yes, it does. You summarized it nicely. #1 is an advantage of this
strategy compared to existing round robin and fair strategies.
I think it's unclear how serious the first problem is. Providing better
balance when subscriptions differ is nice, but are rolling updates the
only
scenario where this is encountered? Or are there more general use cases
where differing subscriptions could persist for a longer duration? I'm
also
wondering if this assignor addresses the problem found in KAFKA-2019. It
would be useful to confirm whether this problem still exists with the new
consumer's round robin strategy and how (whether?) it is addressed by this
assignor.
I'm not very clear on the first part of this paragraph; could you clarify
it for me? In general, balancing partitions across the consumers in a
group as evenly as possible normally means balancing the load within the
cluster, and that's something a user would want compared to cases where
the assignments, and therefore the load, could be quite unbalanced
depending on the subscriptions. Having an optimal balance is definitely
more reassuring than knowing partition assignments could become quite
unbalanced. There is an example in the KIP of a simple use case that
leads to an unbalanced assignment with round robin assignment. This
imbalance could become much more severe in real use cases with many more
topics / partitions / consumers, and that's ideally something we would
want to avoid, if possible.
Regarding KAFKA-2019, when I try the simple use case of
https://issues.apache.org/jira/browse/KAFKA-2019?focusedCommentId=14360892&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14360892
each of my consumers gets 3 partitions, which is not the same as what is
mentioned in the comment. I might be missing something in the
configuration (beyond setting the strategy to 'roundrobin' and fetcher
threads to '2'), or the issue may have already been resolved by some other
patch. In any case, based on what I read in the JIRA, the issue stems from
the multiple threads each consumer may have: all threads of one consumer
are assigned partitions before partitions are assigned to other consumers'
threads.
Since the new consumer is single threaded there is no such problem in its
round robin strategy. It simply considers consumers one by one for each
partition assignment, and when one consumer is assigned a partition, the
next assignment starts with considering the next consumer in the list (and
not the same consumer that was just assigned). This removes the
possibility of the issue reported in KAFKA-2019 surfacing in the new
consumer. In the sticky strategy we do not have this issue either, since
every time an assignment is about to happen we start with the consumer
with the least number of assignments. So we will not have a scenario where
a consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that
consumer is lagging behind other consumers in the number of partitions
assigned).
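That least-loaded rule can be sketched as follows (illustrative Python, not the KIP's implementation; note that this sketch keeps every previous assignment even when that leaves the result unbalanced, whereas the real assignor also trades stickiness away where needed to restore fairness):

```python
def sticky_assign(partitions, subscriptions, previous):
    """Greedy sticky assignment sketch.

    partitions:    list of all topic-partitions to assign, as (topic, partition)
    subscriptions: consumer -> set of topics it subscribes to
    previous:      consumer -> list of partitions it owned before the rebalance
    """
    assignment = {c: [] for c in subscriptions}
    unassigned = set(partitions)
    # Stickiness first: keep a partition with its previous owner when possible.
    for consumer, owned in previous.items():
        for tp in owned:
            if consumer in assignment and tp in unassigned and tp[0] in subscriptions[consumer]:
                assignment[consumer].append(tp)
                unassigned.discard(tp)
    # Then fairness: give each leftover partition to the least-loaded subscriber.
    for tp in sorted(unassigned):
        eligible = [c for c, topics in subscriptions.items() if tp[0] in topics]
        target = min(eligible, key=lambda c: len(assignment[c]))
        assignment[target].append(tp)
    return assignment

parts = [("t1", 0), ("t1", 1), ("t1", 2), ("t1", 3)]
subs = {"C1": {"t1"}, "C2": {"t1"}}
prev = {"C1": [("t1", 0), ("t1", 1)]}   # C2 has just joined the group
result = sticky_assign(parts, subs, prev)
print(result)  # C1 keeps t1-0 and t1-1; C2 picks up t1-2 and t1-3
```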
The major selling point seems to be the second point. This is definitely
nice to have, but would you expect a lot of value in practice since
consumer groups are usually assumed to be stable? It might help to
describe
some specific use cases to help motivate the proposal. One of the
downsides
is that it requires users to restructure their code to get any benefit
from
it. In particular, they need to move partition cleanup out of the
onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
little awkward and will probably make explaining the consumer more
difficult. It's probably worth including a discussion of this point in the
proposal with an example.
Even though consumer groups are usually stable, it might be the case that
consumers do not initially join the group at the same time. The sticky
strategy in that situation lets those who joined earlier stick to their
partitions to some extent (assuming fairness takes precedence over
stickiness). In terms of specific use cases, Andrew touched on examples of
how Kafka can benefit from a sticky assignor. I could add those to the KIP
if you also think they help build the case in favor of the sticky assignor.
I agree with you about the downside and I'll make sure I add that to the
KIP as you suggested.
Thanks,
Jason
On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian
<vahidhashemian@us.ibm.com
> wrote:
> Hi Jason,
>
> I updated the KIP and added some details about the user data, the
> assignment algorithm, and the alternative strategies to consider.
>
>
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> Please let me know if I missed adding anything. Thank you.
>
> Regards,
> --Vahid
>
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,
Thanks for the updates. I think the lack of comments on this KIP suggests
that the motivation might need a little work. Here are the two main
benefits of this assignor as I see them:
1. It can give a more balanced assignment when subscriptions do not match
in a group (this is the same problem solved by KIP-49).
2. It potentially allows applications to save the need to cleanup partition
state when rebalancing since partitions are more likely to stay assigned to
the same consumer.
Does that seem right to you?
I think it's unclear how serious the first problem is. Providing better
balance when subscriptions differ is nice, but are rolling updates the only
scenario where this is encountered? Or are there more general use cases
where differing subscriptions could persist for a longer duration? I'm also
wondering if this assignor addresses the problem found in KAFKA-2019. It
would be useful to confirm whether this problem still exists with the new
consumer's round robin strategy and how (whether?) it is addressed by this
assignor.
The major selling point seems to be the second point. This is definitely
nice to have, but would you expect a lot of value in practice since
consumer groups are usually assumed to be stable? It might help to describe
some specific use cases to help motivate the proposal. One of the downsides
is that it requires users to restructure their code to get any benefit from
it. In particular, they need to move partition cleanup out of the
onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
little awkward and will probably make explaining the consumer more
difficult. It's probably worth including a discussion of this point in the
proposal with an example.
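The restructuring described above might look roughly like this (plain Python standing in for Kafka's Java ConsumerRebalanceListener; the method names mirror the Java callbacks, everything else is illustrative). Cleanup moves out of the revoke callback and happens in the assign callback, only for partitions that did not come back:

```python
class StickyAwareListener:
    """Sketch of deferring partition-state cleanup until after reassignment.

    With a sticky assignor, partitions revoked in a rebalance are likely to
    come straight back, so we only discard state for partitions that do not.
    """

    def __init__(self):
        self.state = {}        # partition -> local state (e.g. a cache)
        self.previous = set()  # partitions owned before the rebalance

    def on_partitions_revoked(self, partitions):
        # Offsets would still be committed and output flushed here for a
        # clean handoff, but local state is NOT thrown away yet.
        self.previous = set(partitions)

    def on_partitions_assigned(self, partitions):
        lost = self.previous - set(partitions)
        for tp in lost:
            self.state.pop(tp, None)       # clean up only what actually moved
        for tp in partitions:
            self.state.setdefault(tp, {})  # (re)initialize newly gained ones

listener = StickyAwareListener()
listener.state = {"t1-0": {"cached": 1}, "t1-1": {"cached": 2}}
listener.on_partitions_revoked(["t1-0", "t1-1"])
listener.on_partitions_assigned(["t1-0", "t1-2"])  # t1-1 moved elsewhere
print(sorted(listener.state))  # t1-0 kept its cache; t1-1 was cleaned up
```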
Thanks,
Jason
On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <vahidhashemian@us.ibm.com
> wrote:
> Hi Jason,
>
> I updated the KIP and added some details about the user data, the
> assignment algorithm, and the alternative strategies to consider.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> Please let me know if I missed adding anything. Thank you.
>
> Regards,
> --Vahid
>
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,
I updated the KIP and added some details about the user data, the
assignment algorithm, and the alternative strategies to consider.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
Please let me know if I missed adding anything. Thank you.
Regards,
--Vahid
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Jason Gustafson <ja...@confluent.io>.
Hi Vahid,
The only thing I added was the specification of the UserData field. The
rest comes from here:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol.
See the section on the JoinGroup request.
Generally speaking, I think having fewer assignment strategies included
with Kafka is probably better. One of the advantages of the client-side
assignment approach is that there's no actual need to bundle them into the
release. Applications can use them by depending on a separate library. That
said, sticky assignment seems like a generally good idea and a common need,
so it may be helpful for a lot of users to make it easily available in the
release. If it also addresses the issues raised in KIP-49, then so much the
better.
As for whether we should include both, there I'm not too sure. Most users
probably wouldn't have a strong reason to choose the "fair" assignment over
the "sticky" assignment since they both seem to have the same properties in
terms of balancing the group's partitions. The overhead is a concern for
large groups with many topic subscriptions though, so if people think that
the "fair" approach brings a lot of benefit over round-robin, then it may
be worth including also.
-Jason
On Mon, Jun 6, 2016 at 5:17 PM, Vahid S Hashemian <vahidhashemian@us.ibm.com
> wrote:
> Hi Jason,
>
> Thanks for reviewing the KIP.
> I will add the details you requested, but to summarize:
>
> Regarding the structure of the user data:
>
> Right now the user data will have the current assignments only which is a
> mapping of consumers to their assigned topic partitions. Is this mapping
> what you're also suggesting with CurrentAssignment field?
> I see how adding a version (as sticky assignor version) will be useful.
> Also how having a protocol name would be useful, perhaps for validation.
> But could you clarify the "Subscription" field and how you think it'll
> come into play?
>
>
> Regarding the algorithm:
>
> There could be similarities between how this KIP is implemented and how
> KIP-49 is handling the fairness. But since we had to take stickiness into
> consideration we started fresh and did not adopt from KIP-49.
> The Sticky assignor implementation is comprehensive and guarantees the
> fairest possible assignment with highest stickiness. I even have a unit
> test that randomly generates an assignment problem and verifies that a
> fair and sticky assignment is calculated.
> KIP-54 gives priority to fairness over stickiness (which makes the
> implementation more complex). We could have another strategy that gives
> priority to stickiness over fairness (which supposedly will have a better
> performance).
> The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates
> the assignment without considering the previous assignments (fairness
> only); whereas for KIP-54 previous assignments play a big role (fairness
> and stickiness).
> I believe if there is a situation where the stickiness requirements do not
> exist it would make sense to use a fair-only assignment without the
> overhead of sticky assignment, as you mentioned.
> So, I could see three different strategies that could enrich assignment
> policy options.
> It would be great to have some feedback from the community about what is
> the best way to move forward with these two KIPs.
>
> In the meantime, I'll add some more details in the KIP about the approach
> for calculating assignments.
>
> Thanks again.
>
> Regards,
> --Vahid
>
>
>
>
> From: Jason Gustafson <ja...@confluent.io>
> To: dev@kafka.apache.org
> Date: 06/06/2016 01:26 PM
> Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Hi Vahid,
>
> Can you add some detail to the KIP on the structure of the user data? I'm
> guessing it would be something like this:
>
> ProtocolName => "sticky"
>
> ProtocolMetadata => Version Subscription UserData
> Version => int16
> Subscription => [Topic]
> Topic => string
> UserData => CurrentAssignment
> CurrentAssignment => [Topic [Partition]]
> Topic => string
> Partition => int32
>
> It would also be helpful to include a little more detail on the algorithm.
> From what I can tell, it looks like you're adopting some of the strategies
> from KIP-49 to handle differing subscriptions better. If so, then I wonder
> if it makes sense to combine the two KIPs? Or do you think there would be
> an advantage to having the "fair" assignment strategy without the overhead
> of the sticky assignor?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Sorry for being late on this thread.
> >
> > The assign() function is auto-triggered during the rebalance by one of
> the
> > consumers when it receives all subscription information collected from
> the
> > server-side coordinator.
> >
> > More details can be found here:
> >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> >
> > As for Kafka Streams, the way it did "stickiness" is by 1) let all
> > consumers put their current assigned topic-partitions and server ids
> into
> > the "metadata" field of the JoinGroupRequest, 2) when the selected
> consumer
> > triggers assign() along with all the subscriptions as well as their
> > metadata, it can parse the metadata to learn about the existing
> assignment
> > map; and hence when making the new assignment it will try to assign
> > partitions to its current owners "with best effort".
> >
> >
> > Hope this helps.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I was looking at the implementation of StreamsPartitionAssignor
> through
> > > its unit tests and expected to find some tests that
> > > - verify stickiness by making at least two calls to the assign()
> method
> > > (so we check the second assign() call output preserves the assignments
> > > coming from the first assign() call output); or
> > > - start off by a preset assignment, call assign() after some
> subscription
> > > change, and verify the previous assignments are preserved.
> > > But none of the methods seem to do these. Did I overlook them, or
> > > stickiness is being tested in some other fashion?
> > >
> > > Also, if there is a high-level write-up about how this assignor works
> > > could you please point me to it? Thanks.
> > >
> > > Regards.
> > > --Vahid
> > >
> > >
> > >
> > >
> > > From: Guozhang Wang <wa...@gmail.com>
> > > To: "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > Date: 05/02/2016 10:34 AM
> > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> Strategy
> > >
> > >
> > >
> > > Just FYI, the StreamsPartitionAssignor in Kafka Streams is already
> doing
> > > some sort of sticky partitioning mechanism. This is done through the
> > > userData field though; i.e. all group members send their current
> > "assigned
> > > partitions" in their join group request, which will be grouped and
> send
> > to
> > > the leader, the leader then does best-effort for sticky-partitioning.
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <
> > ewen@confluent.io>
> > > wrote:
> > >
> > > > I think I'm unclear how we leverage the
> > > > onPartitionsRevoked/onPartitionsAssigned here in any way that's
> > > different
> > > > from our normal usage -- certainly you can use them to generate a
> diff,
> > > but
> > > > you still need to commit when partitions are revoked and that has a
> > > > non-trivial cost. Are we just saying that you might be able to save
> > some
> > > > overhead, e.g. closing/reopening some other resources by doing a
> flush
> > > but
> > > > not a close() or something? You still need to flush any output and
> > > commit
> > > > offsets before returning from onPartitionsRevoked, right? Otherwise
> you
> > > > couldn't guarantee clean handoff of partitions.
> > > >
> > > > In terms of the rebalancing, the basic requirements in the KIP seem
> > > sound.
> > > > Passing previous assignment data via UserData also seems reasonable
> > > since
> > > > it avoids redistributing all assignment data to all members and
> doesn't
> > > > rely on the next generation leader being a member of the current
> > > > generation. Hopefully this shouldn't be surprising since I think I
> > > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > > >
> > > > -Ewen
> > > >
> > > >
> > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > > > HI Jason,
> > > > >
> > > > > Thanks for your feedback.
> > > > >
> > > > > I believe your suggestion on how to take advantage of this
> assignor
> > is
> > > > > valid. We can leverage onPartitionsRevoked() and
> > > onPartitionsAssigned()
> > > > > callbacks and do a comparison of assigned partitions before and
> after
> > > the
> > > > > re-balance and do the cleanup only if there is a change (e.g., if
> > some
> > > > > previously assigned partition is not in the assignment).
> > > > >
> > > > > On your second question, a number of tests that I ran shows that
> the
> > > old
> > > > > assignments are preserved in the current implementation; except
> for
> > > when
> > > > > the consumer group leader is killed; in which case, a fresh
> > assignment
> > > is
> > > > > performed. This is something that needs to be fixed. I tried to
> use
> > > your
> > > > > pointers to find out where the best place is to preserve the old
> > > > > assignment in such circumstances but have not been able to
> pinpoint
> > > it.
> > > > If
> > > > > you have any suggestion on this please share. Thanks.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > From: Jason Gustafson <ja...@confluent.io>
> > > > > To: dev@kafka.apache.org
> > > > > Date: 04/14/2016 11:37 AM
> > > > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> > > Strategy
> > > > >
> > > > >
> > > > >
> > > > > Hi Vahid,
> > > > >
> > > > > Thanks for the proposal. I think one of the advantages of having
> > > sticky
> > > > > assignment would be reduce the need to cleanup local partition
> state
> > > > > between rebalances. Do you have any thoughts on how the user would
> > > take
> > > > > advantage of this assignor in the consumer to do this? Maybe one
> > > approach
> > > > > is to delay cleanup until you detect a change from the previous
> > > > assignment
> > > > > in the onPartitionsAssigned() callback?
> > > > >
> > > > > Also, can you provide some detail on how the sticky assignor works
> at
> > > the
> > > > > group protocol level? For example, do you pass old assignments
> > through
> > > > the
> > > > > "UserData" field in the consumer's JoinGroup?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > > vahidhashemian@us.ibm.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have started a new KIP under
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> > >
> > > > >
> > > > > > The corresponding JIRA is at
> > > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > > The corresponding PR is at
> > https://github.com/apache/kafka/pull/1020
> > > > > >
> > > > > > Your feedback is much appreciated.
> > > > > >
> > > > > > Regards,
> > > > > > Vahid Hashemian
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> > >
> > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,
Thanks for reviewing the KIP.
I will add the details you requested, but to summarize:
Regarding the structure of the user data:
Right now the user data will have the current assignments only which is a
mapping of consumers to their assigned topic partitions. Is this mapping
what you're also suggesting with CurrentAssignment field?
I see how adding a version (as sticky assignor version) will be useful.
Also how having a protocol name would be useful, perhaps for validation.
But could you clarify the "Subscription" field and how you think it'll
come into play?
Regarding the algorithm:
There could be similarities between how this KIP is implemented and how
KIP-49 is handling the fairness. But since we had to take stickiness into
consideration we started fresh and did not adopt from KIP-49.
The Sticky assignor implementation is comprehensive and guarantees the
fairest possible assignment with highest stickiness. I even have a unit
test that randomly generates an assignment problem and verifies that a
fair and sticky assignment is calculated.
KIP-54 gives priority to fairness over stickiness (which makes the
implementation more complex). We could have another strategy that gives
priority to stickiness over fairness (which supposedly will have a better
performance).
The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates
the assignment without considering the previous assignments (fairness
only); whereas for KIP-54 previous assignments play a big role (fairness
and stickiness).
I believe if there is a situation where the stickiness requirements do not
exist it would make sense to use a fair-only assignment without the
overhead of sticky assignment, as you mentioned.
So, I could see three different strategies that could enrich assignment
policy options.
It would be great to have some feedback from the community about what is
the best way to move forward with these two KIPs.
In the meantime, I'll add some more details in the KIP about the approach
for calculating assignments.
Thanks again.
Regards,
--Vahid
From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 06/06/2016 01:26 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Vahid,
Can you add some detail to the KIP on the structure of the user data? I'm
guessing it would be something like this:
ProtocolName => "sticky"
ProtocolMetadata => Version Subscription UserData
Version => int16
Subscription => [Topic]
Topic => string
UserData => CurrentAssignment
CurrentAssignment => [Topic [Partition]]
Topic => string
Partition => int32
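As an illustration only, the UserData layout above could be framed along these lines (a hand-rolled Python sketch using big-endian primitives in the style of Kafka's protocol, with arrays as int32 counts and strings as int16 length plus UTF-8 bytes; the actual wire framing the assignor would use is an implementation detail of the KIP, so treat the encoding here as an assumption):

```python
import struct

def encode_user_data(current_assignment):
    """Encode {topic: [partition, ...]} roughly following
    UserData => CurrentAssignment => [Topic [Partition]]."""
    out = struct.pack(">i", len(current_assignment))       # topic count
    for topic, partitions in current_assignment.items():
        t = topic.encode("utf-8")
        out += struct.pack(">h", len(t)) + t               # Topic => string
        out += struct.pack(">i", len(partitions))          # partition count
        for p in partitions:
            out += struct.pack(">i", p)                    # Partition => int32
    return out

def decode_user_data(buf):
    off = 0
    (n_topics,) = struct.unpack_from(">i", buf, off); off += 4
    assignment = {}
    for _ in range(n_topics):
        (tlen,) = struct.unpack_from(">h", buf, off); off += 2
        topic = buf[off:off + tlen].decode("utf-8"); off += tlen
        (n_parts,) = struct.unpack_from(">i", buf, off); off += 4
        parts = list(struct.unpack_from(">%di" % n_parts, buf, off))
        off += 4 * n_parts
        assignment[topic] = parts
    return assignment

data = encode_user_data({"t1": [0, 2], "t2": [1]})
print(decode_user_data(data))  # round-trips to {'t1': [0, 2], 't2': [1]}
```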
It would also be helpful to include a little more detail on the algorithm.
From what I can tell, it looks like you're adopting some of the strategies
from KIP-49 to handle differing subscriptions better. If so, then I wonder
if it makes sense to combine the two KIPs? Or do you think there would be
an advantage to having the "fair" assignment strategy without the overhead
of the sticky assignor?
Thanks,
Jason
On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> wrote:
> Sorry for being late on this thread.
>
> The assign() function is auto-triggered during the rebalance by one of
the
> consumers when it receives all subscription information collected from
the
> server-side coordinator.
>
> More details can be found here:
>
>
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> As for Kafka Streams, the way it did "stickiness" is by 1) let all
> consumers put their current assigned topic-partitions and server ids
into
> the "metadata" field of the JoinGroupRequest, 2) when the selected
consumer
> triggers assign() along with all the subscriptions as well as their
> metadata, it can parse the metadata to learn about the existing
assignment
> map; and hence when making the new assignment it will try to assign
> partitions to its current owners "with best effort".
>
>
> Hope this helps.
>
>
> Guozhang
>
>
> On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi Guozhang,
> >
> > I was looking at the implementation of StreamsPartitionAssignor
through
> > its unit tests and expected to find some tests that
> > - verify stickiness by making at least two calls to the assign()
method
> > (so we check the second assign() call output preserves the assignments
> > coming from the first assign() call output); or
> > - start off by a preset assignment, call assign() after some
subscription
> > change, and verify the previous assignments are preserved.
> > But none of the methods seem to do these. Did I overlook them, or
> > stickiness is being tested in some other fashion?
> >
> > Also, if there is a high-level write-up about how this assignor works
> > could you please point me to it? Thanks.
> >
> > Regards.
> > --Vahid
> >
> >
> >
> >
> > From: Guozhang Wang <wa...@gmail.com>
> > To: "dev@kafka.apache.org" <de...@kafka.apache.org>
> > Date: 05/02/2016 10:34 AM
> > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment
Strategy
> >
> >
> >
> > Just FYI, the StreamsPartitionAssignor in Kafka Streams are already
doing
> > some sort of sticky partitioning mechanism. This is done through the
> > userData field though; i.e. all group members send their current
> "assigned
> > partitions" in their join group request, which will be grouped and
send
> to
> > the leader, the leader then does best-effort for sticky-partitioning.
> >
> >
> > Guozhang
> >
> > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <
> ewen@confluent.io>
> > wrote:
> >
> > > I think I'm unclear how we leverage the
> > > onPartitionsRevoked/onPartitionsAssigned here in any way that's
> > different
> > > from our normal usage -- certainly you can use them to generate a
diff,
> > but
> > > you still need to commit when partitions are revoked and that has a
> > > non-trivial cost. Are we just saying that you might be able to save
> some
> > > overhead, e.g. closing/reopening some other resources by doing a
flush
> > but
> > > not a close() or something? You still need to flush any output and
> > commit
> > > offsets before returning from onPartitionsRevoked, right? Otherwise
you
> > > couldn't guarantee clean handoff of partitions.
> > >
> > > In terms of the rebalancing, the basic requirements in the KIP seem
> > sound.
> > > Passing previous assignment data via UserData also seems reasonable
> > since
> > > it avoids redistributing all assignment data to all members and
doesn't
> > > rely on the next generation leader being a member of the current
> > > generation. Hopefully this shouldn't be surprising since I think I
> > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > >
> > > -Ewen
> > >
> > >
> > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > > > HI Jason,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > I believe your suggestion on how to take advantage of this
assignor
> is
> > > > valid. We can leverage onPartitionsRevoked() and
> > onPartitionsAssigned()
> > > > callbacks and do a comparison of assigned partitions before and
after
> > the
> > > > re-balance and do the cleanup only if there is a change (e.g., if
> some
> > > > previously assigned partition is not in the assignment).
> > > >
> > > > On your second question, a number of tests that I ran shows that
the
> > old
> > > > assignments are preserved in the current implementation; except
for
> > when
> > > > the consumer group leader is killed; in which case, a fresh
> assignment
> > is
> > > > performed. This is something that needs to be fixed. I tried to
use
> > your
> > > > pointers to find out where the best place is to preserve the old
> > > > assignment in such circumstances but have not been able to
pinpoint
> > it.
> > > If
> > > > you have any suggestion on this please share. Thanks.
> > > >
> > > > Regards,
> > > > Vahid Hashemian
> > > >
> > > >
> > > >
> > > >
> > > > From: Jason Gustafson <ja...@confluent.io>
> > > > To: dev@kafka.apache.org
> > > > Date: 04/14/2016 11:37 AM
> > > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> > Strategy
> > > >
> > > >
> > > >
> > > > Hi Vahid,
> > > >
> > > > Thanks for the proposal. I think one of the advantages of having
> > sticky
> > > > assignment would be reduce the need to cleanup local partition
state
> > > > between rebalances. Do you have any thoughts on how the user would
> > take
> > > > advantage of this assignor in the consumer to do this? Maybe one
> > approach
> > > > is to delay cleanup until you detect a change from the previous
> > > assignment
> > > > in the onPartitionsAssigned() callback?
> > > >
> > > > Also, can you provide some detail on how the sticky assignor works
at
> > the
> > > > group protocol level? For example, do you pass old assignments
> through
> > > the
> > > > "UserData" field in the consumer's JoinGroup?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have started a new KIP under
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
> >
>
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > > >
> > > > > The corresponding JIRA is at
> > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > The corresponding PR is at
> https://github.com/apache/kafka/pull/1020
> > > > >
> > > > > Your feedback is much appreciated.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> >
> >
>
>
> --
> -- Guozhang
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Jason Gustafson <ja...@confluent.io>.
Hi Vahid,
Can you add some detail to the KIP on the structure of the user data? I'm
guessing it would be something like this:
ProtocolName => "sticky"
ProtocolMetadata => Version Subscription UserData
Version => int16
Subscription => [Topic]
Topic => string
UserData => CurrentAssignment
CurrentAssignment => [Topic [Partition]]
Topic => string
Partition => int32
It would also be helpful to include a little more detail on the algorithm.
From what I can tell, it looks like you're adopting some of the strategies
from KIP-49 to handle differing subscriptions better. If so, then I wonder
if it makes sense to combine the two KIPs? Or do you think there would be
an advantage to having the "fair" assignment strategy without the overhead
of the sticky assignor?
Thanks,
Jason
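
Something like the following sketch shows how that userData layout could be packed and unpacked with a ByteBuffer. The `StickyUserData` name and the exact wire layout are illustrative guesses for discussion, not the KIP's final format:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative encoding of the CurrentAssignment userData sketched above:
// an int32 topic count, then per topic an int16-length UTF-8 string and an
// int32-count array of int32 partitions. Names and layout are assumptions.
public class StickyUserData {

    public static ByteBuffer encode(Map<String, List<Integer>> assignment) {
        int size = 4; // topic count
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet())
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length
                  + 4 + 4 * e.getValue().size();
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(assignment.size());
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);
            buf.put(topic);
            buf.putInt(e.getValue().size());
            for (int p : e.getValue())
                buf.putInt(p);
        }
        buf.flip();
        return buf;
    }

    public static Map<String, List<Integer>> decode(ByteBuffer buf) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int topicCount = buf.getInt();
        for (int i = 0; i < topicCount; i++) {
            byte[] topic = new byte[buf.getShort()];
            buf.get(topic);
            int partitionCount = buf.getInt();
            List<Integer> partitions = new ArrayList<>(partitionCount);
            for (int j = 0; j < partitionCount; j++)
                partitions.add(buf.getInt());
            assignment.put(new String(topic, StandardCharsets.UTF_8), partitions);
        }
        return assignment;
    }
}
```

Round-tripping an assignment map through encode/decode should return an equal map, which is the property the leader relies on when it parses each member's userData.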
On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> wrote:
> Sorry for being late on this thread.
>
> The assign() function is auto-triggered during the rebalance by one of the
> consumers when it receives all subscription information collected from the
> server-side coordinator.
>
> More details can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> As for Kafka Streams, the way it did "stickiness" is by 1) letting all
> consumers put their current assigned topic-partitions and server ids into
> the "metadata" field of the JoinGroupRequest, 2) when the selected consumer
> triggers assign() along with all the subscriptions as well as their
> metadata, it can parse the metadata to learn about the existing assignment
> map; and hence when making the new assignment it will try to assign
> partitions to its current owners "with best effort".
>
>
> Hope this helps.
>
>
> Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Guozhang Wang <wa...@gmail.com>.
Sorry for being late on this thread.
The assign() function is auto-triggered during the rebalance by one of the
consumers when it receives all subscription information collected from the
server-side coordinator.
More details can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
As for Kafka Streams, the way it did "stickiness" is by 1) letting all
consumers put their current assigned topic-partitions and server ids into
the "metadata" field of the JoinGroupRequest, 2) when the selected consumer
triggers assign() along with all the subscriptions as well as their
metadata, it can parse the metadata to learn about the existing assignment
map; and hence when making the new assignment it will try to assign
partitions to its current owners "with best effort".
Hope this helps.
Guozhang
On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:
> Hi Guozhang,
>
> I was looking at the implementation of StreamsPartitionAssignor through
> its unit tests and expected to find some tests that
> - verify stickiness by making at least two calls to the assign() method
> (so we check the second assign() call output preserves the assignments
> coming from the first assign() call output); or
> - start off by a preset assignment, call assign() after some subscription
> change, and verify the previous assignments are preserved.
> But none of the methods seem to do these. Did I overlook them, or
> stickiness is being tested in some other fashion?
>
> Also, if there is a high-level write-up about how this assignor works
> could you please point me to it? Thanks.
>
> Regards.
> --Vahid
--
-- Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Guozhang,
I was looking at the implementation of StreamsPartitionAssignor through
its unit tests and expected to find some tests that
- verify stickiness by making at least two calls to the assign() method
(so we check the second assign() call output preserves the assignments
coming from the first assign() call output); or
- start off by a preset assignment, call assign() after some subscription
change, and verify the previous assignments are preserved.
But none of the methods seem to do these. Did I overlook them, or
stickiness is being tested in some other fashion?
Also, if there is a high-level write-up about how this assignor works
could you please point me to it? Thanks.
Regards.
--Vahid
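
The two-call test described above can be sketched against a toy assignor. This is not Kafka's StreamsPartitionAssignor — just a made-up sticky assignor, small enough to show the pattern:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy sticky assignor, invented for this sketch (not Kafka code): each
// partition stays with its previous owner if that consumer is still in the
// group; leftover partitions go to the least-loaded consumer. Enough to
// demonstrate the "call assign() twice and compare" style of test.
public class StickySketch {

    public static Map<String, String> assign(List<String> consumers,
                                             List<String> partitions,
                                             Map<String, String> previous) {
        Map<String, String> result = new LinkedHashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String c : consumers)
            load.put(c, 0);
        List<String> unassigned = new ArrayList<>();
        for (String p : partitions) {
            String owner = previous.get(p);
            if (owner != null && load.containsKey(owner)) {
                result.put(p, owner);               // sticky: keep the old owner
                load.merge(owner, 1, Integer::sum);
            } else {
                unassigned.add(p);
            }
        }
        for (String p : unassigned) {               // balance everything else
            String best = null;
            for (String c : consumers)
                if (best == null || load.get(c) < load.get(best))
                    best = c;
            result.put(p, best);
            load.merge(best, 1, Integer::sum);
        }
        return result;
    }
}
```

A stickiness test then amounts to: run assign() once, feed the output back as the previous assignment, and assert the second result equals the first; or drop a consumer and assert the survivors keep the partitions they already had.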
From: Guozhang Wang <wa...@gmail.com>
To: "dev@kafka.apache.org" <de...@kafka.apache.org>
Date: 05/02/2016 10:34 AM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
some sort of sticky partitioning mechanism. This is done through the
userData field though; i.e. all group members send their current "assigned
partitions" in their join group request, which will be grouped and sent to
the leader, the leader then does best-effort for sticky-partitioning.
Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Guozhang Wang <wa...@gmail.com>.
Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
some sort of sticky partitioning mechanism. This is done through the
userData field though; i.e. all group members send their current "assigned
partitions" in their join group request, which will be grouped and sent to
the leader, the leader then does best-effort for sticky-partitioning.
Guozhang
On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:
> I think I'm unclear how we leverage the
> onPartitionsRevoked/onPartitionsAssigned here in any way that's different
> from our normal usage -- certainly you can use them to generate a diff, but
> you still need to commit when partitions are revoked and that has a
> non-trivial cost. Are we just saying that you might be able to save some
> overhead, e.g. closing/reopening some other resources by doing a flush but
> not a close() or something? You still need to flush any output and commit
> offsets before returning from onPartitionsRevoked, right? Otherwise you
> couldn't guarantee clean handoff of partitions.
>
> In terms of the rebalancing, the basic requirements in the KIP seem sound.
> Passing previous assignment data via UserData also seems reasonable since
> it avoids redistributing all assignment data to all members and doesn't
> rely on the next generation leader being a member of the current
> generation. Hopefully this shouldn't be surprising since I think I
> discussed this w/ Jason before he updated the relevant wiki pages :)
>
> -Ewen
>
>
--
-- Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Ewen,
Thank you for reviewing the KIP and providing feedback.
I believe the need to commit would still be there, as you mentioned. The
main advantage, however, would be when dealing with local state based on
partitions assigned, as described in
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
or in the corresponding JIRA for this KIP.
If consumers perform some processing on re-assignment of partitions (i.e.
after a rebalance), it would be more efficient for them to stick to their
assigned partitions and reduce the overhead of switching to a new set of
partitions (you also referred to some use cases).
Unfortunately I don't have a specific use case in mind at the moment, but
based on documents like above it seems that consumers can benefit from
such a strategy. If you or others can think of specific use cases to
enrich the KIP please let me know or directly update the KIP.
Regards,
Vahid Hashemian
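
The cleanup-on-change idea can be sketched like this, with plain strings standing in for TopicPartition objects; the class and method names are made up for illustration, not consumer API:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of diff-based cleanup: remember the last assignment, and when the
// new one arrives (as it would in onPartitionsAssigned), discard local state
// only for partitions that actually moved away. With a sticky assignor the
// diff is usually empty, so rebalances become nearly free for local state.
public class StickyStateManager {
    private Set<String> current = new HashSet<>();

    // Returns the partitions whose local state can now be cleaned up.
    public Set<String> onAssigned(Set<String> assigned) {
        Set<String> toCleanUp = new HashSet<>(current);
        toCleanUp.removeAll(assigned);     // only the partitions we lost
        current = new HashSet<>(assigned);
        return toCleanUp;
    }
}
```

Note this only covers local state; as Ewen points out below the quoted message, offsets still have to be committed on revocation regardless of stickiness.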
From: Ewen Cheslack-Postava <ew...@confluent.io>
To: dev@kafka.apache.org
Date: 04/29/2016 09:48 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
I think I'm unclear how we leverage the
onPartitionsRevoked/onPartitionsAssigned here in any way that's different
from our normal usage -- certainly you can use them to generate a diff, but
you still need to commit when partitions are revoked and that has a
non-trivial cost. Are we just saying that you might be able to save some
overhead, e.g. closing/reopening some other resources by doing a flush but
not a close() or something? You still need to flush any output and commit
offsets before returning from onPartitionsRevoked, right? Otherwise you
couldn't guarantee clean handoff of partitions.
In terms of the rebalancing, the basic requirements in the KIP seem sound.
Passing previous assignment data via UserData also seems reasonable since
it avoids redistributing all assignment data to all members and doesn't
rely on the next generation leader being a member of the current
generation. Hopefully this shouldn't be surprising since I think I
discussed this w/ Jason before he updated the relevant wiki pages :)
-Ewen
On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:
> HI Jason,
>
> Thanks for your feedback.
>
> I believe your suggestion on how to take advantage of this assignor is
> valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned()
> callbacks and do a comparison of assigned partitions before and after
the
> re-balance and do the cleanup only if there is a change (e.g., if some
> previously assigned partition is not in the assignment).
>
> On your second question, a number of tests that I ran shows that the old
> assignments are preserved in the current implementation; except for when
> the consumer group leader is killed; in which case, a fresh assignment
is
> performed. This is something that needs to be fixed. I tried to use your
> pointers to find out where the best place is to preserve the old
> assignment in such circumstances but have not been able to pinpoint it.
If
> you have any suggestion on this please share. Thanks.
>
> Regards,
> Vahid Hashemian
>
>
>
>
> From: Jason Gustafson <ja...@confluent.io>
> To: dev@kafka.apache.org
> Date: 04/14/2016 11:37 AM
> Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment
Strategy
>
>
>
> Hi Vahid,
>
> Thanks for the proposal. I think one of the advantages of having sticky
> assignment would be reducing the need to clean up local partition state
> between rebalances. Do you have any thoughts on how the user would take
> advantage of this assignor in the consumer to do this? Maybe one approach
> is to delay cleanup until you detect a change from the previous assignment
> in the onPartitionsAssigned() callback?
>
> Also, can you provide some detail on how the sticky assignor works at the
> group protocol level? For example, do you pass old assignments through the
> "UserData" field in the consumer's JoinGroup?
>
> Thanks,
> Jason
>
> On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi all,
> >
> > I have started a new KIP under
> >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> > The corresponding JIRA is at
> > https://issues.apache.org/jira/browse/KAFKA-2273
> > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> >
> > Your feedback is much appreciated.
> >
> > Regards,
> > Vahid Hashemian
> >
> >
>
>
>
>
>
--
Thanks,
Ewen
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,
Thanks for your feedback.
I believe your suggestion on how to take advantage of this assignor is
valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned()
callbacks and do a comparison of assigned partitions before and after the
re-balance and do the cleanup only if there is a change (e.g., if some
previously assigned partition is not in the assignment).
On your second question, a number of tests that I ran show that the old
assignments are preserved in the current implementation, except for when
the consumer group leader is killed, in which case a fresh assignment is
performed. This is something that needs to be fixed. I tried to use your
pointers to find out where the best place is to preserve the old
assignment in such circumstances but have not been able to pinpoint it. If
you have any suggestion on this please share. Thanks.
Regards,
Vahid Hashemian
From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 04/14/2016 11:37 AM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Vahid,
Thanks for the proposal. I think one of the advantages of having sticky
assignment would be reducing the need to clean up local partition state
between rebalances. Do you have any thoughts on how the user would take
advantage of this assignor in the consumer to do this? Maybe one approach
is to delay cleanup until you detect a change from the previous assignment
in the onPartitionsAssigned() callback?
Also, can you provide some detail on how the sticky assignor works at the
group protocol level? For example, do you pass old assignments through the
"UserData" field in the consumer's JoinGroup?
Thanks,
Jason
On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:
> Hi all,
>
> I have started a new KIP under
>
>
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> The corresponding JIRA is at
> https://issues.apache.org/jira/browse/KAFKA-2273
> The corresponding PR is at https://github.com/apache/kafka/pull/1020
>
> Your feedback is much appreciated.
>
> Regards,
> Vahid Hashemian
>
>
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Posted by Jason Gustafson <ja...@confluent.io>.
Hi Vahid,
Thanks for the proposal. I think one of the advantages of having sticky
assignment would be reducing the need to clean up local partition state
between rebalances. Do you have any thoughts on how the user would take
advantage of this assignor in the consumer to do this? Maybe one approach
is to delay cleanup until you detect a change from the previous assignment
in the onPartitionsAssigned() callback?
Also, can you provide some detail on how the sticky assignor works at the
group protocol level? For example, do you pass old assignments through the
"UserData" field in the consumer's JoinGroup?
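For what it's worth, the opaque userData payload could be as simple as a length-prefixed list of (topic, partition) pairs. The sketch below is purely illustrative Python (the KIP defines the actual wire format; nothing here is the real schema):

```python
# Hedged sketch: serializing a member's previous assignment into the opaque
# userData bytes of its JoinGroup request, and decoding it on the leader.
# Format here (count, then len-prefixed topic + int32 partition per entry)
# is an assumption for illustration, not KIP-54's actual encoding.
import struct

def encode_user_data(owned):
    """owned: list of (topic, partition) pairs -> opaque userData bytes."""
    pairs = sorted(owned)
    out = bytearray(struct.pack(">i", len(pairs)))  # int32 entry count
    for topic, partition in pairs:
        t = topic.encode("utf-8")
        out += struct.pack(">h", len(t)) + t + struct.pack(">i", partition)
    return bytes(out)

def decode_user_data(data):
    """Inverse of encode_user_data: bytes -> list of (topic, partition)."""
    (n,), off = struct.unpack_from(">i", data, 0), 4
    pairs = []
    for _ in range(n):
        (tlen,) = struct.unpack_from(">h", data, off); off += 2
        topic = data[off:off + tlen].decode("utf-8"); off += tlen
        (partition,) = struct.unpack_from(">i", data, off); off += 4
        pairs.append((topic, partition))
    return pairs
```

Since userData is carried through the group coordinator unchanged, this keeps each member's prior assignment available to whichever member becomes leader, without the leader needing to have been in the previous generation.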
Thanks,
Jason
On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:
> Hi all,
>
> I have started a new KIP under
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> The corresponding JIRA is at
> https://issues.apache.org/jira/browse/KAFKA-2273
> The corresponding PR is at https://github.com/apache/kafka/pull/1020
>
> Your feedback is much appreciated.
>
> Regards,
> Vahid Hashemian
>
>