Posted to dev@kafka.apache.org by Vahid S Hashemian <va...@us.ibm.com> on 2016/04/14 20:05:35 UTC

[DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Hi all,

I have started a new KIP under 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
The corresponding JIRA is at 
https://issues.apache.org/jira/browse/KAFKA-2273
The corresponding PR is at https://github.com/apache/kafka/pull/1020

Your feedback is much appreciated.

Regards,
Vahid Hashemian


Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Guozhang,

Thanks for the pointer. I'll try to take a closer look and get a better 
understanding and see if there is anything that can be leveraged for 
KIP-54 implementation.

Regards,
Vahid Hashemian




From:   Guozhang Wang <wa...@gmail.com>
To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
Date:   05/02/2016 10:34 AM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
some sort of sticky partitioning. This is done through the userData field
though; i.e. all group members send their current "assigned partitions" in
their join group request, which is collected and sent to the leader, and
the leader then does a best-effort sticky assignment.
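
To make that concrete, here is a minimal sketch of the idea, assuming a
made-up, line-separated encoding (the actual Streams format and the KIP-54
wire format are different): each member serializes the partitions it
currently owns into the opaque userData bytes of its JoinGroup
subscription, and the leader decodes those hints before computing the new
assignment.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Toy encoder/decoder for the "current assignment" hint carried in userData.
// The encoding here is invented purely for illustration.
public class OwnedPartitionsUserData {

  static ByteBuffer encode(List<String> ownedPartitions) {
    String joined = String.join("\n", ownedPartitions);
    return ByteBuffer.wrap(joined.getBytes(StandardCharsets.UTF_8));
  }

  static List<String> decode(ByteBuffer userData) {
    byte[] bytes = new byte[userData.remaining()];
    userData.get(bytes);
    return Arrays.asList(new String(bytes, StandardCharsets.UTF_8).split("\n"));
  }

  public static void main(String[] args) {
    // What a member would attach to its JoinGroup subscription...
    ByteBuffer userData = encode(Arrays.asList("t0-0", "t1-2"));
    // ...and what the leader would read back before computing the assignment.
    System.out.println(decode(userData)); // [t0-0, t1-2]
  }
}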


Guozhang

On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> I think I'm unclear how we leverage the
> onPartitionsRevoked/onPartitionsAssigned here in any way that's 
different
> from our normal usage -- certainly you can use them to generate a diff, 
but
> you still need to commit when partitions are revoked and that has a
> non-trivial cost. Are we just saying that you might be able to save some
> overhead, e.g. closing/reopening some other resources by doing a flush 
but
> not a close() or something? You still need to flush any output and 
commit
> offsets before returning from onPartitionsRevoked, right? Otherwise you
> couldn't guarantee clean handoff of partitions.
>
> In terms of the rebalancing, the basic requirements in the KIP seem 
sound.
> Passing previous assignment data via UserData also seems reasonable 
since
> it avoids redistributing all assignment data to all members and doesn't
> rely on the next generation leader being a member of the current
> generation. Hopefully this shouldn't be surprising since I think I
> discussed this w/ Jason before he updated the relevant wiki pages :)
>
> -Ewen
>
>
> On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your feedback.
> >
> > I believe your suggestion on how to take advantage of this assignor is
> > valid. We can leverage onPartitionsRevoked() and 
onPartitionsAssigned()
> > callbacks and do a comparison of assigned partitions before and after 
the
> > re-balance and do the cleanup only if there is a change (e.g., if some
> > previously assigned partition is not in the assignment).
> >
> > On your second question, a number of tests that I ran shows that the 
old
> > assignments are preserved in the current implementation; except for 
when
> > the consumer group leader is killed; in which case, a fresh assignment 
is
> > performed. This is something that needs to be fixed. I tried to use 
your
> > pointers to find out where the best place is to preserve the old
> > assignment in such circumstances but have not been able to pinpoint 
it.
> If
> > you have any suggestion on this please share. Thanks.
> >
> > Regards,
> > Vahid Hashemian
> >
> >
> >
> >
> > From:   Jason Gustafson <ja...@confluent.io>
> > To:     dev@kafka.apache.org
> > Date:   04/14/2016 11:37 AM
> > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment 
Strategy
> >
> >
> >
> > Hi Vahid,
> >
> > Thanks for the proposal. I think one of the advantages of having 
sticky
> > assignment would be reduce the need to cleanup local partition state
> > between rebalances. Do you have any thoughts on how the user would 
take
> > advantage of this assignor in the consumer to do this? Maybe one 
approach
> > is to delay cleanup until you detect a change from the previous
> assignment
> > in the onPartitionsAssigned() callback?
> >
> > Also, can you provide some detail on how the sticky assignor works at 
the
> > group protocol level? For example, do you pass old assignments through
> the
> > "UserData" field in the consumer's JoinGroup?
> >
> > Thanks,
> > Jason
> >
> > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi all,
> > >
> > > I have started a new KIP under
> > >
> > >
> >
> >
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

> >
> > > The corresponding JIRA is at
> > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > >
> > > Your feedback is much appreciated.
> > >
> > > Regards,
> > > Vahid Hashemian
> > >
> > >
> >
> >
> >
> >
> >
>
>
> --
> Thanks,
> Ewen
>



-- 
-- Guozhang





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

Sorry about my misunderstanding, and thanks for sending the reference.
The grammar you sent is correct; that is how the current assignments are 
preserved in the current implementation.

I understand your point about limiting the policies provided with the 
Kafka release, and the value of providing sticky assignment out of the 
box.
I'm okay with what the community decides in terms of which of these 
options should go into Kafka.
I'll try to document these alternatives in the KIP.

Regards,
--Vahid
 



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   06/06/2016 08:14 PM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Hi Vahid,

The only thing I added was the specification of the UserData field. The
rest comes from here:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
See the section on the JoinGroup request.

Generally speaking, I think having fewer assignment strategies included
with Kafka is probably better. One of the advantages of the client-side
assignment approach is that there's no actual need to bundle them into the
release. Applications can use them by depending on a separate library. That
said, sticky assignment seems like a generally good idea and a common need,
so it may be helpful for a lot of users to make it easily available in the
release. If it also addresses the issues raised in KIP-49, then so much the
better.
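
Either way, a consumer opts into an assignor through the
partition.assignment.strategy setting. A minimal sketch, assuming a
placeholder class name for the sticky assignor (the thread does not pin
down the final name):

import java.util.Properties;

public class StickyConsumerConfig {
  public static Properties consumerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Placeholder class name; whichever assignor is chosen must be on the classpath.
    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
    return props;
  }
}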

As for whether we should include both, there I'm not too sure. Most users
probably wouldn't have a strong reason to choose the "fair" assignment over
the "sticky" assignment since they both seem to have the same properties in
terms of balancing the group's partitions. The overhead is a concern for
large groups with many topic subscriptions though, so if people think that
the "fair" approach brings a lot of benefit over round-robin, then it may
be worth including also.

-Jason

On Mon, Jun 6, 2016 at 5:17 PM, Vahid S Hashemian 
<vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> Thanks for reviewing the KIP.
> I will add the details you requested, but to summarize:
>
> Regarding the structure of the user data:
>
> Right now the user data will have the current assignments only which is 
a
> mapping of consumers to their assigned topic partitions. Is this mapping
> what you're also suggesting with CurrentAssignment field?
> I see how adding a version (as sticky assignor version) will be useful.
> Also how having a protocol name would be useful, perhaps for validation.
> But could you clarify the "Subscription" field and how you think it'll
> come into play?
>
>
> Regarding the algorithm:
>
> There could be similarities between how this KIP is implemented and how
> KIP-49 is handling the fairness. But since we had to take stickiness 
into
> consideration we started fresh and did not adopt from KIP-49.
> The Sticky assignor implementation is comprehensive and guarantees the
> fairest possible assignment with highest stickiness. I even have a unit
> test that randomly generates an assignment problem and verifies that a
> fair and sticky assignment is calculated.
> KIP-54 gives priority to fairness over stickiness (which makes the
> implementation more complex). We could have another strategy that gives
> priority to stickiness over fairness (which supposedly will have a 
better
> performance).
> The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates
> the assignment without considering the previous assignments (fairness
> only); whereas for KIP-54 previous assignments play a big role (fairness
> and stickiness).
> I believe if there is a situation where the stickiness requirements do 
not
> exist it would make sense to use a fair-only assignment without the
> overhead of sticky assignment, as you mentioned.
> So, I could see three different strategies that could enrich assignment
> policy options.
> It would be great to have some feedback from the community about what is
> the best way to move forward with these two KIPs.
>
> In the meantime, I'll add some more details in the KIP about the 
approach
> for calculating assignments.
>
> Thanks again.
>
> Regards,
> --Vahid
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   06/06/2016 01:26 PM
> Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment 
Strategy
>
>
>
> Hi Vahid,
>
> Can you add some detail to the KIP on the structure of the user data? 
I'm
> guessing it would be something like this:
>
> ProtocolName => "sticky"
>
> ProtocolMetadata => Version Subscription UserData
>   Version => int16
>   Subscription => [Topic]
>     Topic => string
>   UserData => CurrentAssignment
>     CurrentAssignment => [Topic [Partition]]
>       Topic => string
>       Partition => int32
>
> It would also be helpful to include a little more detail on the 
algorithm.
> From what I can tell, it looks like you're adopting some of the 
strategies
> from KIP-49 to handle differing subscriptions better. If so, then I 
wonder
> if it makes sense to combine the two KIPs? Or do you think there would 
be
> an advantage to having the "fair" assignment strategy without the 
overhead
> of the sticky assignor?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> 
wrote:
>
> > Sorry for being late on this thread.
> >
> > The assign() function is auto-triggered during the rebalance by one of
> the
> > consumers when it receives all subscription information collected from
> the
> > server-side coordinator.
> >
> > More details can be found here:
> >
> >
>
> 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol

>
> >
> > As for Kafka Streams, they way it did "stickiness" is by 1) let all
> > consumers put their current assigned topic-partitions and server ids
> into
> > the "metadata" field of the JoinGroupRequest, 2) when the selected
> consumer
> > triggers assign() along with all the subscriptions as well as their
> > metadata, it can parse the metadata to learn about the existing
> assignment
> > map; and hence when making the new assignment it will try to assign
> > partitions to its current owners "with best effort".
> >
> >
> > Hope this helps.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I was looking at the implementation of StreamsPartitionAssignor
> through
> > > its unit tests and expected to find some tests that
> > > - verify stickiness by making at least two calls to the assign()
> method
> > > (so we check the second assign() call output preserves the 
assignments
> > > coming from the first assign() call output); or
> > > - start off by a preset assignment, call assign() after some
> subscription
> > > change, and verify the previous assignment are preserved.
> > > But none of the methods seem to do these. Did I overlook them, or
> > > stickiness is being tested in some other fashion?
> > >
> > > Also, if there is a high-level write-up about how this assignor 
works
> > > could you please point me to it? Thanks.
> > >
> > > Regards.
> > > --Vahid
> > >
> > >
> > >
> > >
> > > From:   Guozhang Wang <wa...@gmail.com>
> > > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > Date:   05/02/2016 10:34 AM
> > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> Strategy
> > >
> > >
> > >
> > > Just FYI, the StreamsPartitionAssignor in Kafka Streams are already
> doing
> > > some sort of sticky partitioning mechanism. This is done through the
> > > userData field though; i.e. all group members send their current
> > "assigned
> > > partitions" in their join group request, which will be grouped and
> send
> > to
> > > the leader, the leader then does best-effort for 
sticky-partitioning.
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <
> > ewen@confluent.io>
> > > wrote:
> > >
> > > > I think I'm unclear how we leverage the
> > > > onPartitionsRevoked/onPartitionsAssigned here in any way that's
> > > different
> > > > from our normal usage -- certainly you can use them to generate a
> diff,
> > > but
> > > > you still need to commit when partitions are revoked and that has 
a
> > > > non-trivial cost. Are we just saying that you might be able to 
save
> > some
> > > > overhead, e.g. closing/reopening some other resources by doing a
> flush
> > > but
> > > > not a close() or something? You still need to flush any output and
> > > commit
> > > > offsets before returning from onPartitionsRevoked, right? 
Otherwise
> you
> > > > couldn't guarantee clean handoff of partitions.
> > > >
> > > > In terms of the rebalancing, the basic requirements in the KIP 
seem
> > > sound.
> > > > Passing previous assignment data via UserData also seems 
reasonable
> > > since
> > > > it avoids redistributing all assignment data to all members and
> doesn't
> > > > rely on the next generation leader being a member of the current
> > > > generation. Hopefully this shouldn't be surprising since I think I
> > > > discussed this w/ Jason before he updated the relevant wiki pages 
:)
> > > >
> > > > -Ewen
> > > >
> > > >
> > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > Thanks for your feedback.
> > > > >
> > > > > I believe your suggestion on how to take advantage of this
> assignor
> > is
> > > > > valid. We can leverage onPartitionsRevoked() and
> > > onPartitionsAssigned()
> > > > > callbacks and do a comparison of assigned partitions before and
> after
> > > the
> > > > > re-balance and do the cleanup only if there is a change (e.g., 
if
> > some
> > > > > previously assigned partition is not in the assignment).
> > > > >
> > > > > On your second question, a number of tests that I ran shows that
> the
> > > old
> > > > > assignments are preserved in the current implementation; except
> for
> > > when
> > > > > the consumer group leader is killed; in which case, a fresh
> > assignment
> > > is
> > > > > performed. This is something that needs to be fixed. I tried to
> use
> > > your
> > > > > pointers to find out where the best place is to preserve the old
> > > > > assignment in such circumstances but have not been able to
> pinpoint
> > > it.
> > > > If
> > > > > you have any suggestion on this please share. Thanks.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > From:   Jason Gustafson <ja...@confluent.io>
> > > > > To:     dev@kafka.apache.org
> > > > > Date:   04/14/2016 11:37 AM
> > > > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> > > Strategy
> > > > >
> > > > >
> > > > >
> > > > > Hi Vahid,
> > > > >
> > > > > Thanks for the proposal. I think one of the advantages of having
> > > sticky
> > > > > assignment would be reduce the need to cleanup local partition
> state
> > > > > between rebalances. Do you have any thoughts on how the user 
would
> > > take
> > > > > advantage of this assignor in the consumer to do this? Maybe one
> > > approach
> > > > > is to delay cleanup until you detect a change from the previous
> > > > assignment
> > > > > in the onPartitionsAssigned() callback?
> > > > >
> > > > > Also, can you provide some detail on how the sticky assignor 
works
> at
> > > the
> > > > > group protocol level? For example, do you pass old assignments
> > through
> > > > the
> > > > > "UserData" field in the consumer's JoinGroup?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > > vahidhashemian@us.ibm.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have started a new KIP under
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> >
>
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

>
> > >
> > > > >
> > > > > > The corresponding JIRA is at
> > > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > > The corresponding PR is at
> > https://github.com/apache/kafka/pull/1020
> > > > > >
> > > > > > Your feedback is much appreciated.
> > > > > >
> > > > > > Regards,
> > > > > > Vahid Hashemian
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> > >
> > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
>
>





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Thanks, Andrew, for your feedback and interest in this feature.

If there is no further feedback on this KIP (and no objection) I'll start 
the voting process soon.

Thanks.
--Vahid




From:   Andrew Coates <bi...@gmail.com>
To:     dev@kafka.apache.org
Date:   08/10/2016 12:38 AM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



I'm still very interested in seeing this KIP progress ...
On Tue, 2 Aug 2016 at 20:09, Vahid S Hashemian <va...@us.ibm.com>
wrote:

> I would like to revive this thread and ask for additional feedback on 
this
> KIP.
>
> There have already been some feedback, mostly in favor, plus some 
concern
> about the value gain considering the complexity and the semantics; i.e.
> how the eventually revoked assignments need to be processed in the
> onPartitionsAssigned() callback, and not in onPartitionsRevoked().
>
> If it helps, I could also send a note to users mailing list about this 
KIP
> and ask for their feedback.
> I could also put the KIP up for a vote if that is expected at this 
point.
>
> Thanks.
> --Vahid
>
>
>





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Andrew Coates <bi...@gmail.com>.
I'm still very interested in seeing this KIP progress ...
On Tue, 2 Aug 2016 at 20:09, Vahid S Hashemian <va...@us.ibm.com>
wrote:

> I would like to revive this thread and ask for additional feedback on this
> KIP.
>
> There have already been some feedback, mostly in favor, plus some concern
> about the value gain considering the complexity and the semantics; i.e.
> how the eventually revoked assignments need to be processed in the
> onPartitionsAssigned() callback, and not in onPartitionsRevoked().
>
> If it helps, I could also send a note to users mailing list about this KIP
> and ask for their feedback.
> I could also put the KIP up for a vote if that is expected at this point.
>
> Thanks.
> --Vahid
>
>
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
I would like to revive this thread and ask for additional feedback on this 
KIP.

There has already been some feedback, mostly in favor, plus some concern 
about the value gain considering the complexity and the semantics; i.e. 
how the eventually revoked assignments need to be processed in the 
onPartitionsAssigned() callback, and not in onPartitionsRevoked().

If it helps, I could also send a note to the users mailing list about this KIP 
and ask for their feedback.
I could also put the KIP up for a vote if that is expected at this point.

Thanks.
--Vahid



Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Onur,

Your understanding is correct.
If a consumer dies and later comes back, with the current proposal, there 
is no guarantee that it would reclaim its previous assignment.
 
Regards,
--Vahid
 



From:   Onur Karaman <ok...@linkedin.com.INVALID>
To:     dev@kafka.apache.org
Date:   06/23/2016 01:03 AM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



From what I understood, it seems that stickiness is preserved only for the
remaining live consumers.

Say a consumer owns some partitions and then dies. Those partitions will
get redistributed to the rest of the group.

Now if the consumer comes back up, based on the algorithm described with
the concept of "reassignable partitions", then the consumer may get
different partitions than what it had before. Is my understanding right?

Put another way: once coming back up, can the consumer load its UserData
with the assignment it had before dying?


On Wed, Jun 22, 2016 at 4:41 PM, Jason Gustafson <ja...@confluent.io> 
wrote:

> Hey Vahid,
>
> Thanks for the updates. I think the lack of comments on this KIP 
suggests
> that the motivation might need a little work. Here are the two main
> benefits of this assignor as I see them:
>
> 1. It can give a more balanced assignment when subscriptions do not 
match
> in a group (this is the same problem solved by KIP-49).
> 2. It potentially allows applications to save the need to cleanup 
partition
> state when rebalancing since partitions are more likely to stay assigned 
to
> the same consumer.
>
> Does that seem right to you?
>
> I think it's unclear how serious the first problem is. Providing better
> balance when subscriptions differ is nice, but are rolling updates the 
only
> scenario where this is encountered? Or are there more general use cases
> where differing subscriptions could persist for a longer duration? I'm 
also
> wondering if this assignor addresses the problem found in KAFKA-2019. It
> would be useful to confirm whether this problem still exists with the 
new
> consumer's round robin strategy and how (whether?) it is addressed by 
this
> assignor.
>
> The major selling point seems to be the second point. This is definitely
> nice to have, but would you expect a lot of value in practice since
> consumer groups are usually assumed to be stable? It might help to 
describe
> some specific use cases to help motivate the proposal. One of the 
downsides
> is that it requires users to restructure their code to get any benefit 
from
> it. In particular, they need to move partition cleanup out of the
> onPartitionsRevoked() callback and into onPartitionsAssigned(). This is 
a
> little awkward and will probably make explaining the consumer more
> difficult. It's probably worth including a discussion of this point in 
the
> proposal with an example.
>
> Thanks,
> Jason
>
>
>
> On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com
> > wrote:
>
> > Hi Jason,
> >
> > I updated the KIP and added some details about the user data, the
> > assignment algorithm, and the alternative strategies to consider.
> >
> >
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

> >
> > Please let me know if I missed to add something. Thank you.
> >
> > Regards,
> > --Vahid
> >
> >
> >
>





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Onur Karaman <ok...@linkedin.com.INVALID>.
From what I understood, it seems that stickiness is preserved only for the
remaining live consumers.

Say a consumer owns some partitions and then dies. Those partitions will
get redistributed to the rest of the group.

Now if the consumer comes back up, based on the algorithm described with
the concept of "reassignable partitions", then the consumer may get
different partitions than what it had before. Is my understanding right?

Put another way: once coming back up, can the consumer load its UserData
with the assignment it had before dying?


On Wed, Jun 22, 2016 at 4:41 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Vahid,
>
> Thanks for the updates. I think the lack of comments on this KIP suggests
> that the motivation might need a little work. Here are the two main
> benefits of this assignor as I see them:
>
> 1. It can give a more balanced assignment when subscriptions do not match
> in a group (this is the same problem solved by KIP-49).
> 2. It potentially allows applications to save the need to cleanup partition
> state when rebalancing since partitions are more likely to stay assigned to
> the same consumer.
>
> Does that seem right to you?
>
> I think it's unclear how serious the first problem is. Providing better
> balance when subscriptions differ is nice, but are rolling updates the only
> scenario where this is encountered? Or are there more general use cases
> where differing subscriptions could persist for a longer duration? I'm also
> wondering if this assignor addresses the problem found in KAFKA-2019. It
> would be useful to confirm whether this problem still exists with the new
> consumer's round robin strategy and how (whether?) it is addressed by this
> assignor.
>
> The major selling point seems to be the second point. This is definitely
> nice to have, but would you expect a lot of value in practice since
> consumer groups are usually assumed to be stable? It might help to describe
> some specific use cases to help motivate the proposal. One of the downsides
> is that it requires users to restructure their code to get any benefit from
> it. In particular, they need to move partition cleanup out of the
> onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
> little awkward and will probably make explaining the consumer more
> difficult. It's probably worth including a discussion of this point in the
> proposal with an example.
>
> Thanks,
> Jason
>
>
>
> On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com
> > wrote:
>
> > Hi Jason,
> >
> > I updated the KIP and added some details about the user data, the
> > assignment algorithm, and the alternative strategies to consider.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > Please let me know if I missed to add something. Thank you.
> >
> > Regards,
> > --Vahid
> >
> >
> >
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

Thanks for the thoughtful comments.
Please see my response below.

BTW, I have been trying to update the KIP with some of the recent 
discussions on the mailing list.

Regards,
--Vahid
 



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   06/27/2016 12:53 PM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Hey Vahid,

Comments below:


I'm not very clear on the first part of this paragraph. You could clarify
> it for me, but in general balancing out the partitions across consumers 
in
> a group as much as possible would normally mean balancing the load 
within
> the cluster, and that's something a user would want to have compared to
> cases where the assignments and therefore the load could be quite
> unbalanced depending on the subscriptions.


I'm just wondering what kind of use cases require differing subscriptions
in a steady state. Usually we expect all consumers in the group to have the
same subscription, in which case the balance provided by round robin should
be even (in terms of the number of assigned partitions). The only case that
comes to mind is a rolling upgrade scenario in which the consumers in the
group are restarted one by one with an updated subscription. It would be
ideal to provide better balance in this situation, but once the upgrade
finishes, the assignment should be balanced again, so it's unclear to me
how significant the gain is. On the other hand, if there are cases which
require differing subscriptions in a long term state, it would make this
feature more compelling.


I agree that if we care only about a balanced assignment with same 
subscriptions the round robin assignment is a good choice. But if we bring 
in stickiness to the mix it won't be guaranteed by the round robin 
assignor. An example (as Andrew mentioned in his earlier note) is elastic 
consumers that come and go automatically depending on the load and how 
much they lag behind. If these consumers maintain state of the partitions 
they consume from, it would be reasonable to want them to stick to their 
assigned partitions, rather than having to repeat partition cleanup every 
time the number of consumers changes due to an increase or decrease in 
load. 

I'll also think about it and let you know if I come up with a use case 
with differing subscriptions. If differing subscriptions turns out not to 
be a common use case, the design and implementation of the sticky assignor 
could be modified to a far less complex setting so that 
fairness/stickiness can be guaranteed for same subscriptions. As I 
mentioned before, the current design / implementation is comprehensive and 
can be tweaked towards a less complex solution if further assumptions can 
be made.


Since the new consumer is single threaded there is no such problem in its
> round robin strategy. It simply considers consumers one by one for each
> partition assignment, and when one consumer is assigned a partition, the
> next assignment starts with considering the next consumer in the list 
(and
> not the same consumer that was just assigned). This removes the
> possibility of the issue reported in KAFKA-2019 surfacing in the new
> consumer. In the sticky strategy we do not have this issue either, since
> every time an assignment is about to happen we start with the consumer
> with least number of assignments. So we will not have a scenario where a
> consumer is repeated assigned partitions as in KAFKA-2019 (unless that
> consumer is lagging behind other consumers on the number of partitions
> assigned).


Thanks for checking into this. I think the other factor is that the round
robin assignor sorts the consumers using the id given them by the
coordinator, which at the moment looks like this: "{clientId}-{uuid}". So
if the group uses a common clientId, then it shouldn't usually be the case
that two consumers on the same host get ordered together. We could actually
change the order of these fields in a compatible way if we didn't like the
dependence on the clientId. It seems anyway that the sticky assignor is not
needed to deal with this problem.


That's correct, and thanks for going into the issue in more details.


Even though consumer groups are usually stable, it might be the case that
> consumers do not initially join the group at the same time. The sticky
> strategy in that situation lets those who joined earlier stick to their
> partitions to some extent (assuming fairness take precedence over
> stickiness). In terms of specific use cases, Andrew touched on examples 
of
> how Kafka can benefit from a sticky assignor. I could add those to the 
KIP
> if you also think they help building the case in favor of sticky 
assignor.
> I agree with you about the downside and I'll make sure I add that to the
> KIP as you suggested.


Yep, I agree that it helps in some situations, but I think the impact is
amortized over the life of the group. It also takes a bit more work to
explain this to users and may require them to change their usage pattern a
little bit. I think we expect users to do something like the following in
their rebalance listener:

class MyRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }
}

This is fairly intuitive, but if you use this pattern, then sticky
assignment doesn't give you anything because you always cleanup state prior
to the rebalance. Instead you need to do something like this:

class MyRebalanceListener {
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    for (TopicPartition partition : difference(lastAssignment, assignment)) {
      cleanupState(partition);
    }

    for (TopicPartition partition : difference(assignment, lastAssignment)) {
      initializeState(partition);
    }

    for (TopicPartition partition : assignment) {
      initializeOffset(partition);
    }

    this.lastAssignment = assignment;
  }
}

This seems harder to explain and probably is the reason why Andy was
suggesting that it would be more ideal if we could simply skip the call to
onRevoked() if the partitions remain assigned to the consumer after the
rebalance. Unfortunately, the need to commit offsets prior to rebalancing
makes this tricky. The other option suggested by Andy would be to introduce
a third method in the rebalance listener (e.g. doOffsetCommit(partitions)).
Then the consumer would call doOffsetCommit() prior to every rebalance, but
only invoke onPartitionsRevoked() when partitions have actually been
assigned to another consumer following the rebalance. Either way, we're
making the API more complex, which would be nice to avoid unless really
necessary.

Thanks for the code snippets. They look good and understandable given the 
current callback listeners design.
I agree that with an additional callback as Andy suggested things would be 
easier to justify and explain. As you mentioned, it's a matter of whether 
we want the additional complexity that comes with it.


Overall, I think my feeling at the moment is that the sticky assignor is a
nice improvement over the currently available assignors, but the gain seems
a little marginal and maybe not worth the cost of the complexity mentioned
above. It's not a strong feeling though and it would be nice to hear what
others think. The other thing worth mentioning is that we've talked a few
times in the past about the concept of "partial rebalancing," which would
allow the group to reassign only a subset of the partitions it was
consuming. This would let part of the group continue consuming while the
group is rebalancing. We don't have any proposals ready to support this,
but if we want to have this long term, then it might reduce some of the
benefit provided by the sticky assignor.


Understood, and thanks for sharing your concerns and feedback. I hope we 
can get more feedback from the community on whether a sticky partition 
assignment strategy in any form is beneficial to Kafka.


Thanks,
Jason





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,

Comments below:

I'm not very clear on the first part of this paragraph. You could clarify
> it for me, but in general balancing out the partitions across consumers in
> a group as much as possible would normally mean balancing the load within
> the cluster, and that's something a user would want to have compared to
> cases where the assignments and therefore the load could be quite
> unbalanced depending on the subscriptions.


I'm just wondering what kind of use cases require differing subscriptions
in a steady state. Usually we expect all consumers in the group to have the
same subscription, in which case the balance provided by round robin should
be even (in terms of the number of assigned partitions). The only case that
comes to mind is a rolling upgrade scenario in which the consumers in the
group are restarted one by one with an updated subscription. It would be
ideal to provide better balance in this situation, but once the upgrade
finishes, the assignment should be balanced again, so it's unclear to me
how significant the gain is. On the other hand, if there are cases which
require differing subscriptions in a long term state, it would make this
feature more compelling.

Since the new consumer is single threaded there is no such problem in its
> round robin strategy. It simply considers consumers one by one for each
> partition assignment, and when one consumer is assigned a partition, the
> next assignment starts with considering the next consumer in the list (and
> not the same consumer that was just assigned). This removes the
> possibility of the issue reported in KAFKA-2019 surfacing in the new
> consumer. In the sticky strategy we do not have this issue either, since
> every time an assignment is about to happen we start with the consumer
> with least number of assignments. So we will not have a scenario where a
> consumer is repeated assigned partitions as in KAFKA-2019 (unless that
> consumer is lagging behind other consumers on the number of partitions
> assigned).
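
For illustration, here is a minimal sketch of the round robin behaviour
described above, assuming every consumer subscribes to every topic (this is
not the actual RoundRobinAssignor code):

import java.util.*;

public class RoundRobinSketch {
  static Map<String, List<String>> roundRobin(List<String> consumers, List<String> partitions) {
    Map<String, List<String>> assignment = new LinkedHashMap<>();
    for (String c : consumers) assignment.put(c, new ArrayList<>());
    int next = 0;
    for (String partition : partitions) {
      assignment.get(consumers.get(next)).add(partition);
      // Advance to the next consumer, so no consumer is picked twice in a row.
      next = (next + 1) % consumers.size();
    }
    return assignment;
  }

  public static void main(String[] args) {
    System.out.println(roundRobin(
        Arrays.asList("C0", "C1", "C2"),
        Arrays.asList("t0p0", "t0p1", "t1p0", "t1p1")));
    // {C0=[t0p0, t1p1], C1=[t0p1], C2=[t1p0]}
  }
}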


Thanks for checking into this. I think the other factor is that the round
robin assignor sorts the consumers using the id given them by the
coordinator, which at the moment looks like this: "{clientId}-{uuid}". So
if the group uses a common clientId, then it shouldn't usually be the case
that two consumers on the same host get ordered together. We could actually
change the order of these fields in a compatible way if we didn't like the
dependence on the clientId. It seems anyway that the sticky assignor is not
needed to deal with this problem.

Even though consumer groups are usually stable, it might be the case that
> consumers do not initially join the group at the same time. The sticky
> strategy in that situation lets those who joined earlier stick to their
> partitions to some extent (assuming fairness take precedence over
> stickiness). In terms of specific use cases, Andrew touched on examples of
> how Kafka can benefit from a sticky assignor. I could add those to the KIP
> if you also think they help building the case in favor of sticky assignor.
> I agree with you about the downside and I'll make sure I add that to the
> KIP as you suggested.


Yep, I agree that it helps in some situations, but I think the impact is
amortized over the life of the group. It also takes a bit more work to
explain this to users and may require them to change their usage pattern a
little bit. I think we expect users to do something like the following in
their rebalance listener:

class MyRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }
}

This is fairly intuitive, but if you use this pattern, then sticky
assignment doesn't give you anything because you always cleanup state prior
to the rebalance. Instead you need to do something like this:

class MyRebalanceListener {
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    for (TopicPartition partition : difference(lastAssignment, assignment)) {
      cleanupState(partition);
    }

    for (TopicPartition partition : difference(assignment, lastAssignment)) {
      initializeState(partition);
    }

    for (TopicPartition partition : assignment) {
      initializeOffset(partition);
    }

    this.lastAssignment = assignment;
  }
}

This seems harder to explain and probably is the reason why Andy was
suggesting that it would be more ideal if we could simply skip the call to
onRevoked() if the partitions remain assigned to the consumer after the
rebalance. Unfortunately, the need to commit offsets prior to rebalancing
makes this tricky. The other option suggested by Andy would be to introduce
a third method in the rebalance listener (e.g. doOffsetCommit(partitions)).
Then the consumer would call doOffsetCommit() prior to every rebalance, but
only invoke onPartitionsRevoked() when partitions have actually been
assigned to another consumer following the rebalance. Either way, we're
making the API more complex, which would be nice to avoid unless really
necessary.
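
Purely as a sketch of that hypothetical third callback (neither the
interface nor the method names below exist in the consumer API), the
listener might look something like this:

import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

interface StickyAwareRebalanceListener {
  // Always called before a rebalance so offsets can be committed safely.
  void doOffsetCommit(Collection<TopicPartition> partitions);

  // Called only for partitions that actually moved to another consumer.
  void onPartitionsRevoked(Collection<TopicPartition> revoked);

  // Called with the full assignment once the rebalance completes.
  void onPartitionsAssigned(Collection<TopicPartition> assigned);
}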

Overall, I think my feeling at the moment is that the sticky assignor is a
nice improvement over the currently available assignors, but the gain seems
a little marginal and maybe not worth the cost of the complexity mentioned
above. It's not a strong feeling though and it would be nice to hear what
others think. The other thing worth mentioning is that we've talked a few
times in the past about the concept of "partial rebalancing," which would
allow the group to reassign only a subset of the partitions it was
consuming. This would let part of the group continue consuming while the
group is rebalancing. We don't have any proposals ready to support this,
but if we want to have this long term, then it might reduce some of the
benefit provided by the sticky assignor.

Thanks,
Jason


On Thu, Jun 23, 2016 at 5:04 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Thank you Andy for your feedback on the KIP.
>
> I agree with Jason on the responses he provided below.
>
> If we give precedence to fairness over stickiness there is no assumption
> that can be made about which assignment would remain and which would be
> revoked.
> If we give precedence to stickiness over fairness, we can be sure that all
> existing valid assignments (those with their topic partition still valid)
> would remain.
>
> I'll add your example to the KIP, but this is how it should work with
> sticky assignor:
>
> We have two consumers C0, C1 and two topics t0, t1 each with 2 partitions.
> Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Let's assume the two
> consumers are subscribed to both t0 and t1.
> The assignment using the sticky assignor will be:
>  * C0: [t0p0, t1p0]
>  * C1: [t0p1, t1p1]
>
> Now if we add C2 (subscribed to both topics), this is what we get:
>  * C0: [t1p0]
>  * C1: [t0p1, t1p1]
>  * C2: [t0p0]
>
> I think both range and round robin assignors would produce this:
>  * C0: [t0p0, t1p1]
>  * C1: [t0p1]
>  * C2: [t1p0]
>
> Regards,
> --Vahid
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   06/23/2016 10:06 AM
> Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Hey Andy,
>
> Thanks for jumping in. A couple comments:
>
> In addition, I think it is important that during a rebalance consumers do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> > same!), set reassigned. This is less than initiative and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment I.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
>
>
> The problem is that the revocation callback is called before you know what
> the assignment for the next generation will be. This is necessary for the
> consumer to be able to commit offsets for its assigned partitions. Once
> the
> consumer has a new assignment, it is no longer safe to commit offsets from
> the previous generation. Unless sticky assignment can give us some
> guarantee on which partitions will remain after the rebalance, all of them
> must be included in the revocation callback.
>
>
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> > topic. A third consumer will not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
>
>
> This would be true for range assignment, but with 4 partitions total,
> round-robin assignment would give one partition to each of the 4 consumers
> (assuming subscriptions match).
>
> Thanks,
> Jason
>
>
> On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I think sticky assignment is immensely important / useful in many
> > situations. Apps that use Kafka are many and varied. Any app that stores
> > any state, either in the form of data from incoming messages, cached
> > results from previous out-of-process calls or expensive operations, (and
> > let's face it, that's most!), can see a big negative impact from
> partition
> > movement.
> >
> > The main issue partition movement brings is that it makes building
> elastic
> > services very hard. Consider: you've got an app consuming from Kafka
> that
> > locally caches data to improve performance. You want the app to auto
> scale
> > as the throughput to the topic(s) increases. Currently, when one or
> more
> > new instance are added and the group rebalances, all existing instances
> > have all partitions revoked, and then a new, potentially quite
> different,
> > set assigned. An intuitive pattern is to evict partition state, I.e. the
> > cached data, when a partition is revoked. So in this case all apps flush
> > their entire cache causing throughput to drop massively, right when you
> > want to increase it!
> >
> > Even if the app is not flushing partition state when partitions are
> > revoked, the lack of a 'sticky' strategy means that a proportion of the
> > cached state is now useless, and instances have partitions assigned for
> > which they have no cached state, again negatively impacting throughput.
> >
> > With a 'sticky' strategy throughput can be maintained and indeed
> increased,
> > as intended.
> >
> > The same is also true in the presence of failure. An instance failing,
> > (maybe due to high load), can invalidate the caching of existing
> instances,
> > negatively impacting throughput of the remaining instances, (possibly at
> a
> > time the system needs throughput the most!)
> >
> > My question would be 'why move partitions if you don't have to?'. I will
> > certainly be setting the 'sticky' assignment strategy as the default
> once
> > it's released, and I have a feeling it will become the default in the
> > communitie's 'best-practice' guides.
> >
> > In addition, I think it is important that during a rebalance consumers
> do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> > same!), set reassigned. This is less than intuitive and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment I.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
> >
> > I think the change to how the client listener is called should be part
> of
> > this work.
> >
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> > topic. A third consumer will not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
> >
> > Thanks,
> >
> >
> > Andy
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
> >
> > > Hey Vahid,
> > >
> > > Thanks for the updates. I think the lack of comments on this KIP
> suggests
> > > that the motivation might need a little work. Here are the two main
> > > benefits of this assignor as I see them:
> > >
> > > 1. It can give a more balanced assignment when subscriptions do not
> match
> > > in a group (this is the same problem solved by KIP-49).
> > > 2. It potentially allows applications to save the need to cleanup
> > partition
> > > state when rebalancing since partitions are more likely to stay
> assigned
> > to
> > > the same consumer.
> > >
> > > Does that seem right to you?
> > >
> > > I think it's unclear how serious the first problem is. Providing
> better
> > > balance when subscriptions differ is nice, but are rolling updates the
> > only
> > > scenario where this is encountered? Or are there more general use
> cases
> > > where differing subscriptions could persist for a longer duration? I'm
> > also
> > > wondering if this assignor addresses the problem found in KAFKA-2019.
> It
> > > would be useful to confirm whether this problem still exists with the
> new
> > > consumer's round robin strategy and how (whether?) it is addressed by
> > this
> > > assignor.
> > >
> > > The major selling point seems to be the second point. This is
> definitely
> > > nice to have, but would you expect a lot of value in practice since
> > > consumer groups are usually assumed to be stable? It might help to
> > describe
> > > some specific use cases to help motivate the proposal. One of the
> > downsides
> > > is that it requires users to restructure their code to get any benefit
> > from
> > > it. In particular, they need to move partition cleanup out of the
> > > onPartitionsRevoked() callback and into onPartitionsAssigned(). This
> is a
> > > little awkward and will probably make explaining the consumer more
> > > difficult. It's probably worth including a discussion of this point in
> > the
> > > proposal with an example.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com
> > > > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > I updated the KIP and added some details about the user data, the
> > > > assignment algorithm, and the alternative strategies to consider.
> > > >
> > > >
> > >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> > > >
> > > > Please let me know if I missed to add something. Thank you.
> > > >
> > > > Regards,
> > > > --Vahid
> > > >
> > > >
> > > >
> > >
> >
>
>
>
>
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Thank you Andy for your feedback on the KIP.

I agree with Jason on the responses he provided below.

If we give precedence to fairness over stickiness there is no assumption 
that can be made about which assignment would remain and which would be 
revoked.
If we give precedence to stickiness over fairness, we can be sure that all 
existing valid assignments (those with their topic partition still valid) 
would remain.

I'll add your example to the KIP, but this is how it should work with 
sticky assignor:

We have two consumers C0, C1 and two topics t0, t1 each with 2 partitions. 
Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Let's assume the two 
consumers are subscribed to both t0 and t1.
The assignment using the sticky assignor will be:
 * C0: [t0p0, t1p0]
 * C1: [t0p1, t1p1]

Now if we add C2 (subscribed to both topics), this is what we get:
 * C0: [t1p0]
 * C1: [t0p1, t1p1]
 * C2: [t0p0]

I think both range and round robin assignors would produce this:
 * C0: [t0p0, t1p1]
 * C1: [t0p1]
 * C2: [t1p0]
 
Regards,
--Vahid




From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   06/23/2016 10:06 AM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Hey Andy,

Thanks for jumping in. A couple comments:

In addition, I think it is important that during a rebalance consumers do
> not first have all partitions revoked, only to have a very similar, (or 
the
> same!), set reassigned. This is less than intuitive and complicates 
client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only 
be
> called for true changes in assignment I.e. any new partitions assigned 
and
> any existing ones revoked, when comparing the new assignment to the
> previous one.


The problem is that the revocation callback is called before you know what
the assignment for the next generation will be. This is necessary for the
consumer to be able to commit offsets for its assigned partitions. Once 
the
consumer has a new assignment, it is no longer safe to commit offsets from
the previous generation. Unless sticky assignment can give us some
guarantee on which partitions will remain after the rebalance, all of them
must be included in the revocation callback.


> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each 
topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you 
can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This 
should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.


This would be true for range assignment, but with 4 partitions total,
round-robin assignment would give one partition to each of the 4 consumers
(assuming subscriptions match).

Thanks,
Jason


On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
wrote:

> Hi all,
>
> I think sticky assignment is immensely important / useful in many
> situations. Apps that use Kafka are many and varied. Any app that stores
> any state, either in the form of data from incoming messages, cached
> results from previous out-of-process calls or expensive operations, (and
> let's face it, that's most!), can see a big negative impact from 
partition
> movement.
>
> The main issue partition movement brings is that it makes building 
elastic
> services very hard. Consider: you've got an app consuming from Kafka 
that
> locally caches data to improve performance. You want the app to auto 
scale
> as the throughput to the topic(s) increases. Currently, when one or 
more
> new instance are added and the group rebalances, all existing instances
> have all partitions revoked, and then a new, potentially quite 
different,
> set assigned. An intuitive pattern is to evict partition state, I.e. the
> cached data, when a partition is revoked. So in this case all apps flush
> their entire cache causing throughput to drop massively, right when you
> want to increase it!
>
> Even if the app is not flushing partition state when partitions are
> revoked, the lack of a 'sticky' strategy means that a proportion of the
> cached state is now useless, and instances have partitions assigned for
> which they have no cached state, again negatively impacting throughput.
>
> With a 'sticky' strategy throughput can be maintained and indeed 
increased,
> as intended.
>
> The same is also true in the presence of failure. An instance failing,
> (maybe due to high load), can invalidate the caching of existing 
instances,
> negatively impacting throughput of the remaining instances, (possibly at 
a
> time the system needs throughput the most!)
>
> My question would be 'why move partitions if you don't have to?'. I will
> certainly be setting the 'sticky' assignment strategy as the default 
once
> it's released, and I have a feeling it will become the default in the
> communitie's 'best-practice' guides.
>
> In addition, I think it is important that during a rebalance consumers 
do
> not first have all partitions revoked, only to have a very similar, (or 
the
> same!), set reassigned. This is less than intuitive and complicates 
client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only 
be
> called for true changes in assignment I.e. any new partitions assigned 
and
> any existing ones revoked, when comparing the new assignment to the
> previous one.
>
> I think the change to how the client listener is called should be part 
of
> this work.
>
> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each 
topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you 
can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer will not have any partitions assigned. This 
should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.
>
> Thanks,
>
>
> Andy
>
>
>
>
>
>
>
>
> On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Vahid,
> >
> > Thanks for the updates. I think the lack of comments on this KIP 
suggests
> > that the motivation might need a little work. Here are the two main
> > benefits of this assignor as I see them:
> >
> > 1. It can give a more balanced assignment when subscriptions do not 
match
> > in a group (this is the same problem solved by KIP-49).
> > 2. It potentially allows applications to save the need to cleanup
> partition
> > state when rebalancing since partitions are more likely to stay 
assigned
> to
> > the same consumer.
> >
> > Does that seem right to you?
> >
> > I think it's unclear how serious the first problem is. Providing 
better
> > balance when subscriptions differ is nice, but are rolling updates the
> only
> > scenario where this is encountered? Or are there more general use 
cases
> > where differing subscriptions could persist for a longer duration? I'm
> also
> > wondering if this assignor addresses the problem found in KAFKA-2019. 
It
> > would be useful to confirm whether this problem still exists with the 
new
> > consumer's round robin strategy and how (whether?) it is addressed by
> this
> > assignor.
> >
> > The major selling point seems to be the second point. This is 
definitely
> > nice to have, but would you expect a lot of value in practice since
> > consumer groups are usually assumed to be stable? It might help to
> describe
> > some specific use cases to help motivate the proposal. One of the
> downsides
> > is that it requires users to restructure their code to get any benefit
> from
> > it. In particular, they need to move partition cleanup out of the
> > onPartitionsRevoked() callback and into onPartitionsAssigned(). This 
is a
> > little awkward and will probably make explaining the consumer more
> > difficult. It's probably worth including a discussion of this point in
> the
> > proposal with an example.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com
> > > wrote:
> >
> > > Hi Jason,
> > >
> > > I updated the KIP and added some details about the user data, the
> > > assignment algorithm, and the alternative strategies to consider.
> > >
> > >
> >
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

> > >
> > > Please let me know if I missed to add something. Thank you.
> > >
> > > Regards,
> > > --Vahid
> > >
> > >
> > >
> >
>





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Guozhang,

Thanks for the reference.
A similar question was asked earlier about whether, with sticky assignor, 
consumers stick to their previous partitions if they die and come back 
later.
Currently the sticky assignor does not support that because it preserves 
only the last assignment before the rebalance.
If a consumer dies and comes back during a different rebalance interval 
there is no guarantee it would get its previous partitions.
If the community sees this as an important requirement for the sticky 
assignor we can definitely include it in the KIP.

Regards,
-----------------------------------------------------------------

Vahid Hashemian, Ph.D.
Advisory Software Engineer, IBM Cloud
Email: vahidhashemian@us.ibm.com
Phone: 1-408-463-2380
 
IBM Silicon Valley Lab
555 Bailey Ave.
San Jose, CA 95141


 



From:   Guozhang Wang <wa...@gmail.com>
To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
Date:   06/23/2016 03:28 PM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Just adding some related reference here:

Henry Cai is contributing some advanced feature in Kafka Streams regarding
static assignment: https://github.com/apache/kafka/pull/1543

The main motivation is that when you do rolling bounce for upgrading your
Kafka Streams code, for example, you would prefer to not move assigned
partitions of the current bouncing instance to others, and today it is
worked around by increasing the session.timeout; but what is more tricky 
is
that when the bouncing instance comes back, it will still trigger a
rebalance. The idea is that as long as we can encode the previous
iteration's assignment map, and we can check whether the list of partitions /
members has changed relative to their previously assigned partitions; if not,
we keep the assignment as is.

Guozhang


On Thu, Jun 23, 2016 at 10:24 AM, Andrew Coates 
<bi...@gmail.com>
wrote:

> Hey Jason,
>
> Good to know on the round robin assignment. I'll look into that.
>
> The issue I have with the current rebalance listener is that it's not
> intuitive and unnecessarily exposes the inner workings of rebalance 
logic.
> When the onPartitionsRevoked method is called it's not really saying the
> partitions were revoked. It's really saying a rebalance is happening and
> you need to deal with any in-flight partitions & commit offsets. So 
maybe
> the method name is wrong! Maybe it should be 'onRebalance' or
> 'commitOffsets'..?  Then the interface could also have an
> onPartitionsRevoked method that is only called when partitions have been
> revoked and given to someone else to handle, rather than just kind of
> paused while we rebalance... maybe the new method could be
> onPausePartitions?
>
> Andy
>
> On Thu, 23 Jun 2016, 18:06 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Andy,
> >
> > Thanks for jumping in. A couple comments:
> >
> > In addition, I think it is important that during a rebalance consumers 
do
> > > not first have all partitions revoked, only to have a very similar, 
(or
> > the
> > > same!), set reassigned. This is less than initiative and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment I.e. any new partitions 
assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> >
> >
> > The problem is that the revocation callback is called before you know
> what
> > the assignment for the next generation will be. This is necessary for 
the
> > consumer to be able to commit offsets for its assigned partitions. 
Once
> the
> > consumer has a new assignment, it is no longer safe to commit offsets
> from
> > the previous generation. Unless sticky assignment can give us some
> > guarantee on which partitions will remain after the rebalance, all of
> them
> > must be included in the revocation callback.
> >
> >
> > > There is one last scenario I'd like to highlight that I think the 
KIP
> > > should describe: say you have a group consuming from two topics, 
each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers 
you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer with not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> >
> >
> > This would be true for range assignment, but with 4 partitions total,
> > round-robin assignment would give one partition to each of the 4
> consumers
> > (assuming subscriptions match).
> >
> > Thanks,
> > Jason
> >
> >
> > On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <
> big.andy.coates@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I think sticky assignment is immensely important / useful in many
> > > situations. Apps that use Kafka are many and varied. Any app that
> stores
> > > any state, either in the form of data from incoming messages, cached
> > > results from previous out-of-process calls or expensive operations,
> (and
> > > let's face it, that's most!), can see a big negative impact from
> > partition
> > > movement.
> > >
> > > The main issue partition movement brings is that it makes building
> > elastic
> > > services very hard. Consider: you've got an app consuming from Kafka
> that
> > > locally caches data to improve performance. You want the app to auto
> > scale
> > > as the throughout to the topic(s) increases. Currently,   when one 
or
> > more
> > > new instance are added and the group rebalances, all existing 
instances
> > > have all partitions revoked, and then a new, potentially quite
> different,
> > > set assigned. An intuitive pattern is to evict partition state, I.e.
> the
> > > cached data, when a partition is revoked. So in this case all apps
> flush
> > > their entire cache causing throughput to drop massively, right when 
you
> > > want to increase it!
> > >
> > > Even if the app is not flushing partition state when partitions are
> > > revoked, the lack of a 'sticky' strategy means that a proportion of 
the
> > > cached state is now useless, and instances have partitions assigned 
for
> > > which they have no cached state, again negatively impacting 
throughout.
> > >
> > > With a 'sticky' strategy throughput can be maintained and indeed
> > increased,
> > > as intended.
> > >
> > > The same is also true in the presence of failure. An instance 
failing,
> > > (maybe due to high load), can invalidate the caching of existing
> > instances,
> > > negatively impacting throughout of the remaining instances, 
(possibly
> at
> > a
> > > time the system needs throughput the most!)
> > >
> > > My question would be 'why move partitions if you don't have to?'. I
> will
> > > certainly be setting the 'sticky' assignment strategy as the default
> once
> > > it's released, and I have a feeling it will become the default in 
the
> > > communitie's 'best-practice' guides.
> > >
> > > In addition, I think it is important that during a rebalance 
consumers
> do
> > > not first have all partitions revoked, only to have a very similar, 
(or
> > the
> > > same!), set reassigned. This is less than initiative and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment I.e. any new partitions 
assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> > >
> > > I think the change to how the client listener is called should be 
part
> of
> > > this work.
> > >
> > > There is one last scenario I'd like to highlight that I think the 
KIP
> > > should describe: say you have a group consuming from two topics, 
each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers 
you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer with not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> > >
> > > Thanks,
> > >
> > >
> > > Andy
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io>
> wrote:
> > >
> > > > Hey Vahid,
> > > >
> > > > Thanks for the updates. I think the lack of comments on this KIP
> > suggests
> > > > that the motivation might need a little work. Here are the two 
main
> > > > benefits of this assignor as I see them:
> > > >
> > > > 1. It can give a more balanced assignment when subscriptions do 
not
> > match
> > > > in a group (this is the same problem solved by KIP-49).
> > > > 2. It potentially allows applications to save the need to cleanup
> > > partition
> > > > state when rebalancing since partitions are more likely to stay
> > assigned
> > > to
> > > > the same consumer.
> > > >
> > > > Does that seem right to you?
> > > >
> > > > I think it's unclear how serious the first problem is. Providing
> better
> > > > balance when subscriptions differ is nice, but are rolling updates
> the
> > > only
> > > > scenario where this is encountered? Or are there more general use
> cases
> > > > where differing subscriptions could persist for a longer duration?
> I'm
> > > also
> > > > wondering if this assignor addresses the problem found in 
KAFKA-2019.
> > It
> > > > would be useful to confirm whether this problem still exists with 
the
> > new
> > > > consumer's round robin strategy and how (whether?) it is addressed 
by
> > > this
> > > > assignor.
> > > >
> > > > The major selling point seems to be the second point. This is
> > definitely
> > > > nice to have, but would you expect a lot of value in practice 
since
> > > > consumer groups are usually assumed to be stable? It might help to
> > > describe
> > > > some specific use cases to help motivate the proposal. One of the
> > > downsides
> > > > is that it requires users to restructure their code to get any
> benefit
> > > from
> > > > it. In particular, they need to move partition cleanup out of the
> > > > onPartitionsRevoked() callback and into onPartitionsAssigned(). 
This
> > is a
> > > > little awkward and will probably make explaining the consumer more
> > > > difficult. It's probably worth including a discussion of this 
point
> in
> > > the
> > > > proposal with an example.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com
> > > > > wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > I updated the KIP and added some details about the user data, 
the
> > > > > assignment algorithm, and the alternative strategies to 
consider.
> > > > >
> > > > >
> > > >
> > >
> >
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

> > > > >
> > > > > Please let me know if I missed to add something. Thank you.
> > > > >
> > > > > Regards,
> > > > > --Vahid
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Guozhang Wang <wa...@gmail.com>.
Just adding some related reference here:

Henry Cai is contributing some advanced feature in Kafka Streams regarding
static assignment: https://github.com/apache/kafka/pull/1543

The main motivation is that when you do rolling bounce for upgrading your
Kafka Streams code, for example, you would prefer to not move assigned
partitions of the current bouncing instance to others, and today it is
worked around by increasing the session.timeout; but what is more tricky is
that when the bouncing instance comes back, it will still trigger a
rebalance. The idea is that as long as we can encode the previous
iteration's assignment map, and we can check whether the list of partitions /
members has changed relative to their previously assigned partitions; if not, we
keep the assignment as is.
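
A rough sketch of that check, assuming the previous assignment has already
been decoded from the members' user data (the helper names are hypothetical,
not the Streams code):

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

// Hypothetical: reuse the previous assignment verbatim when neither the
// members nor the partitions have changed since the last rebalance.
Map<String, List<TopicPartition>> assign(
        Set<String> members,
        Set<TopicPartition> partitions,
        Map<String, List<TopicPartition>> previousAssignment) {
    Set<TopicPartition> previousPartitions = previousAssignment.values().stream()
            .flatMap(List::stream).collect(Collectors.toSet());
    if (previousAssignment.keySet().equals(members)
            && previousPartitions.equals(partitions))
        return previousAssignment;                   // nothing moved, keep as is
    return freshAssignment(members, partitions);     // hypothetical fallback
}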

Guozhang


On Thu, Jun 23, 2016 at 10:24 AM, Andrew Coates <bi...@gmail.com>
wrote:

> Hey Jason,
>
> Good to know on the round robin assignment. I'll look into that.
>
> The issue I have with the current rebalance listener is that it's not
> intuitive and unnecessarily exposes the inner workings of rebalance logic.
> When the onPartitionsRevoked method is called it's not really saying the
> partitions were revoked. It's really saying a rebalance is happening and
> you need to deal with any in-flight partitions & commit offsets. So maybe
> the method name is wrong! Maybe it should be 'onRebalance' or
> 'commitOffsets'..?  Then the interface could also have an
> onPartitionsRevoked method that is only called when partitions have been
> revoked and given to someone else to handle, rather than just kind of
> paused while we rebalance... maybe the new method could be
> onPausePartitions?
>
> Andy
>
> On Thu, 23 Jun 2016, 18:06 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Andy,
> >
> > Thanks for jumping in. A couple comments:
> >
> > In addition, I think it is important that during a rebalance consumers do
> > > not first have all partitions revoked, only to have a very similar, (or
> > the
> > > same!), set reassigned. This is less than initiative and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment I.e. any new partitions assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> >
> >
> > The problem is that the revocation callback is called before you know
> what
> > the assignment for the next generation will be. This is necessary for the
> > consumer to be able to commit offsets for its assigned partitions. Once
> the
> > consumer has a new assignment, it is no longer safe to commit offsets
> from
> > the previous generation. Unless sticky assignment can give us some
> > guarantee on which partitions will remain after the rebalance, all of
> them
> > must be included in the revocation callback.
> >
> >
> > > There is one last scenario I'd like to highlight that I think the KIP
> > > should describe: say you have a group consuming from two topics, each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer with not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> >
> >
> > This would be true for range assignment, but with 4 partitions total,
> > round-robin assignment would give one partition to each of the 4
> consumers
> > (assuming subscriptions match).
> >
> > Thanks,
> > Jason
> >
> >
> > On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <
> big.andy.coates@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I think sticky assignment is immensely important / useful in many
> > > situations. Apps that use Kafka are many and varied. Any app that
> stores
> > > any state, either in the form of data from incoming messages, cached
> > > results from previous out-of-process calls or expensive operations,
> (and
> > > let's face it, that's most!), can see a big negative impact from
> > partition
> > > movement.
> > >
> > > The main issue partition movement brings is that it makes building
> > elastic
> > > services very hard. Consider: you've got an app consuming from Kafka
> that
> > > locally caches data to improve performance. You want the app to auto
> > scale
> > > as the throughout to the topic(s) increases. Currently,   when one or
> > more
> > > new instance are added and the group rebalances, all existing instances
> > > have all partitions revoked, and then a new, potentially quite
> different,
> > > set assigned. An intuitive pattern is to evict partition state, I.e.
> the
> > > cached data, when a partition is revoked. So in this case all apps
> flush
> > > their entire cache causing throughput to drop massively, right when you
> > > want to increase it!
> > >
> > > Even if the app is not flushing partition state when partitions are
> > > revoked, the lack of a 'sticky' strategy means that a proportion of the
> > > cached state is now useless, and instances have partitions assigned for
> > > which they have no cached state, again negatively impacting throughout.
> > >
> > > With a 'sticky' strategy throughput can be maintained and indeed
> > increased,
> > > as intended.
> > >
> > > The same is also true in the presence of failure. An instance failing,
> > > (maybe due to high load), can invalidate the caching of existing
> > instances,
> > > negatively impacting throughout of the remaining instances, (possibly
> at
> > a
> > > time the system needs throughput the most!)
> > >
> > > My question would be 'why move partitions if you don't have to?'. I
> will
> > > certainly be setting the 'sticky' assignment strategy as the default
> once
> > > it's released, and I have a feeling it will become the default in the
> > > communitie's 'best-practice' guides.
> > >
> > > In addition, I think it is important that during a rebalance consumers
> do
> > > not first have all partitions revoked, only to have a very similar, (or
> > the
> > > same!), set reassigned. This is less than initiative and complicates
> > client
> > > code unnecessarily. Instead, the `ConsumerPartitionListener` should
> only
> > be
> > > called for true changes in assignment I.e. any new partitions assigned
> > and
> > > any existing ones revoked, when comparing the new assignment to the
> > > previous one.
> > >
> > > I think the change to how the client listener is called should be part
> of
> > > this work.
> > >
> > > There is one last scenario I'd like to highlight that I think the KIP
> > > should describe: say you have a group consuming from two topics, each
> > topic
> > > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> > can
> > > have is 2, not 4. With 2 consumers each will get one partition from
> each
> > > topic. A third consumer with not have any partitions assigned. This
> > should
> > > be fixed by the 'fair' part of the strategy, but it would be good to
> see
> > > this covered explicitly in the KIP.
> > >
> > > Thanks,
> > >
> > >
> > > Andy
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io>
> wrote:
> > >
> > > > Hey Vahid,
> > > >
> > > > Thanks for the updates. I think the lack of comments on this KIP
> > suggests
> > > > that the motivation might need a little work. Here are the two main
> > > > benefits of this assignor as I see them:
> > > >
> > > > 1. It can give a more balanced assignment when subscriptions do not
> > match
> > > > in a group (this is the same problem solved by KIP-49).
> > > > 2. It potentially allows applications to save the need to cleanup
> > > partition
> > > > state when rebalancing since partitions are more likely to stay
> > assigned
> > > to
> > > > the same consumer.
> > > >
> > > > Does that seem right to you?
> > > >
> > > > I think it's unclear how serious the first problem is. Providing
> better
> > > > balance when subscriptions differ is nice, but are rolling updates
> the
> > > only
> > > > scenario where this is encountered? Or are there more general use
> cases
> > > > where differing subscriptions could persist for a longer duration?
> I'm
> > > also
> > > > wondering if this assignor addresses the problem found in KAFKA-2019.
> > It
> > > > would be useful to confirm whether this problem still exists with the
> > new
> > > > consumer's round robin strategy and how (whether?) it is addressed by
> > > this
> > > > assignor.
> > > >
> > > > The major selling point seems to be the second point. This is
> > definitely
> > > > nice to have, but would you expect a lot of value in practice since
> > > > consumer groups are usually assumed to be stable? It might help to
> > > describe
> > > > some specific use cases to help motivate the proposal. One of the
> > > downsides
> > > > is that it requires users to restructure their code to get any
> benefit
> > > from
> > > > it. In particular, they need to move partition cleanup out of the
> > > > onPartitionsRevoked() callback and into onPartitionsAssigned(). This
> > is a
> > > > little awkward and will probably make explaining the consumer more
> > > > difficult. It's probably worth including a discussion of this point
> in
> > > the
> > > > proposal with an example.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com
> > > > > wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > I updated the KIP and added some details about the user data, the
> > > > > assignment algorithm, and the alternative strategies to consider.
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > > >
> > > > > Please let me know if I missed to add something. Thank you.
> > > > >
> > > > > Regards,
> > > > > --Vahid
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Andrew Coates <bi...@gmail.com>.
Hey Jason,

Good to know on the round robin assignment. I'll look into that.

The issue I have with the current rebalance listener is that it's not
intuitive and unnecessarily exposes the inner workings of rebalance logic.
When the onPartitionsRevoked method is called it's not really saying the
partitions were revoked. It's really saying a rebalance is happening and
you need to deal with any in-flight partitions & commit offsets. So maybe
the method name is wrong! Maybe it should be 'onRebalance' or
'commitOffsets'..?  Then the interface could also have an
onPartitionsRevoked method that is only called when partitions have been
revoked and given to someone else to handle, rather than just kind of
paused while we rebalance... maybe the new method could be
onPausePartitions?
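
To make that concrete, one possible shape for such a listener might be
(names here are purely hypothetical, not an existing Kafka interface):

import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

// Hypothetical interface; none of these methods exist in Kafka today.
public interface RebalanceAwareListener {
    // A rebalance is starting: finish in-flight work and commit offsets.
    void onRebalance(Collection<TopicPartition> currentlyAssigned);

    // Called only for partitions that were truly handed to another consumer.
    void onPartitionsRevoked(Collection<TopicPartition> revoked);

    // Called only for partitions that are newly assigned to this consumer.
    void onPartitionsAssigned(Collection<TopicPartition> newlyAssigned);
}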

Andy

On Thu, 23 Jun 2016, 18:06 Jason Gustafson, <ja...@confluent.io> wrote:

> Hey Andy,
>
> Thanks for jumping in. A couple comments:
>
> In addition, I think it is important that during a rebalance consumers do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> > same!), set reassigned. This is less than initiative and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment I.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
>
>
> The problem is that the revocation callback is called before you know what
> the assignment for the next generation will be. This is necessary for the
> consumer to be able to commit offsets for its assigned partitions. Once the
> consumer has a new assignment, it is no longer safe to commit offsets from
> the previous generation. Unless sticky assignment can give us some
> guarantee on which partitions will remain after the rebalance, all of them
> must be included in the revocation callback.
>
>
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> > topic. A third consumer with not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
>
>
> This would be true for range assignment, but with 4 partitions total,
> round-robin assignment would give one partition to each of the 4 consumers
> (assuming subscriptions match).
>
> Thanks,
> Jason
>
>
> On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I think sticky assignment is immensely important / useful in many
> > situations. Apps that use Kafka are many and varied. Any app that stores
> > any state, either in the form of data from incoming messages, cached
> > results from previous out-of-process calls or expensive operations, (and
> > let's face it, that's most!), can see a big negative impact from
> partition
> > movement.
> >
> > The main issue partition movement brings is that it makes building
> elastic
> > services very hard. Consider: you've got an app consuming from Kafka that
> > locally caches data to improve performance. You want the app to auto
> scale
> > as the throughout to the topic(s) increases. Currently,   when one or
> more
> > new instance are added and the group rebalances, all existing instances
> > have all partitions revoked, and then a new, potentially quite different,
> > set assigned. An intuitive pattern is to evict partition state, I.e. the
> > cached data, when a partition is revoked. So in this case all apps flush
> > their entire cache causing throughput to drop massively, right when you
> > want to increase it!
> >
> > Even if the app is not flushing partition state when partitions are
> > revoked, the lack of a 'sticky' strategy means that a proportion of the
> > cached state is now useless, and instances have partitions assigned for
> > which they have no cached state, again negatively impacting throughout.
> >
> > With a 'sticky' strategy throughput can be maintained and indeed
> increased,
> > as intended.
> >
> > The same is also true in the presence of failure. An instance failing,
> > (maybe due to high load), can invalidate the caching of existing
> instances,
> > negatively impacting throughout of the remaining instances, (possibly at
> a
> > time the system needs throughput the most!)
> >
> > My question would be 'why move partitions if you don't have to?'. I will
> > certainly be setting the 'sticky' assignment strategy as the default once
> > it's released, and I have a feeling it will become the default in the
> > communitie's 'best-practice' guides.
> >
> > In addition, I think it is important that during a rebalance consumers do
> > not first have all partitions revoked, only to have a very similar, (or
> the
> > same!), set reassigned. This is less than initiative and complicates
> client
> > code unnecessarily. Instead, the `ConsumerPartitionListener` should only
> be
> > called for true changes in assignment I.e. any new partitions assigned
> and
> > any existing ones revoked, when comparing the new assignment to the
> > previous one.
> >
> > I think the change to how the client listener is called should be part of
> > this work.
> >
> > There is one last scenario I'd like to highlight that I think the KIP
> > should describe: say you have a group consuming from two topics, each
> topic
> > with two partitions. As of 0.9.0.1 the maximum number of consumers you
> can
> > have is 2, not 4. With 2 consumers each will get one partition from each
> > topic. A third consumer with not have any partitions assigned. This
> should
> > be fixed by the 'fair' part of the strategy, but it would be good to see
> > this covered explicitly in the KIP.
> >
> > Thanks,
> >
> >
> > Andy
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
> >
> > > Hey Vahid,
> > >
> > > Thanks for the updates. I think the lack of comments on this KIP
> suggests
> > > that the motivation might need a little work. Here are the two main
> > > benefits of this assignor as I see them:
> > >
> > > 1. It can give a more balanced assignment when subscriptions do not
> match
> > > in a group (this is the same problem solved by KIP-49).
> > > 2. It potentially allows applications to save the need to cleanup
> > partition
> > > state when rebalancing since partitions are more likely to stay
> assigned
> > to
> > > the same consumer.
> > >
> > > Does that seem right to you?
> > >
> > > I think it's unclear how serious the first problem is. Providing better
> > > balance when subscriptions differ is nice, but are rolling updates the
> > only
> > > scenario where this is encountered? Or are there more general use cases
> > > where differing subscriptions could persist for a longer duration? I'm
> > also
> > > wondering if this assignor addresses the problem found in KAFKA-2019.
> It
> > > would be useful to confirm whether this problem still exists with the
> new
> > > consumer's round robin strategy and how (whether?) it is addressed by
> > this
> > > assignor.
> > >
> > > The major selling point seems to be the second point. This is
> definitely
> > > nice to have, but would you expect a lot of value in practice since
> > > consumer groups are usually assumed to be stable? It might help to
> > describe
> > > some specific use cases to help motivate the proposal. One of the
> > downsides
> > > is that it requires users to restructure their code to get any benefit
> > from
> > > it. In particular, they need to move partition cleanup out of the
> > > onPartitionsRevoked() callback and into onPartitionsAssigned(). This
> is a
> > > little awkward and will probably make explaining the consumer more
> > > difficult. It's probably worth including a discussion of this point in
> > the
> > > proposal with an example.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com
> > > > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > I updated the KIP and added some details about the user data, the
> > > > assignment algorithm, and the alternative strategies to consider.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > >
> > > > Please let me know if I missed to add something. Thank you.
> > > >
> > > > Regards,
> > > > --Vahid
> > > >
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Andy,

Thanks for jumping in. A couple comments:

In addition, I think it is important that during a rebalance consumers do
> not first have all partitions revoked, only to have a very similar, (or the
> same!), set reassigned. This is less than initiative and complicates client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only be
> called for true changes in assignment I.e. any new partitions assigned and
> any existing ones revoked, when comparing the new assignment to the
> previous one.


The problem is that the revocation callback is called before you know what
the assignment for the next generation will be. This is necessary for the
consumer to be able to commit offsets for its assigned partitions. Once the
consumer has a new assignment, it is no longer safe to commit offsets from
the previous generation. Unless sticky assignment can give us some
guarantee on which partitions will remain after the rebalance, all of them
must be included in the revocation callback.


> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer with not have any partitions assigned. This should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.


This would be true for range assignment, but with 4 partitions total,
round-robin assignment would give one partition to each of the 4 consumers
(assuming subscriptions match).

Thanks,
Jason


On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <bi...@gmail.com>
wrote:

> Hi all,
>
> I think sticky assignment is immensely important / useful in many
> situations. Apps that use Kafka are many and varied. Any app that stores
> any state, either in the form of data from incoming messages, cached
> results from previous out-of-process calls or expensive operations, (and
> let's face it, that's most!), can see a big negative impact from partition
> movement.
>
> The main issue partition movement brings is that it makes building elastic
> services very hard. Consider: you've got an app consuming from Kafka that
> locally caches data to improve performance. You want the app to auto scale
> as the throughout to the topic(s) increases. Currently,   when one or more
> new instance are added and the group rebalances, all existing instances
> have all partitions revoked, and then a new, potentially quite different,
> set assigned. An intuitive pattern is to evict partition state, I.e. the
> cached data, when a partition is revoked. So in this case all apps flush
> their entire cache causing throughput to drop massively, right when you
> want to increase it!
>
> Even if the app is not flushing partition state when partitions are
> revoked, the lack of a 'sticky' strategy means that a proportion of the
> cached state is now useless, and instances have partitions assigned for
> which they have no cached state, again negatively impacting throughout.
>
> With a 'sticky' strategy throughput can be maintained and indeed increased,
> as intended.
>
> The same is also true in the presence of failure. An instance failing,
> (maybe due to high load), can invalidate the caching of existing instances,
> negatively impacting throughout of the remaining instances, (possibly at a
> time the system needs throughput the most!)
>
> My question would be 'why move partitions if you don't have to?'. I will
> certainly be setting the 'sticky' assignment strategy as the default once
> it's released, and I have a feeling it will become the default in the
> communitie's 'best-practice' guides.
>
> In addition, I think it is important that during a rebalance consumers do
> not first have all partitions revoked, only to have a very similar, (or the
> same!), set reassigned. This is less than initiative and complicates client
> code unnecessarily. Instead, the `ConsumerPartitionListener` should only be
> called for true changes in assignment I.e. any new partitions assigned and
> any existing ones revoked, when comparing the new assignment to the
> previous one.
>
> I think the change to how the client listener is called should be part of
> this work.
>
> There is one last scenario I'd like to highlight that I think the KIP
> should describe: say you have a group consuming from two topics, each topic
> with two partitions. As of 0.9.0.1 the maximum number of consumers you can
> have is 2, not 4. With 2 consumers each will get one partition from each
> topic. A third consumer with not have any partitions assigned. This should
> be fixed by the 'fair' part of the strategy, but it would be good to see
> this covered explicitly in the KIP.
>
> Thanks,
>
>
> Andy
>
>
>
>
>
>
>
>
> On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hey Vahid,
> >
> > Thanks for the updates. I think the lack of comments on this KIP suggests
> > that the motivation might need a little work. Here are the two main
> > benefits of this assignor as I see them:
> >
> > 1. It can give a more balanced assignment when subscriptions do not match
> > in a group (this is the same problem solved by KIP-49).
> > 2. It potentially allows applications to save the need to cleanup
> partition
> > state when rebalancing since partitions are more likely to stay assigned
> to
> > the same consumer.
> >
> > Does that seem right to you?
> >
> > I think it's unclear how serious the first problem is. Providing better
> > balance when subscriptions differ is nice, but are rolling updates the
> only
> > scenario where this is encountered? Or are there more general use cases
> > where differing subscriptions could persist for a longer duration? I'm
> also
> > wondering if this assignor addresses the problem found in KAFKA-2019. It
> > would be useful to confirm whether this problem still exists with the new
> > consumer's round robin strategy and how (whether?) it is addressed by
> this
> > assignor.
> >
> > The major selling point seems to be the second point. This is definitely
> > nice to have, but would you expect a lot of value in practice since
> > consumer groups are usually assumed to be stable? It might help to
> describe
> > some specific use cases to help motivate the proposal. One of the
> downsides
> > is that it requires users to restructure their code to get any benefit
> from
> > it. In particular, they need to move partition cleanup out of the
> > onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
> > little awkward and will probably make explaining the consumer more
> > difficult. It's probably worth including a discussion of this point in
> the
> > proposal with an example.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com
> > > wrote:
> >
> > > Hi Jason,
> > >
> > > I updated the KIP and added some details about the user data, the
> > > assignment algorithm, and the alternative strategies to consider.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > >
> > > Please let me know if I missed to add something. Thank you.
> > >
> > > Regards,
> > > --Vahid
> > >
> > >
> > >
> >
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Andrew Coates <bi...@gmail.com>.
Hi all,

I think sticky assignment is immensely important / useful in many
situations. Apps that use Kafka are many and varied. Any app that stores
any state, either in the form of data from incoming messages, cached
results from previous out-of-process calls or expensive operations, (and
let's face it, that's most!), can see a big negative impact from partition
movement.

The main issue partition movement brings is that it makes building elastic
services very hard. Consider: you've got an app consuming from Kafka that
locally caches data to improve performance. You want the app to auto scale
as the throughput to the topic(s) increases. Currently, when one or more
new instances are added and the group rebalances, all existing instances
have all partitions revoked, and then a new, potentially quite different,
set assigned. An intuitive pattern is to evict partition state, i.e. the
cached data, when a partition is revoked. So in this case all apps flush
their entire cache causing throughput to drop massively, right when you
want to increase it!

Even if the app is not flushing partition state when partitions are
revoked, the lack of a 'sticky' strategy means that a proportion of the
cached state is now useless, and instances have partitions assigned for
which they have no cached state, again negatively impacting throughput.

With a 'sticky' strategy throughput can be maintained and indeed increased,
as intended.

The same is also true in the presence of failure. An instance failing,
(maybe due to high load), can invalidate the caching of existing instances,
negatively impacting throughput of the remaining instances, (possibly at a
time the system needs throughput the most!)

My question would be 'why move partitions if you don't have to?'. I will
certainly be setting the 'sticky' assignment strategy as the default once
it's released, and I have a feeling it will become the default in the
community's 'best-practice' guides.

In addition, I think it is important that during a rebalance consumers do
not first have all partitions revoked, only to have a very similar, (or the
same!), set reassigned. This is less than intuitive and complicates client
code unnecessarily. Instead, the `ConsumerPartitionListener` should only be
called for true changes in assignment, i.e. any new partitions assigned and
any existing ones revoked, when comparing the new assignment to the
previous one.

I think the change to how the client listener is called should be part of
this work.
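
Until the listener works that way, an application can approximate it by
diffing assignments itself; a minimal sketch (the cache type and cleanup
logic are illustrative):

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative: remember the previous assignment and evict cached state
// only for partitions that actually moved to another consumer.
class DiffingListener implements ConsumerRebalanceListener {
    private final Set<TopicPartition> previous = new HashSet<>();
    private final Map<TopicPartition, Object> cache;    // illustrative cache

    DiffingListener(Map<TopicPartition, Object> cache) {
        this.cache = cache;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        // Flush/commit as usual here, but do not evict the cache yet.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        Set<TopicPartition> lost = new HashSet<>(previous);
        lost.removeAll(assigned);
        lost.forEach(cache::remove);      // evict only what really moved away
        previous.clear();
        previous.addAll(assigned);
    }
}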

There is one last scenario I'd like to highlight that I think the KIP
should describe: say you have a group consuming from two topics, each topic
with two partitions. As of 0.9.0.1 the maximum number of consumers you can
have is 2, not 4. With 2 consumers each will get one partition from each
topic. A third consumer will not have any partitions assigned. This should
be fixed by the 'fair' part of the strategy, but it would be good to see
this covered explicitly in the KIP.

Thanks,


Andy








On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:

> Hey Vahid,
>
> Thanks for the updates. I think the lack of comments on this KIP suggests
> that the motivation might need a little work. Here are the two main
> benefits of this assignor as I see them:
>
> 1. It can give a more balanced assignment when subscriptions do not match
> in a group (this is the same problem solved by KIP-49).
> 2. It potentially allows applications to save the need to cleanup partition
> state when rebalancing since partitions are more likely to stay assigned to
> the same consumer.
>
> Does that seem right to you?
>
> I think it's unclear how serious the first problem is. Providing better
> balance when subscriptions differ is nice, but are rolling updates the only
> scenario where this is encountered? Or are there more general use cases
> where differing subscriptions could persist for a longer duration? I'm also
> wondering if this assignor addresses the problem found in KAFKA-2019. It
> would be useful to confirm whether this problem still exists with the new
> consumer's round robin strategy and how (whether?) it is addressed by this
> assignor.
>
> The major selling point seems to be the second point. This is definitely
> nice to have, but would you expect a lot of value in practice since
> consumer groups are usually assumed to be stable? It might help to describe
> some specific use cases to help motivate the proposal. One of the downsides
> is that it requires users to restructure their code to get any benefit from
> it. In particular, they need to move partition cleanup out of the
> onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
> little awkward and will probably make explaining the consumer more
> difficult. It's probably worth including a discussion of this point in the
> proposal with an example.
>
> Thanks,
> Jason
>
>
>
> On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com
> > wrote:
>
> > Hi Jason,
> >
> > I updated the KIP and added some details about the user data, the
> > assignment algorithm, and the alternative strategies to consider.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > Please let me know if I missed to add something. Thank you.
> >
> > Regards,
> > --Vahid
> >
> >
> >
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

I appreciate your feedback.
Please see my comments below, and advise if you have further suggestions. 
Thanks.
 
Regards,
--Vahid
 



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   06/22/2016 04:41 PM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Hey Vahid,

Thanks for the updates. I think the lack of comments on this KIP suggests
that the motivation might need a little work. Here are the two main
benefits of this assignor as I see them:

1. It can give a more balanced assignment when subscriptions do not match
in a group (this is the same problem solved by KIP-49).
2. It potentially allows applications to save the need to cleanup 
partition
state when rebalancing since partitions are more likely to stay assigned 
to
the same consumer.

Does that seem right to you?


Yes, it does. You summarized it nicely. #1 is an advantage of this 
strategy compared to existing round robin and fair strategies.


I think it's unclear how serious the first problem is. Providing better
balance when subscriptions differ is nice, but are rolling updates the 
only
scenario where this is encountered? Or are there more general use cases
where differing subscriptions could persist for a longer duration? I'm 
also
wondering if this assignor addresses the problem found in KAFKA-2019. It
would be useful to confirm whether this problem still exists with the new
consumer's round robin strategy and how (whether?) it is addressed by this
assignor.

I'm not very clear on the first part of this paragraph. You could clarify 
it for me, but in general balancing out the partitions across consumers in 
a group as much as possible would normally mean balancing the load within 
the cluster, and that's something a user would want to have compared to 
cases where the assignments and therefore the load could be quite 
unbalanced depending on the subscriptions. Having an optimal balance is 
definitely more reassuring than knowing partition assignments could get 
quite unbalanced. There is an example in the KIP that explains a simple 
use case that leads to an unbalanced assignment with round robin 
assignment. This imbalance could become much more severe in real use cases 
with many more topics / partitions / consumers, and that's ideally 
something we would want to avoid, if possible.

Regarding KAFKA-2019, when I try the simple use case of 
https://issues.apache.org/jira/browse/KAFKA-2019?focusedCommentId=14360892&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14360892 
each of my consumers gets 3 partitions, which is not the same as what is 
mentioned in the comment. I might be missing something in the 
configuration (except setting the strategy to 'roundrobin', and fetcher 
threads to '2') or the issue may have been resolved already by some other 
patch. In any case, the issue based on what I read in the JIRA stems from 
multiple threads that each consumer may have and how the threads of each 
consumer are assigned first before assigning partitions to other consumer 
threads.

Since the new consumer is single threaded there is no such problem in its 
round robin strategy. It simply considers consumers one by one for each 
partition assignment, and when one consumer is assigned a partition, the 
next assignment starts with considering the next consumer in the list (and 
not the same consumer that was just assigned). This removes the 
possibility of the issue reported in KAFKA-2019 surfacing in the new 
consumer. In the sticky strategy we do not have this issue either, since 
every time an assignment is about to happen we start with the consumer 
with the least number of assignments. So we will not have a scenario where a 
consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that 
consumer is lagging behind other consumers on the number of partitions 
assigned).
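
A rough sketch of that selection step (the data structures are illustrative
and not the actual KIP-54 code):

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative: always hand the next partition to the consumer that
// currently has the fewest assignments, so no single consumer keeps
// being picked the way the old multi-threaded round robin could.
String pickNextConsumer(Map<String, List<TopicPartition>> assignment) {
    String candidate = null;
    int fewest = Integer.MAX_VALUE;
    for (Map.Entry<String, List<TopicPartition>> entry : assignment.entrySet()) {
        if (entry.getValue().size() < fewest) {
            fewest = entry.getValue().size();
            candidate = entry.getKey();
        }
    }
    return candidate;
}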


The major selling point seems to be the second point. This is definitely
nice to have, but would you expect a lot of value in practice since
consumer groups are usually assumed to be stable? It might help to 
describe
some specific use cases to help motivate the proposal. One of the 
downsides
is that it requires users to restructure their code to get any benefit 
from
it. In particular, they need to move partition cleanup out of the
onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
little awkward and will probably make explaining the consumer more
difficult. It's probably worth including a discussion of this point in the
proposal with an example.

Even though consumer groups are usually stable, it might be the case that 
consumers do not initially join the group at the same time. The sticky 
strategy in that situation lets those who joined earlier stick to their 
partitions to some extent (assuming fairness takes precedence over 
stickiness). In terms of specific use cases, Andrew touched on examples of 
how Kafka can benefit from a sticky assignor. I could add those to the KIP 
if you also think they help build the case in favor of the sticky assignor. 
I agree with you about the downside and I'll make sure I add that to the 
KIP as you suggested.

Thanks,
Jason



On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian 
<vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> I updated the KIP and added some details about the user data, the
> assignment algorithm, and the alternative strategies to consider.
>
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

>
> Please let me know if I missed to add something. Thank you.
>
> Regards,
> --Vahid
>
>
>





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,

Thanks for the updates. I think the lack of comments on this KIP suggests
that the motivation might need a little work. Here are the two main
benefits of this assignor as I see them:

1. It can give a more balanced assignment when subscriptions do not match
in a group (this is the same problem solved by KIP-49).
2. It potentially allows applications to save the need to cleanup partition
state when rebalancing since partitions are more likely to stay assigned to
the same consumer.

Does that seem right to you?

I think it's unclear how serious the first problem is. Providing better
balance when subscriptions differ is nice, but are rolling updates the only
scenario where this is encountered? Or are there more general use cases
where differing subscriptions could persist for a longer duration? I'm also
wondering if this assignor addresses the problem found in KAFKA-2019. It
would be useful to confirm whether this problem still exists with the new
consumer's round robin strategy and how (whether?) it is addressed by this
assignor.

The major selling point seems to be the second point. This is definitely
nice to have, but would you expect a lot of value in practice since
consumer groups are usually assumed to be stable? It might help to describe
some specific use cases to help motivate the proposal. One of the downsides
is that it requires users to restructure their code to get any benefit from
it. In particular, they need to move partition cleanup out of the
onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a
little awkward and will probably make explaining the consumer more
difficult. It's probably worth including a discussion of this point in the
proposal with an example.

Thanks,
Jason



On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> I updated the KIP and added some details about the user data, the
> assignment algorithm, and the alternative strategies to consider.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> Please let me know if I missed to add something. Thank you.
>
> Regards,
> --Vahid
>
>
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

I updated the KIP and added some details about the user data, the 
assignment algorithm, and the alternative strategies to consider.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

Please let me know if I missed adding something. Thank you.
 
Regards,
--Vahid



Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Vahid,

The only thing I added was the specification of the UserData field. The
rest comes from here:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol.
See the section on the JoinGroup request.
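
Purely as an illustration, the previous assignment could be packed into those
UserData bytes along these lines (the layout below is hypothetical, not the
final wire format):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

// Hypothetical encoding of the current assignment (topic -> partitions)
// into the opaque UserData field of the JoinGroup protocol metadata.
ByteBuffer encodeUserData(Map<String, List<Integer>> currentAssignment) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(currentAssignment.size());
    for (Map.Entry<String, List<Integer>> entry : currentAssignment.entrySet()) {
        out.writeUTF(entry.getKey());              // topic
        out.writeInt(entry.getValue().size());
        for (int partition : entry.getValue())
            out.writeInt(partition);               // partition id
    }
    out.flush();
    return ByteBuffer.wrap(bytes.toByteArray());
}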

Generally speaking, I think having fewer assignment strategies included
with Kafka is probably better. One of the advantages of the client-side
assignment approach is that there's no actual need to bundle them into the
release. Applications can use them by depending on a separate library. That
said, sticky assignment seems like a generally good idea and a common need,
so it may be helpful for a lot of users to make it easily available in the
release. If it also addresses the issues raised in KIP-49, then so much the
better.

As for whether we should include both, there I'm not too sure. Most users
probably wouldn't have a strong reason to choose the "fair" assignment over
the "sticky" assignment since they both seem to have the same properties in
terms of balancing the group's partitions. The overhead is a concern for
large groups with many topic subscriptions though, so if people think that
the "fair" approach brings a lot of benefit over round-robin, then it may
be worth including also.

-Jason

On Mon, Jun 6, 2016 at 5:17 PM, Vahid S Hashemian <vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> Thanks for reviewing the KIP.
> I will add the details you requested, but to summarize:
>
> Regarding the structure of the user data:
>
> Right now the user data will have the current assignments only which is a
> mapping of consumers to their assigned topic partitions. Is this mapping
> what you're also suggesting with CurrentAssignment field?
> I see how adding a version (as sticky assignor version) will be useful.
> Also how having a protocol name would be useful, perhaps for validation.
> But could you clarify the "Subscription" field and how you think it'll
> come into play?
>
>
> Regarding the algorithm:
>
> There could be similarities between how this KIP is implemented and how
> KIP-49 is handling the fairness. But since we had to take stickiness into
> consideration we started fresh and did not adopt from KIP-49.
> The Sticky assignor implementation is comprehensive and guarantees the
> fairest possible assignment with highest stickiness. I even have a unit
> test that randomly generates an assignment problem and verifies that a
> fair and sticky assignment is calculated.
> KIP-54 gives priority to fairness over stickiness (which makes the
> implementation more complex). We could have another strategy that gives
> priority to stickiness over fairness (which supposedly will have a better
> performance).
> The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates
> the assignment without considering the previous assignments (fairness
> only); whereas for KIP-54 previous assignments play a big role (fairness
> and stickiness).
> I believe if there is a situation where the stickiness requirements do not
> exist it would make sense to use a fair-only assignment without the
> overhead of sticky assignment, as you mentioned.
> So, I could see three different strategies that could enrich assignment
> policy options.
> It would be great to have some feedback from the community about what is
> the best way to move forward with these two KIPs.
>
> In the meantime, I'll add some more details in the KIP about the approach
> for calculating assignments.
>
> Thanks again.
>
> Regards,
> --Vahid
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   06/06/2016 01:26 PM
> Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Hi Vahid,
>
> Can you add some detail to the KIP on the structure of the user data? I'm
> guessing it would be something like this:
>
> ProtocolName => "sticky"
>
> ProtocolMetadata => Version Subscription UserData
>   Version => int16
>   Subscription => [Topic]
>     Topic => string
>   UserData => CurrentAssignment
>     CurrentAssignment => [Topic [Partition]]
>       Topic => string
>       Partition => int32
>
> It would also be helpful to include a little more detail on the algorithm.
> From what I can tell, it looks like you're adopting some of the strategies
> from KIP-49 to handle differing subscriptions better. If so, then I wonder
> if it makes sense to combine the two KIPs? Or do you think there would be
> an advantage to having the "fair" assignment strategy without the overhead
> of the sticky assignor?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Sorry for being late on this thread.
> >
> > The assign() function is auto-triggered during the rebalance by one of
> the
> > consumers when it receives all subscription information collected from
> the
> > server-side coordinator.
> >
> > More details can be found here:
> >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> >
> > As for Kafka Streams, they way it did "stickiness" is by 1) let all
> > consumers put their current assigned topic-partitions and server ids
> into
> > the "metadata" field of the JoinGroupRequest, 2) when the selected
> consumer
> > triggers assign() along with all the subscriptions as well as their
> > metadata, it can parse the metadata to learn about the existing
> assignment
> > map; and hence when making the new assignment it will try to assign
> > partitions to its current owners "with best effort".
> >
> >
> > Hope this helps.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I was looking at the implementation of StreamsPartitionAssignor
> through
> > > its unit tests and expected to find some tests that
> > > - verify stickiness by making at least two calls to the assign()
> method
> > > (so we check the second assign() call output preserves the assignments
> > > coming from the first assign() call output); or
> > > - start off by a preset assignment, call assign() after some
> subscription
> > > change, and verify the previous assignment are preserved.
> > > But none of the methods seem to do these. Did I overlook them, or
> > > stickiness is being tested in some other fashion?
> > >
> > > Also, if there is a high-level write-up about how this assignor works
> > > could you please point me to it? Thanks.
> > >
> > > Regards.
> > > --Vahid
> > >
> > >
> > >
> > >
> > > From:   Guozhang Wang <wa...@gmail.com>
> > > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > Date:   05/02/2016 10:34 AM
> > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> Strategy
> > >
> > >
> > >
> > > Just FYI, the StreamsPartitionAssignor in Kafka Streams are already
> doing
> > > some sort of sticky partitioning mechanism. This is done through the
> > > userData field though; i.e. all group members send their current
> > "assigned
> > > partitions" in their join group request, which will be grouped and
> send
> > to
> > > the leader, the leader then does best-effort for sticky-partitioning.
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <
> > ewen@confluent.io>
> > > wrote:
> > >
> > > > I think I'm unclear how we leverage the
> > > > onPartitionsRevoked/onPartitionsAssigned here in any way that's
> > > different
> > > > from our normal usage -- certainly you can use them to generate a
> diff,
> > > but
> > > > you still need to commit when partitions are revoked and that has a
> > > > non-trivial cost. Are we just saying that you might be able to save
> > some
> > > > overhead, e.g. closing/reopening some other resources by doing a
> flush
> > > but
> > > > not a close() or something? You still need to flush any output and
> > > commit
> > > > offsets before returning from onPartitionsRevoked, right? Otherwise
> you
> > > > couldn't guarantee clean handoff of partitions.
> > > >
> > > > In terms of the rebalancing, the basic requirements in the KIP seem
> > > sound.
> > > > Passing previous assignment data via UserData also seems reasonable
> > > since
> > > > it avoids redistributing all assignment data to all members and
> doesn't
> > > > rely on the next generation leader being a member of the current
> > > > generation. Hopefully this shouldn't be surprising since I think I
> > > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > > >
> > > > -Ewen
> > > >
> > > >
> > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > > > HI Jason,
> > > > >
> > > > > Thanks for your feedback.
> > > > >
> > > > > I believe your suggestion on how to take advantage of this
> assignor
> > is
> > > > > valid. We can leverage onPartitionsRevoked() and
> > > onPartitionsAssigned()
> > > > > callbacks and do a comparison of assigned partitions before and
> after
> > > the
> > > > > re-balance and do the cleanup only if there is a change (e.g., if
> > some
> > > > > previously assigned partition is not in the assignment).
> > > > >
> > > > > On your second question, a number of tests that I ran shows that
> the
> > > old
> > > > > assignments are preserved in the current implementation; except
> for
> > > when
> > > > > the consumer group leader is killed; in which case, a fresh
> > assignment
> > > is
> > > > > performed. This is something that needs to be fixed. I tried to
> use
> > > your
> > > > > pointers to find out where the best place is to preserve the old
> > > > > assignment in such circumstances but have not been able to
> pinpoint
> > > it.
> > > > If
> > > > > you have any suggestion on this please share. Thanks.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > From:   Jason Gustafson <ja...@confluent.io>
> > > > > To:     dev@kafka.apache.org
> > > > > Date:   04/14/2016 11:37 AM
> > > > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> > > Strategy
> > > > >
> > > > >
> > > > >
> > > > > Hi Vahid,
> > > > >
> > > > > Thanks for the proposal. I think one of the advantages of having
> > > sticky
> > > > > assignment would be reduce the need to cleanup local partition
> state
> > > > > between rebalances. Do you have any thoughts on how the user would
> > > take
> > > > > advantage of this assignor in the consumer to do this? Maybe one
> > > approach
> > > > > is to delay cleanup until you detect a change from the previous
> > > > assignment
> > > > > in the onPartitionsAssigned() callback?
> > > > >
> > > > > Also, can you provide some detail on how the sticky assignor works
> at
> > > the
> > > > > group protocol level? For example, do you pass old assignments
> > through
> > > > the
> > > > > "UserData" field in the consumer's JoinGroup?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > > vahidhashemian@us.ibm.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have started a new KIP under
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> > >
> > > > >
> > > > > > The corresponding JIRA is at
> > > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > > The corresponding PR is at
> > https://github.com/apache/kafka/pull/1020
> > > > > >
> > > > > > Your feedback is much appreciated.
> > > > > >
> > > > > > Regards,
> > > > > > Vahid Hashemian
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> > >
> > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
>
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

Thanks for reviewing the KIP.
I will add the details you requested, but to summarize:

Regarding the structure of the user data:

Right now the user data will have the current assignments only, which is a 
mapping of consumers to their assigned topic partitions. Is this mapping 
what you're also suggesting with the CurrentAssignment field?
I see how adding a version (as the sticky assignor version) will be useful. 
I also see how having a protocol name would be useful, perhaps for validation.
But could you clarify the "Subscription" field and how you think it'll 
come into play?


Regarding the algorithm:

There could be similarities between how this KIP is implemented and how 
KIP-49 handles fairness. But since we had to take stickiness into 
consideration, we started fresh and did not adopt from KIP-49.
The Sticky assignor implementation is comprehensive and guarantees the 
fairest possible assignment with highest stickiness. I even have a unit 
test that randomly generates an assignment problem and verifies that a 
fair and sticky assignment is calculated.
KIP-54 gives priority to fairness over stickiness (which makes the 
implementation more complex). We could have another strategy that gives 
priority to stickiness over fairness (which supposedly will have a better 
performance).
The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates 
the assignment without considering the previous assignments (fairness 
only); whereas for KIP-54 previous assignments play a big role (fairness 
and stickiness).
I believe that in a situation where the stickiness requirement does not 
exist, it would make sense to use a fair-only assignment without the 
overhead of sticky assignment, as you mentioned.
So, I could see three different strategies that could enrich assignment 
policy options.
It would be great to have some feedback from the community about what is 
the best way to move forward with these two KIPs.
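
To make the fairness-versus-stickiness trade-off concrete, here is a toy
comparison (hypothetical topic, consumer names, and partition counts, not
taken from the KIP): topic "t" has six partitions and consumer C3 leaves a
three-member group. A fair-only (round-robin) reassignment also moves
partitions that the surviving consumers already own, while a sticky one
moves only the orphaned partitions:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StickyVsFairExample {

    public static void main(String[] args) {
        Map<String, List<String>> before = new HashMap<>();
        before.put("C1", Arrays.asList("t-0", "t-3"));
        before.put("C2", Arrays.asList("t-1", "t-4"));
        before.put("C3", Arrays.asList("t-2", "t-5"));   // C3 leaves the group

        // Fair-only: redistribute all six partitions round-robin over C1, C2.
        Map<String, List<String>> fairOnly = new HashMap<>();
        fairOnly.put("C1", Arrays.asList("t-0", "t-2", "t-4"));
        fairOnly.put("C2", Arrays.asList("t-1", "t-3", "t-5"));

        // Sticky: C1 and C2 keep what they had; only C3's partitions move.
        Map<String, List<String>> sticky = new HashMap<>();
        sticky.put("C1", Arrays.asList("t-0", "t-3", "t-2"));
        sticky.put("C2", Arrays.asList("t-1", "t-4", "t-5"));

        System.out.println("moved partitions (fair-only): " + moved(before, fairOnly)); // 4
        System.out.println("moved partitions (sticky):    " + moved(before, sticky));   // 2
    }

    // Count partitions whose owner changed; the departed consumer's two
    // partitions have to move in any case, so 2 is the minimum here.
    static int moved(Map<String, List<String>> before, Map<String, List<String>> after) {
        Map<String, String> ownerBefore = owners(before);
        Map<String, String> ownerAfter = owners(after);
        int count = 0;
        for (Map.Entry<String, String> e : ownerAfter.entrySet())
            if (!e.getValue().equals(ownerBefore.get(e.getKey())))
                count++;
        return count;
    }

    // Invert the consumer -> partitions map into partition -> consumer.
    static Map<String, String> owners(Map<String, List<String>> assignment) {
        Map<String, String> owner = new HashMap<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet())
            for (String partition : e.getValue())
                owner.put(partition, e.getKey());
        return owner;
    }
}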

In the meantime, I'll add some more details in the KIP about the approach 
for calculating assignments.

Thanks again.
 
Regards,
--Vahid
 



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   06/06/2016 01:26 PM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Hi Vahid,

Can you add some detail to the KIP on the structure of the user data? I'm
guessing it would be something like this:

ProtocolName => "sticky"

ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => CurrentAssignment
    CurrentAssignment => [Topic [Partition]]
      Topic => string
      Partition => int32

It would also be helpful to include a little more detail on the algorithm.
From what I can tell, it looks like you're adopting some of the strategies
from KIP-49 to handle differing subscriptions better. If so, then I wonder
if it makes sense to combine the two KIPs? Or do you think there would be
an advantage to having the "fair" assignment strategy without the overhead
of the sticky assignor?

Thanks,
Jason



On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Sorry for being late on this thread.
>
> The assign() function is auto-triggered during the rebalance by one of 
the
> consumers when it receives all subscription information collected from 
the
> server-side coordinator.
>
> More details can be found here:
>
> 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol

>
> As for Kafka Streams, they way it did "stickiness" is by 1) let all
> consumers put their current assigned topic-partitions and server ids 
into
> the "metadata" field of the JoinGroupRequest, 2) when the selected 
consumer
> triggers assign() along with all the subscriptions as well as their
> metadata, it can parse the metadata to learn about the existing 
assignment
> map; and hence when making the new assignment it will try to assign
> partitions to its current owners "with best effort".
>
>
> Hope this helps.
>
>
> Guozhang
>
>
> On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi Guozhang,
> >
> > I was looking at the implementation of StreamsPartitionAssignor 
through
> > its unit tests and expected to find some tests that
> > - verify stickiness by making at least two calls to the assign() 
method
> > (so we check the second assign() call output preserves the assignments
> > coming from the first assign() call output); or
> > - start off by a preset assignment, call assign() after some 
subscription
> > change, and verify the previous assignment are preserved.
> > But none of the methods seem to do these. Did I overlook them, or
> > stickiness is being tested in some other fashion?
> >
> > Also, if there is a high-level write-up about how this assignor works
> > could you please point me to it? Thanks.
> >
> > Regards.
> > --Vahid
> >
> >
> >
> >
> > From:   Guozhang Wang <wa...@gmail.com>
> > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > Date:   05/02/2016 10:34 AM
> > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment 
Strategy
> >
> >
> >
> > Just FYI, the StreamsPartitionAssignor in Kafka Streams are already 
doing
> > some sort of sticky partitioning mechanism. This is done through the
> > userData field though; i.e. all group members send their current
> "assigned
> > partitions" in their join group request, which will be grouped and 
send
> to
> > the leader, the leader then does best-effort for sticky-partitioning.
> >
> >
> > Guozhang
> >
> > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <
> ewen@confluent.io>
> > wrote:
> >
> > > I think I'm unclear how we leverage the
> > > onPartitionsRevoked/onPartitionsAssigned here in any way that's
> > different
> > > from our normal usage -- certainly you can use them to generate a 
diff,
> > but
> > > you still need to commit when partitions are revoked and that has a
> > > non-trivial cost. Are we just saying that you might be able to save
> some
> > > overhead, e.g. closing/reopening some other resources by doing a 
flush
> > but
> > > not a close() or something? You still need to flush any output and
> > commit
> > > offsets before returning from onPartitionsRevoked, right? Otherwise 
you
> > > couldn't guarantee clean handoff of partitions.
> > >
> > > In terms of the rebalancing, the basic requirements in the KIP seem
> > sound.
> > > Passing previous assignment data via UserData also seems reasonable
> > since
> > > it avoids redistributing all assignment data to all members and 
doesn't
> > > rely on the next generation leader being a member of the current
> > > generation. Hopefully this shouldn't be surprising since I think I
> > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > >
> > > -Ewen
> > >
> > >
> > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > > > HI Jason,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > I believe your suggestion on how to take advantage of this 
assignor
> is
> > > > valid. We can leverage onPartitionsRevoked() and
> > onPartitionsAssigned()
> > > > callbacks and do a comparison of assigned partitions before and 
after
> > the
> > > > re-balance and do the cleanup only if there is a change (e.g., if
> some
> > > > previously assigned partition is not in the assignment).
> > > >
> > > > On your second question, a number of tests that I ran shows that 
the
> > old
> > > > assignments are preserved in the current implementation; except 
for
> > when
> > > > the consumer group leader is killed; in which case, a fresh
> assignment
> > is
> > > > performed. This is something that needs to be fixed. I tried to 
use
> > your
> > > > pointers to find out where the best place is to preserve the old
> > > > assignment in such circumstances but have not been able to 
pinpoint
> > it.
> > > If
> > > > you have any suggestion on this please share. Thanks.
> > > >
> > > > Regards,
> > > > Vahid Hashemian
> > > >
> > > >
> > > >
> > > >
> > > > From:   Jason Gustafson <ja...@confluent.io>
> > > > To:     dev@kafka.apache.org
> > > > Date:   04/14/2016 11:37 AM
> > > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> > Strategy
> > > >
> > > >
> > > >
> > > > Hi Vahid,
> > > >
> > > > Thanks for the proposal. I think one of the advantages of having
> > sticky
> > > > assignment would be reduce the need to cleanup local partition 
state
> > > > between rebalances. Do you have any thoughts on how the user would
> > take
> > > > advantage of this assignor in the consumer to do this? Maybe one
> > approach
> > > > is to delay cleanup until you detect a change from the previous
> > > assignment
> > > > in the onPartitionsAssigned() callback?
> > > >
> > > > Also, can you provide some detail on how the sticky assignor works 
at
> > the
> > > > group protocol level? For example, do you pass old assignments
> through
> > > the
> > > > "UserData" field in the consumer's JoinGroup?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have started a new KIP under
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
> >
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

> >
> > > >
> > > > > The corresponding JIRA is at
> > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > The corresponding PR is at
> https://github.com/apache/kafka/pull/1020
> > > > >
> > > > > Your feedback is much appreciated.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> >
> >
>
>
> --
> -- Guozhang
>





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Vahid,

Can you add some detail to the KIP on the structure of the user data? I'm
guessing it would be something like this:

ProtocolName => "sticky"

ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => CurrentAssignment
    CurrentAssignment => [Topic [Partition]]
      Topic => string
      Partition => int32
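
For concreteness, the UserData portion above could be serialized with the
usual Kafka protocol primitives (int32 array sizes, int16-length-prefixed
UTF-8 strings). The sketch below is illustrative only and is not necessarily
the encoding the KIP will settle on:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Illustrative encoder for UserData => CurrentAssignment => [Topic [Partition]].
public class CurrentAssignmentEncoder {

    public static ByteBuffer encode(Map<String, List<Integer>> currentAssignment) {
        ByteBuffer buffer = ByteBuffer.allocate(sizeOf(currentAssignment));
        buffer.putInt(currentAssignment.size());              // number of topic entries
        for (Map.Entry<String, List<Integer>> entry : currentAssignment.entrySet()) {
            byte[] topic = entry.getKey().getBytes(StandardCharsets.UTF_8);
            buffer.putShort((short) topic.length);            // Topic => string
            buffer.put(topic);
            buffer.putInt(entry.getValue().size());           // number of partitions
            for (Integer partition : entry.getValue())
                buffer.putInt(partition);                     // Partition => int32
        }
        buffer.flip();
        return buffer;
    }

    private static int sizeOf(Map<String, List<Integer>> currentAssignment) {
        int size = 4;
        for (Map.Entry<String, List<Integer>> entry : currentAssignment.entrySet())
            size += 2 + entry.getKey().getBytes(StandardCharsets.UTF_8).length
                    + 4 + 4 * entry.getValue().size();
        return size;
    }
}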

It would also be helpful to include a little more detail on the algorithm.
From what I can tell, it looks like you're adopting some of the strategies
from KIP-49 to handle differing subscriptions better. If so, then I wonder
if it makes sense to combine the two KIPs? Or do you think there would be
an advantage to having the "fair" assignment strategy without the overhead
of the sticky assignor?

Thanks,
Jason



On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Sorry for being late on this thread.
>
> The assign() function is auto-triggered during the rebalance by one of the
> consumers when it receives all subscription information collected from the
> server-side coordinator.
>
> More details can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> As for Kafka Streams, they way it did "stickiness" is by 1) let all
> consumers put their current assigned topic-partitions and server ids into
> the "metadata" field of the JoinGroupRequest, 2) when the selected consumer
> triggers assign() along with all the subscriptions as well as their
> metadata, it can parse the metadata to learn about the existing assignment
> map; and hence when making the new assignment it will try to assign
> partitions to its current owners "with best effort".
>
>
> Hope this helps.
>
>
> Guozhang
>
>
> On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi Guozhang,
> >
> > I was looking at the implementation of StreamsPartitionAssignor through
> > its unit tests and expected to find some tests that
> > - verify stickiness by making at least two calls to the assign() method
> > (so we check the second assign() call output preserves the assignments
> > coming from the first assign() call output); or
> > - start off by a preset assignment, call assign() after some subscription
> > change, and verify the previous assignment are preserved.
> > But none of the methods seem to do these. Did I overlook them, or
> > stickiness is being tested in some other fashion?
> >
> > Also, if there is a high-level write-up about how this assignor works
> > could you please point me to it? Thanks.
> >
> > Regards.
> > --Vahid
> >
> >
> >
> >
> > From:   Guozhang Wang <wa...@gmail.com>
> > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > Date:   05/02/2016 10:34 AM
> > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> >
> >
> >
> > Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing
> > some sort of sticky partitioning mechanism. This is done through the
> > userData field though; i.e. all group members send their current
> "assigned
> > partitions" in their join group request, which will be grouped and send
> to
> > the leader, the leader then does best-effort for sticky-partitioning.
> >
> >
> > Guozhang
> >
> > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <
> ewen@confluent.io>
> > wrote:
> >
> > > I think I'm unclear how we leverage the
> > > onPartitionsRevoked/onPartitionsAssigned here in any way that's
> > different
> > > from our normal usage -- certainly you can use them to generate a diff,
> > but
> > > you still need to commit when partitions are revoked and that has a
> > > non-trivial cost. Are we just saying that you might be able to save
> some
> > > overhead, e.g. closing/reopening some other resources by doing a flush
> > but
> > > not a close() or something? You still need to flush any output and
> > commit
> > > offsets before returning from onPartitionsRevoked, right? Otherwise you
> > > couldn't guarantee clean handoff of partitions.
> > >
> > > In terms of the rebalancing, the basic requirements in the KIP seem
> > sound.
> > > Passing previous assignment data via UserData also seems reasonable
> > since
> > > it avoids redistributing all assignment data to all members and doesn't
> > > rely on the next generation leader being a member of the current
> > > generation. Hopefully this shouldn't be surprising since I think I
> > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > >
> > > -Ewen
> > >
> > >
> > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > > > HI Jason,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > I believe your suggestion on how to take advantage of this assignor
> is
> > > > valid. We can leverage onPartitionsRevoked() and
> > onPartitionsAssigned()
> > > > callbacks and do a comparison of assigned partitions before and after
> > the
> > > > re-balance and do the cleanup only if there is a change (e.g., if
> some
> > > > previously assigned partition is not in the assignment).
> > > >
> > > > On your second question, a number of tests that I ran shows that the
> > old
> > > > assignments are preserved in the current implementation; except for
> > when
> > > > the consumer group leader is killed; in which case, a fresh
> assignment
> > is
> > > > performed. This is something that needs to be fixed. I tried to use
> > your
> > > > pointers to find out where the best place is to preserve the old
> > > > assignment in such circumstances but have not been able to pinpoint
> > it.
> > > If
> > > > you have any suggestion on this please share. Thanks.
> > > >
> > > > Regards,
> > > > Vahid Hashemian
> > > >
> > > >
> > > >
> > > >
> > > > From:   Jason Gustafson <ja...@confluent.io>
> > > > To:     dev@kafka.apache.org
> > > > Date:   04/14/2016 11:37 AM
> > > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> > Strategy
> > > >
> > > >
> > > >
> > > > Hi Vahid,
> > > >
> > > > Thanks for the proposal. I think one of the advantages of having
> > sticky
> > > > assignment would be reduce the need to cleanup local partition state
> > > > between rebalances. Do you have any thoughts on how the user would
> > take
> > > > advantage of this assignor in the consumer to do this? Maybe one
> > approach
> > > > is to delay cleanup until you detect a change from the previous
> > > assignment
> > > > in the onPartitionsAssigned() callback?
> > > >
> > > > Also, can you provide some detail on how the sticky assignor works at
> > the
> > > > group protocol level? For example, do you pass old assignments
> through
> > > the
> > > > "UserData" field in the consumer's JoinGroup?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have started a new KIP under
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > > >
> > > > > The corresponding JIRA is at
> > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > The corresponding PR is at
> https://github.com/apache/kafka/pull/1020
> > > > >
> > > > > Your feedback is much appreciated.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Guozhang Wang <wa...@gmail.com>.
Sorry for being late on this thread.

The assign() function is auto-triggered during the rebalance by one of the
consumers when it receives all subscription information collected from the
server-side coordinator.

More details can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol

As for Kafka Streams, the way it did "stickiness" is by 1) letting all
consumers put their current assigned topic-partitions and server ids into
the "metadata" field of the JoinGroupRequest; 2) when the selected consumer
triggers assign() along with all the subscriptions as well as their
metadata, it can parse the metadata to learn about the existing assignment
map, and hence when making the new assignment it will try to assign
partitions to their current owners "with best effort".
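
As a rough illustration of that "best effort" idea (a simplified sketch, not
the actual StreamsPartitionAssignor code), the leader could keep a partition
with its previous owner whenever that owner is still in the group and below
its fair share, and hand everything else to the least loaded member:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BestEffortStickyAssign {

    public static Map<String, List<String>> assign(List<String> partitions,
                                                   List<String> members,
                                                   Map<String, String> previousOwner) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String member : members)
            assignment.put(member, new ArrayList<String>());

        int fairShare = (partitions.size() + members.size() - 1) / members.size();
        List<String> unassigned = new ArrayList<>();

        // First pass: stick with the previous owner where possible.
        for (String partition : partitions) {
            String owner = previousOwner.get(partition);
            if (owner != null && assignment.containsKey(owner)
                    && assignment.get(owner).size() < fairShare)
                assignment.get(owner).add(partition);
            else
                unassigned.add(partition);
        }

        // Second pass: place the remaining partitions on the least loaded members.
        for (String partition : unassigned) {
            String target = members.get(0);
            for (String member : members)
                if (assignment.get(member).size() < assignment.get(target).size())
                    target = member;
            assignment.get(target).add(partition);
        }
        return assignment;
    }
}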


Hope this helps.


Guozhang


On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Hi Guozhang,
>
> I was looking at the implementation of StreamsPartitionAssignor through
> its unit tests and expected to find some tests that
> - verify stickiness by making at least two calls to the assign() method
> (so we check the second assign() call output preserves the assignments
> coming from the first assign() call output); or
> - start off by a preset assignment, call assign() after some subscription
> change, and verify the previous assignment are preserved.
> But none of the methods seem to do these. Did I overlook them, or
> stickiness is being tested in some other fashion?
>
> Also, if there is a high-level write-up about how this assignor works
> could you please point me to it? Thanks.
>
> Regards.
> --Vahid
>
>
>
>
> From:   Guozhang Wang <wa...@gmail.com>
> To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> Date:   05/02/2016 10:34 AM
> Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing
> some sort of sticky partitioning mechanism. This is done through the
> userData field though; i.e. all group members send their current "assigned
> partitions" in their join group request, which will be grouped and send to
> the leader, the leader then does best-effort for sticky-partitioning.
>
>
> Guozhang
>
> On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
>
> > I think I'm unclear how we leverage the
> > onPartitionsRevoked/onPartitionsAssigned here in any way that's
> different
> > from our normal usage -- certainly you can use them to generate a diff,
> but
> > you still need to commit when partitions are revoked and that has a
> > non-trivial cost. Are we just saying that you might be able to save some
> > overhead, e.g. closing/reopening some other resources by doing a flush
> but
> > not a close() or something? You still need to flush any output and
> commit
> > offsets before returning from onPartitionsRevoked, right? Otherwise you
> > couldn't guarantee clean handoff of partitions.
> >
> > In terms of the rebalancing, the basic requirements in the KIP seem
> sound.
> > Passing previous assignment data via UserData also seems reasonable
> since
> > it avoids redistributing all assignment data to all members and doesn't
> > rely on the next generation leader being a member of the current
> > generation. Hopefully this shouldn't be surprising since I think I
> > discussed this w/ Jason before he updated the relevant wiki pages :)
> >
> > -Ewen
> >
> >
> > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > HI Jason,
> > >
> > > Thanks for your feedback.
> > >
> > > I believe your suggestion on how to take advantage of this assignor is
> > > valid. We can leverage onPartitionsRevoked() and
> onPartitionsAssigned()
> > > callbacks and do a comparison of assigned partitions before and after
> the
> > > re-balance and do the cleanup only if there is a change (e.g., if some
> > > previously assigned partition is not in the assignment).
> > >
> > > On your second question, a number of tests that I ran shows that the
> old
> > > assignments are preserved in the current implementation; except for
> when
> > > the consumer group leader is killed; in which case, a fresh assignment
> is
> > > performed. This is something that needs to be fixed. I tried to use
> your
> > > pointers to find out where the best place is to preserve the old
> > > assignment in such circumstances but have not been able to pinpoint
> it.
> > If
> > > you have any suggestion on this please share. Thanks.
> > >
> > > Regards,
> > > Vahid Hashemian
> > >
> > >
> > >
> > >
> > > From:   Jason Gustafson <ja...@confluent.io>
> > > To:     dev@kafka.apache.org
> > > Date:   04/14/2016 11:37 AM
> > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment
> Strategy
> > >
> > >
> > >
> > > Hi Vahid,
> > >
> > > Thanks for the proposal. I think one of the advantages of having
> sticky
> > > assignment would be reduce the need to cleanup local partition state
> > > between rebalances. Do you have any thoughts on how the user would
> take
> > > advantage of this assignor in the consumer to do this? Maybe one
> approach
> > > is to delay cleanup until you detect a change from the previous
> > assignment
> > > in the onPartitionsAssigned() callback?
> > >
> > > Also, can you provide some detail on how the sticky assignor works at
> the
> > > group protocol level? For example, do you pass old assignments through
> > the
> > > "UserData" field in the consumer's JoinGroup?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have started a new KIP under
> > > >
> > > >
> > >
> > >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> > >
> > > > The corresponding JIRA is at
> > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > > >
> > > > Your feedback is much appreciated.
> > > >
> > > > Regards,
> > > > Vahid Hashemian
> > > >
> > > >
> > >
> > >
> > >
> > >
> > >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>
>
>
> --
> -- Guozhang
>
>
>
>
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Guozhang,

I was looking at the implementation of StreamsPartitionAssignor through 
its unit tests and expected to find some tests that
- verify stickiness by making at least two calls to the assign() method 
(so we check the second assign() call output preserves the assignments 
coming from the first assign() call output); or
- start off by a preset assignment, call assign() after some subscription 
change, and verify the previous assignments are preserved.
But none of the methods seems to do either of these. Did I overlook them, or 
is stickiness being tested in some other fashion?
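
For reference, a stickiness test along those lines might look roughly like
this. The assignor and its assign() signature here are hypothetical stand-ins
(the real PartitionAssignor API passes Subscriptions and user data instead),
so this is a sketch of the test shape rather than working test code:

import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.junit.Test;

public class StickinessTest {

    // Hypothetical assignor exposing:
    //   Map<consumerId, List<partition>> assign(List<consumerId> members,
    //                                           List<partition> partitions,
    //                                           Map<consumerId, List<partition>> previousAssignment)
    private final HypotheticalStickyAssignor assignor = new HypotheticalStickyAssignor();

    @Test
    public void secondAssignmentPreservesFirst() {
        List<String> members = Arrays.asList("C1", "C2");
        List<String> partitions = Arrays.asList("t-0", "t-1", "t-2", "t-3");

        // First call: no previous assignment.
        Map<String, List<String>> first =
                assignor.assign(members, partitions, Collections.<String, List<String>>emptyMap());

        // Second call: same members and partitions, previous assignment is the
        // first call's output, so a sticky assignor should not move anything.
        Map<String, List<String>> second = assignor.assign(members, partitions, first);

        for (Map.Entry<String, List<String>> entry : first.entrySet())
            assertTrue(second.get(entry.getKey()).containsAll(entry.getValue()));
    }
}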

Also, if there is a high-level write-up about how this assignor works 
could you please point me to it? Thanks.

Regards.
--Vahid
 



From:   Guozhang Wang <wa...@gmail.com>
To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
Date:   05/02/2016 10:34 AM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing
some sort of sticky partitioning mechanism. This is done through the
userData field though; i.e. all group members send their current "assigned
partitions" in their join group request, which will be grouped and send to
the leader, the leader then does best-effort for sticky-partitioning.


Guozhang

On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> I think I'm unclear how we leverage the
> onPartitionsRevoked/onPartitionsAssigned here in any way that's 
different
> from our normal usage -- certainly you can use them to generate a diff, 
but
> you still need to commit when partitions are revoked and that has a
> non-trivial cost. Are we just saying that you might be able to save some
> overhead, e.g. closing/reopening some other resources by doing a flush 
but
> not a close() or something? You still need to flush any output and 
commit
> offsets before returning from onPartitionsRevoked, right? Otherwise you
> couldn't guarantee clean handoff of partitions.
>
> In terms of the rebalancing, the basic requirements in the KIP seem 
sound.
> Passing previous assignment data via UserData also seems reasonable 
since
> it avoids redistributing all assignment data to all members and doesn't
> rely on the next generation leader being a member of the current
> generation. Hopefully this shouldn't be surprising since I think I
> discussed this w/ Jason before he updated the relevant wiki pages :)
>
> -Ewen
>
>
> On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > HI Jason,
> >
> > Thanks for your feedback.
> >
> > I believe your suggestion on how to take advantage of this assignor is
> > valid. We can leverage onPartitionsRevoked() and 
onPartitionsAssigned()
> > callbacks and do a comparison of assigned partitions before and after 
the
> > re-balance and do the cleanup only if there is a change (e.g., if some
> > previously assigned partition is not in the assignment).
> >
> > On your second question, a number of tests that I ran shows that the 
old
> > assignments are preserved in the current implementation; except for 
when
> > the consumer group leader is killed; in which case, a fresh assignment 
is
> > performed. This is something that needs to be fixed. I tried to use 
your
> > pointers to find out where the best place is to preserve the old
> > assignment in such circumstances but have not been able to pinpoint 
it.
> If
> > you have any suggestion on this please share. Thanks.
> >
> > Regards,
> > Vahid Hashemian
> >
> >
> >
> >
> > From:   Jason Gustafson <ja...@confluent.io>
> > To:     dev@kafka.apache.org
> > Date:   04/14/2016 11:37 AM
> > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment 
Strategy
> >
> >
> >
> > Hi Vahid,
> >
> > Thanks for the proposal. I think one of the advantages of having 
sticky
> > assignment would be reduce the need to cleanup local partition state
> > between rebalances. Do you have any thoughts on how the user would 
take
> > advantage of this assignor in the consumer to do this? Maybe one 
approach
> > is to delay cleanup until you detect a change from the previous
> assignment
> > in the onPartitionsAssigned() callback?
> >
> > Also, can you provide some detail on how the sticky assignor works at 
the
> > group protocol level? For example, do you pass old assignments through
> the
> > "UserData" field in the consumer's JoinGroup?
> >
> > Thanks,
> > Jason
> >
> > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi all,
> > >
> > > I have started a new KIP under
> > >
> > >
> >
> >
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

> >
> > > The corresponding JIRA is at
> > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > >
> > > Your feedback is much appreciated.
> > >
> > > Regards,
> > > Vahid Hashemian
> > >
> > >
> >
> >
> >
> >
> >
>
>
> --
> Thanks,
> Ewen
>



-- 
-- Guozhang





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Guozhang Wang <wa...@gmail.com>.
Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing
some sort of sticky partitioning mechanism. This is done through the
userData field though; i.e. all group members send their current "assigned
partitions" in their join group request, which will be grouped and send to
the leader, the leader then does best-effort for sticky-partitioning.


Guozhang

On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> I think I'm unclear how we leverage the
> onPartitionsRevoked/onPartitionsAssigned here in any way that's different
> from our normal usage -- certainly you can use them to generate a diff, but
> you still need to commit when partitions are revoked and that has a
> non-trivial cost. Are we just saying that you might be able to save some
> overhead, e.g. closing/reopening some other resources by doing a flush but
> not a close() or something? You still need to flush any output and commit
> offsets before returning from onPartitionsRevoked, right? Otherwise you
> couldn't guarantee clean handoff of partitions.
>
> In terms of the rebalancing, the basic requirements in the KIP seem sound.
> Passing previous assignment data via UserData also seems reasonable since
> it avoids redistributing all assignment data to all members and doesn't
> rely on the next generation leader being a member of the current
> generation. Hopefully this shouldn't be surprising since I think I
> discussed this w/ Jason before he updated the relevant wiki pages :)
>
> -Ewen
>
>
> On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > HI Jason,
> >
> > Thanks for your feedback.
> >
> > I believe your suggestion on how to take advantage of this assignor is
> > valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned()
> > callbacks and do a comparison of assigned partitions before and after the
> > re-balance and do the cleanup only if there is a change (e.g., if some
> > previously assigned partition is not in the assignment).
> >
> > On your second question, a number of tests that I ran shows that the old
> > assignments are preserved in the current implementation; except for when
> > the consumer group leader is killed; in which case, a fresh assignment is
> > performed. This is something that needs to be fixed. I tried to use your
> > pointers to find out where the best place is to preserve the old
> > assignment in such circumstances but have not been able to pinpoint it.
> If
> > you have any suggestion on this please share. Thanks.
> >
> > Regards,
> > Vahid Hashemian
> >
> >
> >
> >
> > From:   Jason Gustafson <ja...@confluent.io>
> > To:     dev@kafka.apache.org
> > Date:   04/14/2016 11:37 AM
> > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> >
> >
> >
> > Hi Vahid,
> >
> > Thanks for the proposal. I think one of the advantages of having sticky
> > assignment would be reduce the need to cleanup local partition state
> > between rebalances. Do you have any thoughts on how the user would take
> > advantage of this assignor in the consumer to do this? Maybe one approach
> > is to delay cleanup until you detect a change from the previous
> assignment
> > in the onPartitionsAssigned() callback?
> >
> > Also, can you provide some detail on how the sticky assignor works at the
> > group protocol level? For example, do you pass old assignments through
> the
> > "UserData" field in the consumer's JoinGroup?
> >
> > Thanks,
> > Jason
> >
> > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi all,
> > >
> > > I have started a new KIP under
> > >
> > >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> >
> > > The corresponding JIRA is at
> > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > >
> > > Your feedback is much appreciated.
> > >
> > > Regards,
> > > Vahid Hashemian
> > >
> > >
> >
> >
> >
> >
> >
>
>
> --
> Thanks,
> Ewen
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Ewen,

Thank you for reviewing the KIP and providing feedback.

I believe the need to commit would still be there, as you mentioned. The 
main advantage, however, would be when dealing with local state based on 
partitions assigned, as described in 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal 
or in the corresponding JIRA for this KIP. 

If consumers perform some processing on re-assignment of partitions (i.e. 
after a rebalance) it would be more efficient for them to stick to their 
assigned partitions and reduce the overhead of switching to a new set of 
partitions (you also referred to some use cases).

Unfortunately I don't have a specific use case in mind at the moment, but 
based on documents like the one above it seems that consumers can benefit from 
such a strategy. If you or others can think of specific use cases to 
enrich the KIP please let me know or directly update the KIP.

Regards,
Vahid Hashemian




From:   Ewen Cheslack-Postava <ew...@confluent.io>
To:     dev@kafka.apache.org
Date:   04/29/2016 09:48 PM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



I think I'm unclear how we leverage the
onPartitionsRevoked/onPartitionsAssigned here in any way that's different
from our normal usage -- certainly you can use them to generate a diff, 
but
you still need to commit when partitions are revoked and that has a
non-trivial cost. Are we just saying that you might be able to save some
overhead, e.g. closing/reopening some other resources by doing a flush but
not a close() or something? You still need to flush any output and commit
offsets before returning from onPartitionsRevoked, right? Otherwise you
couldn't guarantee clean handoff of partitions.

In terms of the rebalancing, the basic requirements in the KIP seem sound.
Passing previous assignment data via UserData also seems reasonable since
it avoids redistributing all assignment data to all members and doesn't
rely on the next generation leader being a member of the current
generation. Hopefully this shouldn't be surprising since I think I
discussed this w/ Jason before he updated the relevant wiki pages :)

-Ewen


On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> HI Jason,
>
> Thanks for your feedback.
>
> I believe your suggestion on how to take advantage of this assignor is
> valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned()
> callbacks and do a comparison of assigned partitions before and after 
the
> re-balance and do the cleanup only if there is a change (e.g., if some
> previously assigned partition is not in the assignment).
>
> On your second question, a number of tests that I ran shows that the old
> assignments are preserved in the current implementation; except for when
> the consumer group leader is killed; in which case, a fresh assignment 
is
> performed. This is something that needs to be fixed. I tried to use your
> pointers to find out where the best place is to preserve the old
> assignment in such circumstances but have not been able to pinpoint it. 
If
> you have any suggestion on this please share. Thanks.
>
> Regards,
> Vahid Hashemian
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   04/14/2016 11:37 AM
> Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment 
Strategy
>
>
>
> Hi Vahid,
>
> Thanks for the proposal. I think one of the advantages of having sticky
> assignment would be reduce the need to cleanup local partition state
> between rebalances. Do you have any thoughts on how the user would take
> advantage of this assignor in the consumer to do this? Maybe one 
approach
> is to delay cleanup until you detect a change from the previous 
assignment
> in the onPartitionsAssigned() callback?
>
> Also, can you provide some detail on how the sticky assignor works at 
the
> group protocol level? For example, do you pass old assignments through 
the
> "UserData" field in the consumer's JoinGroup?
>
> Thanks,
> Jason
>
> On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi all,
> >
> > I have started a new KIP under
> >
> >
>
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

>
> > The corresponding JIRA is at
> > https://issues.apache.org/jira/browse/KAFKA-2273
> > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> >
> > Your feedback is much appreciated.
> >
> > Regards,
> > Vahid Hashemian
> >
> >
>
>
>
>
>


-- 
Thanks,
Ewen





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
I think I'm unclear how we leverage the
onPartitionsRevoked/onPartitionsAssigned here in any way that's different
from our normal usage -- certainly you can use them to generate a diff, but
you still need to commit when partitions are revoked and that has a
non-trivial cost. Are we just saying that you might be able to save some
overhead, e.g. closing/reopening some other resources by doing a flush but
not a close() or something? You still need to flush any output and commit
offsets before returning from onPartitionsRevoked, right? Otherwise you
couldn't guarantee clean handoff of partitions.

In terms of the rebalancing, the basic requirements in the KIP seem sound.
Passing previous assignment data via UserData also seems reasonable since
it avoids redistributing all assignment data to all members and doesn't
rely on the next generation leader being a member of the current
generation. Hopefully this shouldn't be surprising since I think I
discussed this w/ Jason before he updated the relevant wiki pages :)

-Ewen


On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> HI Jason,
>
> Thanks for your feedback.
>
> I believe your suggestion on how to take advantage of this assignor is
> valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned()
> callbacks and do a comparison of assigned partitions before and after the
> re-balance and do the cleanup only if there is a change (e.g., if some
> previously assigned partition is not in the assignment).
>
> On your second question, a number of tests that I ran shows that the old
> assignments are preserved in the current implementation; except for when
> the consumer group leader is killed; in which case, a fresh assignment is
> performed. This is something that needs to be fixed. I tried to use your
> pointers to find out where the best place is to preserve the old
> assignment in such circumstances but have not been able to pinpoint it. If
> you have any suggestion on this please share. Thanks.
>
> Regards,
> Vahid Hashemian
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   04/14/2016 11:37 AM
> Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Hi Vahid,
>
> Thanks for the proposal. I think one of the advantages of having sticky
> assignment would be reduce the need to cleanup local partition state
> between rebalances. Do you have any thoughts on how the user would take
> advantage of this assignor in the consumer to do this? Maybe one approach
> is to delay cleanup until you detect a change from the previous assignment
> in the onPartitionsAssigned() callback?
>
> Also, can you provide some detail on how the sticky assignor works at the
> group protocol level? For example, do you pass old assignments through the
> "UserData" field in the consumer's JoinGroup?
>
> Thanks,
> Jason
>
> On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Hi all,
> >
> > I have started a new KIP under
> >
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
>
> > The corresponding JIRA is at
> > https://issues.apache.org/jira/browse/KAFKA-2273
> > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> >
> > Your feedback is much appreciated.
> >
> > Regards,
> > Vahid Hashemian
> >
> >
>
>
>
>
>


-- 
Thanks,
Ewen

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Vahid S Hashemian <va...@us.ibm.com>.
HI Jason,

Thanks for your feedback.

I believe your suggestion on how to take advantage of this assignor is 
valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned() 
callbacks and do a comparison of assigned partitions before and after the 
re-balance and do the cleanup only if there is a change (e.g., if some 
previously assigned partition is not in the assignment).

On your second question, a number of tests that I ran shows that the old 
assignments are preserved in the current implementation; except for when 
the consumer group leader is killed; in which case, a fresh assignment is 
performed. This is something that needs to be fixed. I tried to use your 
pointers to find out where the best place is to preserve the old 
assignment in such circumstances but have not been able to pinpoint it. If 
you have any suggestion on this please share. Thanks.

Regards,
Vahid Hashemian




From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   04/14/2016 11:37 AM
Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Hi Vahid,

Thanks for the proposal. I think one of the advantages of having sticky
assignment would be to reduce the need to clean up local partition state
between rebalances. Do you have any thoughts on how the user would take
advantage of this assignor in the consumer to do this? Maybe one approach
is to delay cleanup until you detect a change from the previous assignment
in the onPartitionsAssigned() callback?

Also, can you provide some detail on how the sticky assignor works at the
group protocol level? For example, do you pass old assignments through the
"UserData" field in the consumer's JoinGroup?

Thanks,
Jason

On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Hi all,
>
> I have started a new KIP under
>
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

> The corresponding JIRA is at
> https://issues.apache.org/jira/browse/KAFKA-2273
> The corresponding PR is at https://github.com/apache/kafka/pull/1020
>
> Your feedback is much appreciated.
>
> Regards,
> Vahid Hashemian
>
>





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Vahid,

Thanks for the proposal. I think one of the advantages of having sticky
assignment would be to reduce the need to clean up local partition state
between rebalances. Do you have any thoughts on how the user would take
advantage of this assignor in the consumer to do this? Maybe one approach
is to delay cleanup until you detect a change from the previous assignment
in the onPartitionsAssigned() callback?

Also, can you provide some detail on how the sticky assignor works at the
group protocol level? For example, do you pass old assignments through the
"UserData" field in the consumer's JoinGroup?

Thanks,
Jason

On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Hi all,
>
> I have started a new KIP under
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> The corresponding JIRA is at
> https://issues.apache.org/jira/browse/KAFKA-2273
> The corresponding PR is at https://github.com/apache/kafka/pull/1020
>
> Your feedback is much appreciated.
>
> Regards,
> Vahid Hashemian
>
>