Posted to dev@kafka.apache.org by Sagar <sa...@gmail.com> on 2018/01/03 18:25:30 UTC

Question Regarding seek() in KafkaConsumer

Hi,

I have a use case wherein I need exactly-once processing for data across
various topics. This means that in case of an error, the next time the poll()
method runs, it should start off from the exact same record.

The approach I have used is:

1) Call poll() and get a list of records.

2) For each record:

2.1) Get the processing status of the record.

2.2) In case of success, add it to the list of records to be committed.

2.3) In case of failure:

a) Commit whatever records were collected so far.
b) For each topic partition, get the last committed offset using the
committed() method in KafkaConsumer.
c) Seek each topic partition to the offset obtained above.
d) Break the loop.
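
The loop above can be sketched with an in-memory stand-in (all names here are hypothetical; a single partition is modelled, records are just their integer offsets, and commit/seek are plain integer updates rather than real KafkaConsumer calls):

```java
import java.util.*;

// In-memory model of the poll -> process -> commit/seek loop described above.
// One partition; records are their offsets 0..total-1.
public class SeekRetrySketch {

    // failOnce: offsets whose processing fails on the first attempt only
    public static List<Integer> consume(int total, int batchSize, Set<Integer> failOnce) {
        Set<Integer> pendingFailures = new HashSet<>(failOnce);
        List<Integer> processed = new ArrayList<>();
        int committed = 0;  // last committed offset, as committed() would report it
        int position = 0;   // fetch position, as seek() would set it
        while (committed < total) {
            int end = Math.min(position + batchSize, total);  // poll(): one batch
            boolean soughtBack = false;
            for (int offset = position; offset < end; offset++) {
                if (pendingFailures.remove(offset)) {
                    committed = offset;     // step a: commit records collected so far
                    position = committed;   // steps b+c: seek back to committed offset
                    soughtBack = true;
                    break;                  // step d: break the loop
                }
                processed.add(offset);      // step 2.2: success, mark for commit
            }
            if (!soughtBack) {
                committed = end;            // clean batch: commit everything
                position = end;
            }
        }
        return processed;
    }
}
```

In this simplified model nothing is lost or duplicated; note, though, that it models the commit as "up to the failed record", whereas a parameterless commitSync() on the real consumer commits the highest offsets of the entire last poll().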


In my mind this should have worked, but I experienced some data loss along
the way when I ran it from the earliest offset (across 17 topics with 5
partitions each).

The javadocs for seek() say:

*If this API is invoked for the same partition more than once, the latest
offset will be used on the next poll(). Note that you may lose data if this
API is arbitrarily used in the middle of consumption, to reset the fetch
offsets*

I am committing offsets manually (synchronously). I just wanted to
understand: is it not safe to call this API this way?

If I want to retry reliably, what patterns should be used? A couple of
other approaches I read about in Kafka: The Definitive Guide were to push
the retriable records to a buffer or to a separate topic. Are those the
correct ways of retrying, rather than this one?
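
The buffer approach can be sketched the same way (in-memory, hypothetical names; a real implementation would produce the buffered records to a retry topic instead of appending to a List):

```java
import java.util.*;

// Sketch of the "retry buffer" pattern: failed records are parked instead of
// seeking back, so the whole polled batch can be committed and never re-fetched.
public class RetryBufferSketch {

    public static List<Integer> processBatch(List<Integer> batch,
                                             Set<Integer> failing,
                                             List<Integer> retryBuffer) {
        List<Integer> done = new ArrayList<>();
        for (int record : batch) {
            if (failing.contains(record)) {
                retryBuffer.add(record);  // park it (or produce to a retry topic)
            } else {
                done.add(record);
            }
        }
        // caller can now commit the full batch; retries happen out of band
        return done;
    }
}
```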

Thanks!
Sagar.