Posted to users@kafka.apache.org by Peter Romianowski <ho...@googlemail.com> on 2012/05/02 00:28:12 UTC

Do partitions stick to a particular consumer?

Hi,

we are using the high-level Java consumer. We feed the events received
from Kafka into some sort of state machine. Since we have Kafka's guarantee
that each partition is read by the same consumer, we want to keep the
states in memory to achieve even higher throughput. So it is vital for us
that either a partition is never moved away from a running consumer or our
code at least gets informed when that happens.

Does Kafka guarantee that a partition assigned to a consumer will stay with
this consumer for the whole lifetime of the JVM? Even in corner cases like
losing the connection to ZooKeeper?

Regards,

Peter

Re: Do partitions stick to a particular consumer?

Posted by Peter Romianowski <ho...@googlemail.com>.
Hi,

I created https://issues.apache.org/jira/browse/KAFKA-345 and added a patch.

The patch has been applied to our copy of Kafka:
https://github.com/optivo-org/kafka/commit/c4b2647101ab857dda4cb831863dd37e5cb4df55

Greetings

Peter


Re: Do partitions stick to a particular consumer?

Posted by Jay Kreps <ja...@gmail.com>.
Yeah, this is a common need that we don't really expose. It would be
great to scope out an API or appropriate plugin mechanism for this.

-Jay


Re: Do partitions stick to a particular consumer?

Posted by Neha Narkhede <ne...@gmail.com>.
Peter,

Thanks for explaining your use case. It makes sense that you would need a
listener that fires during the rebalancing operation. There are a couple
of ways of doing this in the consumer. Would you mind filing a JIRA so we
can discuss requirements and a possible fix?

>> During normal operation rebalancing should only happen, if
infrastructure like brokers, number of partitions or consumer changes,
right?

That's right. However, note that during a rebalancing operation we don't
completely reshuffle the partitions among the consumers. Most partitions
might not even move away from their consumer.

>> I guess I'll take a deeper dive into the source code. Any implementation
hints?

The class to look at is ZookeeperConsumerConnector, and the method that does
the rebalancing is syncedRebalance().

Thanks,
Neha
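
Since most partitions may stay where they are during a rebalance, a cache
keyed by partition could evict only the partitions that moved away instead
of being flushed as a whole. The following is a minimal sketch of that idea,
not code from Kafka: the class PartitionedStateCache, its method names, and
the use of plain integer partition ids are all assumptions made for
illustration, and it presumes the application is somehow told which
partitions it owns after a rebalance.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical cache, not part of Kafka: user state grouped by partition id.
class PartitionedStateCache {
    // partition id -> (user id -> cached state)
    private final Map<Integer, Map<String, String>> stateByPartition = new HashMap<>();

    void put(int partition, String userId, String state) {
        stateByPartition.computeIfAbsent(partition, p -> new HashMap<>()).put(userId, state);
    }

    String get(int partition, String userId) {
        Map<String, String> users = stateByPartition.get(partition);
        return users == null ? null : users.get(userId);
    }

    // Drops cached state for every partition that is no longer owned
    // and returns the ids of the partitions that were dropped.
    Set<Integer> retainOnly(Set<Integer> ownedAfterRebalance) {
        Set<Integer> dropped = new HashSet<>(stateByPartition.keySet());
        dropped.removeAll(ownedAfterRebalance);
        stateByPartition.keySet().removeAll(dropped);
        return dropped;
    }
}
```

A partition that moves away and later comes back starts with an empty
cache, which avoids the stale-state problem described in this thread, as
long as the consumer observes the rebalance in which it lost the partition.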



Re: Do partitions stick to a particular consumer?

Posted by Peter Romianowski <ho...@googlemail.com>.
Hi,

thanks for the quick answers. I took a look at the source code (I should
have done that before asking this question) and it was quite obvious that
partitions might get moved.

In our scenario we partition events by user ID and then apply them to some
kind of state machine that modifies the actual state of a user. So events
trigger state transitions. To avoid loading a user's state for each event
processed, we cache it. But if a user's partition is moved to another
consumer and then back to the previous consumer, we have stale caches and
all hell breaks loose. I guess the same kind of problem occurs in other
scenarios too, like keeping per-user counters.

We would like to stick with the high-level consumer and its rebalancing.

All that's needed to fulfill our use case is some kind of listener I could
add to the consumer that gets notified whenever rebalancing happens. I
would not even need to know which partitions got moved away and which got
assigned, since I would simply flush the whole cache. During normal
operation, rebalancing should only happen if infrastructure such as the
brokers, the number of partitions, or the consumers changes, right?

I guess I'll take a deeper dive into the source code. Any implementation
hints?

Thanks again

Peter
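
The listener idea described above could look roughly like the following
sketch. It is purely illustrative: RebalanceListener, UserStateCache, and
RebalanceNotifier are hypothetical names that do not exist in the consumer
discussed in this thread, and RebalanceNotifier only stands in for whatever
component would detect a rebalance.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback; the consumer in this thread does not expose one.
interface RebalanceListener {
    void onRebalance();
}

// Per-user state cache that is dropped entirely when a rebalance is signalled.
class UserStateCache implements RebalanceListener {
    private final Map<String, String> stateByUser = new ConcurrentHashMap<>();

    String get(String userId) {
        return stateByUser.get(userId);
    }

    void put(String userId, String state) {
        stateByUser.put(userId, state);
    }

    int size() {
        return stateByUser.size();
    }

    @Override
    public void onRebalance() {
        // No knowledge of which partitions moved is needed: flush everything.
        stateByUser.clear();
    }
}

// Stand-in for the consumer side: keeps listeners and notifies each of them.
class RebalanceNotifier {
    private final List<RebalanceListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(RebalanceListener listener) {
        listeners.add(listener);
    }

    void fireRebalance() {
        for (RebalanceListener listener : listeners) {
            listener.onRebalance();
        }
    }
}
```

In a real consumer the callback would presumably have to run before
processing resumes on newly assigned partitions; how that ordering could be
guaranteed is not addressed by this sketch.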

Re: Do partitions stick to a particular consumer?

Posted by Jun Rao <ju...@gmail.com>.
Peter,

Currently, the partition assignment can change whenever there is a change in
the brokers or in the consumers of the same group. The change is typically
triggered by starting or stopping a broker/consumer. However, it can also
happen if the broker/consumer somehow expires its ZK session (e.g., because
of a long GC pause).

Thanks,

Jun


Re: Do partitions stick to a particular consumer?

Posted by Neha Narkhede <ne...@gmail.com>.
>> Does Kafka guarantee that a partition assigned to a consumer will stay with
this consumer for the whole lifetime of the JVM? Even in corner cases like
losing the connection to ZooKeeper?

No, it doesn't. In fact, the guarantee is that the partition will be
assigned for consumption to another consumer if the previous one dies or
loses its ZooKeeper session.
The goal is to let the consumption of the topic, as a whole, continue
smoothly in the presence of individual consumer failures.

However, I'd like to understand your use case better. It seems that you
don't actually need any automatic rebalancing behavior in the consumer. Is
there a reason why you are not using SimpleConsumer instead?

Thanks,
Neha
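
If automatic rebalancing is not wanted, one option is for the application
to fix the partition-to-consumer mapping itself and read those partitions
directly. The sketch below shows only the mapping step, under stated
assumptions: StaticAssignment and partitionsFor are hypothetical names,
partitions are simplified to integers 0..partitionCount-1, and every
consumer instance is assumed to know its own index and the total number of
instances. It does not use or depict the SimpleConsumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: a fixed round-robin mapping.
class StaticAssignment {
    // Returns every partition p in [0, partitionCount) with p % instanceCount == instanceIndex.
    static List<Integer> partitionsFor(int instanceIndex, int instanceCount, int partitionCount) {
        if (instanceCount <= 0 || instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException("invalid instance index or count");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = instanceIndex; p < partitionCount; p += instanceCount) {
            owned.add(p);
        }
        return owned;
    }
}
```

With such a mapping a partition never moves while the set of instances is
unchanged, but nothing takes over the partitions of a failed instance,
which is the trade-off against the rebalancing behavior described above.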

