Posted to users@kafka.apache.org by Péricé Robin <pe...@gmail.com> on 2016/02/17 18:13:12 UTC

Consumer seek on 0.9.0 API

Hi,

I'm trying to use the new Consumer API with this example:
https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples

With a Producer I sent 1000 messages to my Kafka broker. I need to know
whether it's possible, for example, to read messages from offset 500 to 1000.

What I did:

consumer.seek(new TopicPartition("topic1", 0), 500);
final ConsumerRecords<Integer, String> records = consumer.poll(1000);

But this did nothing (when I don't use the seek() method, I consume all the
messages without any problem).

Any help on this will be greatly appreciated!

Regards,

Robin

Re: Consumer seek on 0.9.0 API

Posted by Alex Loddengaard <al...@confluent.io>.
Hi Robin,

Glad it's working. I'll explain:

When a consumer subscribes to one or more topics using subscribe(), the
consumer group coordinator is responsible for assigning partitions to each
consumer in the group, ensuring that all messages in the topic are
consumed. The coordinator also handles failover and changes in the number
of consumers.

When you called seek() without calling poll(), the consumer hadn't received
its partition assignments yet, and hence the seek() call didn't behave as
expected. However, once you called poll(), the partition assignments were
made, and seek() behaved as expected.

An alternative to using subscribe() is to use assign(). However, doing so
bypasses the consumer group coordinator, which means you'll lose the nice
failover and expanding/shrinking features and have to handle these cases
yourself (which is cumbersome).
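A minimal sketch of the subscribe() pattern described above, assuming the 0.9.0
consumer API (the broker address, group id, and topic name are placeholders):

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekAfterAssignmentExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "seek-example");            // placeholder group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic1"));

        // The first poll() joins the group and receives partition assignments;
        // only after that does seek() have an assigned partition to act on.
        consumer.poll(0);
        consumer.seek(new TopicPartition("topic1", 0), 500);

        // This poll() now fetches starting from offset 500.
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.printf("offset=%d value=%s%n",
                    record.offset(), record.value());
        }
        consumer.close();
    }
}
```

This sketch needs a running broker with a "topic1" partition 0, so take it as
an illustration of the ordering (subscribe, poll, seek, poll) rather than a
ready-to-run program.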

Does that make sense?

Alex




-- 
*Alex Loddengaard | **Solutions Architect | Confluent*
*Download Apache Kafka and Confluent Platform: www.confluent.io/download
<http://www.confluent.io/download>*

Re: Consumer seek on 0.9.0 API

Posted by Péricé Robin <pe...@gmail.com>.
Hi,

OK, I did a poll() before my seek(), then poll() again, and now my consumer
starts at the given offset.

Thank you a lot! But I don't really understand why I have to do that; I'd
be glad if anyone can explain.

Regards,

Robin


Re: Consumer seek on 0.9.0 API

Posted by Péricé Robin <pe...@gmail.com>.
OK, I understand the explanation. Thank you for sharing your knowledge!

Regards,

Robin


Re: Consumer seek on 0.9.0 API

Posted by Jason Gustafson <ja...@confluent.io>.
Woops. Looks like Alex got there first. Glad you were able to figure it out.

-Jason


Re: Consumer seek on 0.9.0 API

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Robin,

It would be helpful if you posted the full code you were trying to use. How
to seek largely depends on whether you are using the new consumer in
"simple" or "group" mode. In simple mode, when you know the partitions you
want to consume, you should just be able to do something like the following:

consumer.assign(Arrays.asList(partition));
consumer.seek(partition, 500);

Then you can call poll() in a loop until you hit offset 1000 and stop. Does
that make sense?
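A sketch of that "simple" (manual assignment) mode, again assuming the 0.9.0
API; the broker address, topic, and offset bounds are placeholders:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignAndSeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);

        TopicPartition partition = new TopicPartition("topic1", 0);
        consumer.assign(Arrays.asList(partition)); // no group coordination
        consumer.seek(partition, 500);             // valid right after assign()

        // Poll in a loop until we reach offset 1000, then stop.
        long lastOffset = 1000;
        boolean done = false;
        while (!done) {
            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            for (ConsumerRecord<Integer, String> record : records) {
                if (record.offset() >= lastOffset) {
                    done = true;
                    break;
                }
                System.out.printf("offset=%d value=%s%n",
                        record.offset(), record.value());
            }
        }
        consumer.close();
    }
}
```

Because assign() bypasses the group coordinator, seek() can be called
immediately, with no preliminary poll(); the trade-off is that you give up
the automatic rebalancing Alex described. Like the earlier sketch, this
needs a live broker to actually run.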

-Jason



Re: Consumer seek on 0.9.0 API

Posted by Alex Loddengaard <al...@confluent.io>.
Hi Robin,

I believe seek() needs to be called after the consumer gets its partition
assignments. Try calling poll() before you call seek(), then poll() again
and process the records from the latter poll().

There may be a better way to do this -- let's see if anyone else has a
suggestion.

Alex


