You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Cheolwoo Choi <pr...@gmail.com> on 2013/10/28 01:49:50 UTC

How to commit offset every time, automatically

Hi, all
I'm using kafka-0.7.2

I'd like to commit offset every time reading a message block.
"autocommit.interval.ms" property is not enough.

To do that, should I use SimpleConsumer, and write a commit logic manually?
(i.e. zkUtils.commitOffset(groupId, messageAndOffset.offset() )

Is there no way to commit automatically every time ?

Thanks

Re: How to commit offset every time, automatically

Posted by Cheolwoo Choi <pr...@gmail.com>.
Thank you for your reply.

The 2nd one is what I want.
I has tested with it, and it works fine. Thanks.

I hope it supports 'consumerConnector.gracefulShutdown()' to finalize a
connector gracefully (with some user properties). :-)




On Mon, Oct 28, 2013 at 10:16 AM, Philip O'Toole <ph...@loggly.com> wrote:

> You have two choices.
>
> -- Do what you say, and write your own consumer, based on the
> SimpleConsumer. Handle all commits, ZK accesses, and balancing yourself.
>
> -- Use a ConsumerConnector for every partition, and call commitOffsets()
> explicitly when you have processed a message. This does a commit for every
> partition managed by the ConsumerConnector, but if you have the same number
> of ConsumerConnector as you have partitions, each call to commitOffsets()
> is handling only one partition. This way you get what you need, but also
> get the great rebalancing logic of the high-level consumer.
>
> At Loggly we tend to follow the 2nd pattern, and find it works very well.
>
> Philip
>
>
> On Sun, Oct 27, 2013 at 5:49 PM, Cheolwoo Choi <pr...@gmail.com> wrote:
>
> > Hi, all
> > I'm using kafka-0.7.2
> >
> > I'd like to commit offset every time reading a message block.
> > "autocommit.interval.ms" property is not enough.
> >
> > To do that, should I use SimpleConsumer, and write a commit logic
> manually?
> > (i.e. zkUtils.commitOffset(groupId, messageAndOffset.offset() )
> >
> > Is there no way to commit automatically every time ?
> >
> > Thanks
> >
>

Re: How to commit offset every time, automatically

Posted by Philip O'Toole <ph...@loggly.com>.
You have two choices.

-- Do what you say, and write your own consumer, based on the
SimpleConsumer. Handle all commits, ZK accesses, and balancing yourself.

-- Use a ConsumerConnector for every partition, and call commitOffsets()
explicitly when you have processed a message. This does a commit for every
partition managed by the ConsumerConnector, but if you have the same number
of ConsumerConnector as you have partitions, each call to commitOffsets()
is handling only one partition. This way you get what you need, but also
get the great rebalancing logic of the high-level consumer.

At Loggly we tend to follow the 2nd pattern, and find it works very well.

Philip


On Sun, Oct 27, 2013 at 5:49 PM, Cheolwoo Choi <pr...@gmail.com> wrote:

> Hi, all
> I'm using kafka-0.7.2
>
> I'd like to commit offset every time reading a message block.
> "autocommit.interval.ms" property is not enough.
>
> To do that, should I use SimpleConsumer, and write a commit logic manually?
> (i.e. zkUtils.commitOffset(groupId, messageAndOffset.offset() )
>
> Is there no way to commit automatically every time ?
>
> Thanks
>