Posted to users@kafka.apache.org by Amir Zuker <a....@gmail.com> on 2016/08/16 08:50:35 UTC

Scala: Kafka Consumer (kafka-clients 0.9.0.1)

Hi everyone,

I have a question about the 'KafkaConsumer' and its API for committing
offsets. (kafka-clients 0.9.0.1)

*The scenario -*
I am working with auto commit disabled because I want to implement a
retry mechanism and eventually transfer any message that keeps failing to
another topic that holds the poison messages.
Since I want this to be reliable, I am not using auto commit and I wish
to control when offsets are committed.

*The implementation detail -*
My class, which extends 'Runnable' and is created by the KafkaConsumer, needs
to commit the offset once it is done handling the topic message.
However, the API for committing offsets is located on the KafkaConsumer
with no relation to a partition or thread.

*The problem -*
If I understand correctly, I can use the same KafkaConsumer instance with
multiple threads against multiple partitions.
If that is the case, how can I commit the offset specific to my 'Runnable'
instance that just processed a single message without affecting other
threads and partitions?

Thanks in advance,
Amir Zuker

Re: Scala: Kafka Consumer (kafka-clients 0.9.0.1)

Posted by Ajay Sharma <as...@arenasolutions.com>.
BTW we used 0.8.x

On 8/16/16, 9:51 AM, "Ajay Sharma" <as...@arenasolutions.com> wrote:

>Amir,
>We had a similar requirement to consume every message reliably. The approach
>I picked was to push any message that failed consumption to a secondary
>topic for later processing. In our case the messages/events were independent
>of one another, so we made a second attempt at consumption and, on any
>further failure, pushed the message to a poison queue (Elasticsearch in our case).
>
>Regards, ajay


Re: Scala: Kafka Consumer (kafka-clients 0.9.0.1)

Posted by Ajay Sharma <as...@arenasolutions.com>.
Amir,
We had a similar requirement to consume every message reliably. The approach
I picked was to push any message that failed consumption to a secondary
topic for later processing. In our case the messages/events were independent
of one another, so we made a second attempt at consumption and, on any
further failure, pushed the message to a poison queue (Elasticsearch in our case).
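
For readers who want something concrete, the retry-then-poison flow described above could be sketched roughly as follows. This is only an illustration under assumptions, not the code we ran: the class name RetryThenPoison, the maxAttempts parameter, and the poisonSink callback are all hypothetical, and the sink stands in for whatever actually stores the failed message (a secondary topic, Elasticsearch, and so on).

```java
import java.util.function.Consumer;

/** Hypothetical helper: retries a handler, then diverts the message to a poison sink. */
final class RetryThenPoison<T> {
    private final int maxAttempts;        // total attempts, including the first one
    private final Consumer<T> handler;    // business logic; signals failure by throwing
    private final Consumer<T> poisonSink; // assumption: writes to a poison topic or store

    RetryThenPoison(int maxAttempts, Consumer<T> handler, Consumer<T> poisonSink) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.handler = handler;
        this.poisonSink = poisonSink;
    }

    /** Returns true if the handler succeeded, false if the message was diverted. */
    boolean process(T message) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Failure is swallowed here so the loop can retry;
                // a real system would log it and probably back off.
            }
        }
        // All attempts failed: hand the message to the poison sink.
        poisonSink.accept(message);
        return false;
    }
}
```

With maxAttempts set to 2 this matches the "second attempt" behaviour mentioned above. Either way, once process(...) returns, the message has been handled or parked, so its offset can be committed without losing it.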

Regards, ajay



Re: Scala: Kafka Consumer (kafka-clients 0.9.0.1)

Posted by Amir Zuker <a....@gmail.com>.
This is not the case in my situation.
I am using an older version that doesn't have the subscriber model, and
messages can be processed concurrently in multiple threads.

If I use the KafkaConsumer with multiple threads and partitions -
kafkaConsumer.run(5) //5 thread count -
how can I commit an offset for a specific thread and partition in
kafka-clients 0.9.0.1?

Thanks



Regards,
Amir Zuker

On Tue, Aug 16, 2016 at 1:36 PM, Sudev A C <su...@goibibo.com> wrote:

> Hi,
>
> Each message object (ConsumerRecord) carries its topic, partition, offset
> and the message payload.
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> You may use this to get the current offset for each topic-partition
> combination.
>
> Thanks
> Sudev

Re: Scala: Kafka Consumer (kafka-clients 0.9.0.1)

Posted by Sudev A C <su...@goibibo.com>.
Hi,

Each message object (ConsumerRecord) carries its topic, partition, offset
and the message payload.
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
You may use this to get the current offset for each topic-partition
combination.
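
To make that a bit more concrete, here is a rough sketch of how per-partition offsets might be tracked by worker threads and then committed. It relies on a few points from the 0.9 javadoc: KafkaConsumer is documented as not thread-safe, commitSync has an overload that takes a Map<TopicPartition, OffsetAndMetadata>, and the committed offset should be the last processed offset + 1. Everything else is an assumption for illustration: PartitionKey is a hypothetical stand-in for TopicPartition so the sketch compiles without kafka-clients, OffsetTracker is a made-up name, and the sketch assumes the records of any one partition are processed in order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for Kafka's TopicPartition (keeps the sketch dependency-free). */
final class PartitionKey {
    final String topic;
    final int partition;

    PartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionKey)) return false;
        PartitionKey other = (PartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return 31 * topic.hashCode() + partition;
    }
}

/** Hypothetical tracker: workers report finished records, the polling thread drains it. */
final class OffsetTracker {
    private final ConcurrentHashMap<PartitionKey, Long> next = new ConcurrentHashMap<>();

    /** Called by a worker after it finishes a record; safe to call from any thread. */
    void markProcessed(String topic, int partition, long offset) {
        // The offset to commit is the position of the next record to read: offset + 1.
        // Keeping the maximum assumes in-order processing within a partition.
        next.merge(new PartitionKey(topic, partition), offset + 1, Long::max);
    }

    /** Called by the polling thread; returns and clears the offsets ready to commit. */
    Map<PartitionKey, Long> drain() {
        Map<PartitionKey, Long> ready = new HashMap<>();
        for (PartitionKey key : next.keySet()) {
            Long value = next.remove(key);
            if (value != null) {
                ready.put(key, value);
            }
        }
        return ready;
    }
}
```

In a real consumer loop, each worker would call markProcessed(record.topic(), record.partition(), record.offset()), and the thread that calls poll() would periodically drain() the tracker, convert each entry to a TopicPartition and OffsetAndMetadata, and pass the resulting map to commitSync. Only the partitions present in that map are affected by the commit.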

Thanks
Sudev




-- 
Thanks
Sudev A C
Data Team