Posted to users@kafka.apache.org by Stevo Slavić <ss...@gmail.com> on 2015/07/21 11:38:51 UTC

New consumer - poll/seek javadoc confusing, need clarification

Hello Apache Kafka community,

I find the new consumer's poll/seek javadoc a bit confusing. Just from reading
the docs, I'm not sure what the outcome will be, i.e. what is expected in the
following scenario:

- kafkaConsumer is instantiated with auto-commit off
- kafkaConsumer.subscribe(someTopic)
- kafkaConsumer.position is called for every TopicPartition the HLC
(high-level consumer) is actively subscribed to

and then, when making multiple poll calls in succession (without calling
commit), does seek have to be called between poll calls to position the HLC
past what was read in the previous poll, or does the HLC keep that state
(the position after poll) in memory, so that the next poll (with no seek
between the two poll calls) continues from where the last poll stopped?

It could be just me not understanding this from the javadoc. If not, maybe
the javadoc can be improved to make this (even) more obvious.

Kind regards,
Stevo Slavic.

Re: New consumer - poll/seek javadoc confusing, need clarification

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Stevo,

Thanks for the early testing on the new consumer! This might be a bug. I
wonder if it could also be explained by partition rebalancing. In the
current implementation, a rebalance will clear the old positions (including
those that were seeked to). I think it's debatable whether this behavior is
useful, but it may explain what you're seeing.
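A toy model of the rebalance behavior described above may help. It is a hypothetical sketch: RebalanceResetModel and its methods are invented for illustration and are not part of the Kafka client. It assumes that a rebalance clears every in-memory position, including seeked ones, and that positions are then re-initialized from the committed offset or, failing that, from a reset offset. It also sketches one possible workaround: the application remembers the offset it wants and re-applies the seek after partitions are assigned again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, not the Kafka client: a rebalance drops every
// in-memory position (seeked ones included) and re-initializes positions
// from committed offsets, or from resetOffset when nothing is committed.
class RebalanceResetModel {
    private final Map<String, Long> positions = new HashMap<>();    // in-memory only
    private final Map<String, Long> committed = new HashMap<>();    // survives rebalances
    private final Map<String, Long> desiredSeeks = new HashMap<>(); // app-side bookkeeping (hypothetical)
    private final long resetOffset;                                 // stands in for the reset policy

    RebalanceResetModel(long resetOffset) {
        this.resetOffset = resetOffset;
    }

    // Plain seek: only changes the in-memory position.
    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    void commit(String partition, long offset) {
        committed.put(partition, offset);
    }

    // Seek and also remember the target so it can be re-applied after a
    // rebalance. A real application would drop the entry once it has re-read.
    void rememberSeek(String partition, long offset) {
        desiredSeeks.put(partition, offset);
        seek(partition, offset);
    }

    // Simulates revoke + reassign: clear positions, restore from committed
    // offsets or resetOffset, then re-apply remembered seeks for the
    // partitions this instance got back (what an on-assignment hook might do).
    void rebalance(Set<String> assigned) {
        positions.clear();
        for (String p : assigned) {
            positions.put(p, committed.getOrDefault(p, resetOffset));
        }
        for (Map.Entry<String, Long> e : desiredSeeks.entrySet()) {
            if (assigned.contains(e.getKey())) {
                positions.put(e.getKey(), e.getValue());
            }
        }
    }

    // Returns -1 for partitions this instance does not currently own.
    long position(String partition) {
        return positions.getOrDefault(partition, -1L);
    }
}
```

In later releases of the Java client, the usual place to re-apply such a seek is the onPartitionsAssigned callback of a ConsumerRebalanceListener passed to subscribe(); the sketch does not assume the pre-release API discussed in this thread offered the same hook.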

-Jason

On Thu, Jul 23, 2015 at 2:10 AM, Stevo Slavić <ss...@gmail.com> wrote:

> Strange: if I make several poll requests after seek, eventually it does
> read/return messages from the offset that seek set.

Re: New consumer - poll/seek javadoc confusing, need clarification

Posted by Stevo Slavić <ss...@gmail.com>.
Strange: if I make several poll requests after seek, eventually it does
read/return messages from the offset that seek set.

On Thu, Jul 23, 2015 at 11:03 AM, Stevo Slavić <ss...@gmail.com> wrote:

> Thanks, Ewen, for the heads-up.
>
> It's great that seek is not needed between polls when business goes on as
> usual.
>
> In an edge case, when my logic detects that it needs to go back and reread
> events from a given position in history, I use seek. I found that the next
> poll after seek does not respect the offset used in seek. It is strange that
> even Consumer.position returns the same offset that seek has set for the
> consumer instance, but poll still does not return messages starting from
> that offset.
>
> For example, say there are 5 messages published to a single partition of a
> single topic. The consumer subscribes to that topic partition with the
> smallest/earliest offset reset strategy configured, and consumer.position
> confirms that the consumer is at position 0.
> Then poll is issued and it returns all 5 messages. The logic processes the
> messages and detects that it needs to go back in history to position 0; it
> does not commit the messages but calls seek to 0, and position confirms that
> the consumer is at offset 0. The next poll does not return any messages.
>
> So seek is not really doing what it should, according to its javadoc:
>
> /**
>  * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
>  * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
>  * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
>  */
>
> I've also checked that calling seek multiple times does not help get poll
> to use the offset set with the last seek.
> It could be that something is wrong with the poll implementation, making it
> not respect the offset set with seek.
>
> Kind regards,
> Stevo Slavic.

Re: New consumer - poll/seek javadoc confusing, need clarification

Posted by Stevo Slavić <ss...@gmail.com>.
Thanks, Ewen, for the heads-up.

It's great that seek is not needed between polls when business goes on as
usual.

In an edge case, when my logic detects that it needs to go back and reread
events from a given position in history, I use seek. I found that the next
poll after seek does not respect the offset used in seek. It is strange that
even Consumer.position returns the same offset that seek has set for the
consumer instance, but poll still does not return messages starting from that
offset.

For example, say there are 5 messages published to a single partition of a
single topic. The consumer subscribes to that topic partition with the
smallest/earliest offset reset strategy configured, and consumer.position
confirms that the consumer is at position 0.
Then poll is issued and it returns all 5 messages. The logic processes the
messages and detects that it needs to go back in history to position 0; it
does not commit the messages but calls seek to 0, and position confirms that
the consumer is at offset 0. The next poll does not return any messages.

So seek is not really doing what it should, according to its javadoc:

/**
 * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
 * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
 * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
 */

I've also checked that calling seek multiple times does not help get poll to
use the offset set with the last seek.
It could be that something is wrong with the poll implementation, making it
not respect the offset set with seek.
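To make the outcome the javadoc promises concrete, here is a toy model of that contract: seek overrides the fetch offset for the next poll, and the latest seek for a partition wins. SeekContractModel is hypothetical and is not KafkaConsumer (whose real calls are seek(TopicPartition, long), position(TopicPartition) and poll(timeout)). It assumes a single consumer, no rebalance, and an earliest reset. Under those assumptions, the five-message scenario above should re-read all 5 messages after seek(0) rather than return nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the seek() contract quoted above; not KafkaConsumer.
// Assumes one consumer, no rebalances, and an "earliest" reset (position 0).
class SeekContractModel {
    private final Map<String, List<String>> logs = new HashMap<>();
    private final Map<String, Long> positions = new HashMap<>();

    void append(String partition, String message) {
        logs.computeIfAbsent(partition, p -> new ArrayList<>()).add(message);
    }

    // Overrides the offset the next poll will fetch from; the last call wins.
    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    // Unknown partitions start at 0, mimicking an earliest reset.
    long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }

    // Returns everything from the current position and advances past it.
    List<String> poll(String partition) {
        List<String> log = logs.getOrDefault(partition, new ArrayList<>());
        long pos = position(partition);
        int from = (int) Math.min(pos, log.size());
        List<String> out = new ArrayList<>(log.subList(from, log.size()));
        positions.put(partition, pos + out.size());
        return out;
    }
}
```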

Kind regards,
Stevo Slavic.


On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> It should just continue consuming using its existing in-memory offsets.
> It'll have to refresh metadata to pick up the leadership change, but once it
> does, it can just pick up where consumption from the previous leader
> stopped: all the in-sync replicas (ISR) should have the same data, so the
> new leader will have all the same data the previous leader had (assuming
> unclean leader election is not enabled).

Re: New consumer - poll/seek javadoc confusing, need clarification

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
It should just continue consuming using its existing in-memory offsets.
It'll have to refresh metadata to pick up the leadership change, but once it
does, it can just pick up where consumption from the previous leader
stopped: all the in-sync replicas (ISR) should have the same data, so the
new leader will have all the same data the previous leader had (assuming
unclean leader election is not enabled).
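A toy sketch of that answer, assuming no unclean leader election, so every in-sync replica holds the same messages. LeaderFailoverModel is hypothetical and does not reflect how the Kafka client is structured internally. The point it illustrates is that the fetch position belongs to the consumer, so after a leader change only the replica being fetched from changes, not the offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Kafka internals: the consumer owns its fetch
// position; a leader change only changes which replica it fetches from.
class LeaderFailoverModel {
    // Each replica holds a full copy of the partition log (in-sync, and no
    // unclean leader election, so all copies are identical).
    private final List<List<String>> replicas = new ArrayList<>();
    private int leader = 0;     // stands in for the metadata the consumer refreshes
    private long position = 0;  // consumer-side, in-memory fetch position

    LeaderFailoverModel(List<String> log, int replicaCount) {
        for (int i = 0; i < replicaCount; i++) {
            replicas.add(new ArrayList<>(log));
        }
    }

    // Another in-sync replica becomes leader; the position is untouched.
    void failLeader() {
        leader = (leader + 1) % replicas.size();
    }

    int leader() {
        return leader;
    }

    // Fetches up to maxRecords from the current leader at the in-memory position.
    List<String> fetch(int maxRecords) {
        List<String> log = replicas.get(leader);
        int from = (int) Math.min(position, log.size());
        int to = Math.min(from + maxRecords, log.size());
        List<String> out = new ArrayList<>(log.subList(from, to));
        position += out.size();
        return out;
    }
}
```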

On Tue, Jul 21, 2015 at 9:11 PM, James Cheng <jc...@tivo.com> wrote:

> Ewen,
>
> What happens if there is a broker failure and a new broker becomes the
> partition leader? Does the high-level consumer start listening to the new
> partition leader at the in-memory position, or does it restart based on
> saved offsets?
>
> Thanks,
> -James


-- 
Thanks,
Ewen

Re: New consumer - poll/seek javadoc confusing, need clarification

Posted by James Cheng <jc...@tivo.com>.
> On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava <ew...@confluent.io> wrote:
> 
> The position is tracked in memory within the consumer, so as long as there
> isn't a consumer rebalance, consumption will just proceed with subsequent
> messages (i.e. the behavior I think most people would find intuitive).
> However, if a rebalance occurs (another consumer instance joins the group
> or existing ones leave), then a partition may be assigned to a different
> consumer instance that has no idea about the current position and will
> restart based on the offset reset setting (because attempting to fetch the
> committed offset will fail, since no offsets have been committed).
> 

Ewen,

What happens if there is a broker failure and a new broker becomes the partition leader? Does the high-level consumer start listening to the new partition leader at the in-memory position, or does it restart based on saved offsets?

Thanks,
-James



Re: New consumer - poll/seek javadoc confusing, need clarification

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić <ss...@gmail.com> wrote:

> Hello Apache Kafka community,
>
> I find the new consumer's poll/seek javadoc a bit confusing. Just from
> reading the docs, I'm not sure what the outcome will be, i.e. what is
> expected in the following scenario:
>
> - kafkaConsumer is instantiated with auto-commit off
> - kafkaConsumer.subscribe(someTopic)
> - kafkaConsumer.position is called for every TopicPartition the HLC
> (high-level consumer) is actively subscribed to
>
> and then, when making multiple poll calls in succession (without calling
> commit), does seek have to be called between poll calls to position the HLC
> past what was read in the previous poll, or does the HLC keep that state
> (the position after poll) in memory, so that the next poll (with no seek
> between the two poll calls) continues from where the last poll stopped?
>

The position is tracked in memory within the consumer, so as long as there
isn't a consumer rebalance, consumption will just proceed with subsequent
messages (i.e. the behavior I think most people would find intuitive).
However, if a rebalance occurs (another consumer instance joins the group
or existing ones leave), then a partition may be assigned to a different
consumer instance that has no idea about the current position and will
restart based on the offset reset setting (because attempting to fetch the
committed offset will fail, since no offsets have been committed).
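A minimal, hypothetical model of the behavior described above; InMemoryPositionConsumer is invented for illustration and is not the Kafka client API. Each consumer instance keeps its positions in an in-memory map, and consecutive polls simply continue from there with no seek. An instance that takes over a partition starts from the group's committed offset if one exists, and otherwise falls back to the reset policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, not the Kafka client API: per-instance positions
// live in memory; committed offsets live in a map shared by the group.
class InMemoryPositionConsumer {
    enum ResetPolicy { EARLIEST, LATEST }

    private final Map<String, Long> positions = new HashMap<>(); // lost if the instance goes away
    private final Map<String, Long> groupCommitted;              // shared across instances
    private final ResetPolicy reset;

    InMemoryPositionConsumer(Map<String, Long> groupCommitted, ResetPolicy reset) {
        this.groupCommitted = groupCommitted;
        this.reset = reset;
    }

    // Returns up to maxRecords messages from the current position and moves
    // the position past them, so consecutive polls continue without any seek.
    List<String> poll(String partition, List<String> log, int maxRecords) {
        long pos = positions.computeIfAbsent(partition, p -> startingOffset(p, log));
        List<String> out = new ArrayList<>();
        for (long i = pos; i < log.size() && out.size() < maxRecords; i++) {
            out.add(log.get((int) i));
        }
        positions.put(partition, pos + out.size());
        return out;
    }

    // First fetch for a partition on this instance: use the committed offset
    // if there is one, otherwise fall back to the reset policy.
    private long startingOffset(String partition, List<String> log) {
        Long committed = groupCommitted.get(partition);
        if (committed != null) {
            return committed;
        }
        return reset == ResetPolicy.EARLIEST ? 0L : log.size();
    }

    // Publishes this instance's current position so a later owner can resume.
    void commit(String partition) {
        Long pos = positions.get(partition);
        if (pos != null) {
            groupCommitted.put(partition, pos);
        }
    }
}
```

With nothing committed, a second instance restarts from the reset point even though the first had already read further; committing is what lets a new owner resume where the previous one stopped.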

-Ewen

-- 
Thanks,
Ewen