Posted to users@kafka.apache.org by Chia-Chun Shih <ch...@gmail.com> on 2014/11/05 07:42:29 UTC

consumer ack for high-level consumer?

Hi,

I am new to Kafka. In my understanding, the high-level consumer
(ZookeeperConsumerConnector) advances the offset when a message is drawn
by ConsumerIterator. But I would like to advance the offset when a message
is processed, not when it is drawn from the broker, so that if a consumer
dies before a message is completely processed, the message will be
processed again. Is this possible?

Thanks.

Re: consumer ack for high-level consumer?

Posted by Chia-Chun Shih <ch...@gmail.com>.
Got it! Thanks for your response.



2014-11-07 13:14 GMT+08:00 Guozhang Wang <wa...@gmail.com>:


Re: consumer ack for high-level consumer?

Posted by Guozhang Wang <wa...@gmail.com>.
0. Yes, if a consumer crashes before committing its offset, it can cause
duplicates.

1. Yes, since from the consumer client's point of view, once a message is
returned from the iterator it is considered "consumed"; if you want the
consumer to only consider a message consumed once it has been processed by
the application on top of it, you need to turn off auto offset commit and
call commit manually.
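The duplicate case in point 0 can be sketched with a small self-contained
simulation (the class and the `run` helper below are invented for
illustration; this is not the Kafka API): the consumer commits only after
processing, so a crash between processing and committing means the restarted
consumer re-reads the last message, a duplicate, but never a loss.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DuplicateDemo {
    // Simulated durable committed offset (stands in for the offset in ZK).
    long committedOffset = 0;

    // Process messages from the committed offset, committing after each one.
    // If crashBeforeCommitAt matches the current index, stop before the commit.
    List<String> run(List<String> log, long crashBeforeCommitAt) {
        List<String> processed = new ArrayList<>();
        for (long i = committedOffset; i < log.size(); i++) {
            processed.add(log.get((int) i));      // process the message
            if (i == crashBeforeCommitAt) {
                return processed;                 // crash: offset never committed
            }
            committedOffset = i + 1;              // commit after processing
        }
        return processed;
    }

    public static void main(String[] args) {
        DuplicateDemo consumer = new DuplicateDemo();
        List<String> log = Arrays.asList("m0", "m1", "m2");
        // Crash after processing m1 but before committing its offset.
        System.out.println(consumer.run(log, 1));  // [m0, m1]
        // On restart, m1 is delivered again: a duplicate, not a loss.
        System.out.println(consumer.run(log, -1)); // [m1, m2]
    }
}
```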

On Thu, Nov 6, 2014 at 6:25 PM, Chia-Chun Shih <ch...@gmail.com>
wrote:




-- 
-- Guozhang

Re: consumer ack for high-level consumer?

Posted by Chia-Chun Shih <ch...@gmail.com>.
Hi,

Thanks for your response. So offsets in ZK may be out of date, and
duplicated messages may be delivered when clients restart.

I also wonder about the possibility of losing messages. Is it possible for
things to occur in this order?

   1. The client calls ConsumerIterator$next() to get a message, which
   updates the local offsets
   2. ZookeeperConsumerConnector$commitOffset() is called, and the local
   offsets are synced to ZK
   3. The client fails while processing the message
   4. The client restarts, but the message is already marked as consumed
   in ZK

Thanks,
Chia-Chun
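The four-step ordering above can be played out in a self-contained sketch
(the class and method names are invented for illustration and only model the
behavior described in this thread): step 1 advances the local offset, step 2
syncs it to the simulated ZK store, step 3 crashes before processing
finishes, and step 4 resumes from the committed offset, so the unprocessed
message is skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LossDemo {
    long localOffset = 0; // advanced as soon as the iterator hands out a message
    long zkOffset = 0;    // what a restarted client resumes from

    // Steps 1-3: draw a message (advancing the local offset), sync the local
    // offset to "ZK", then fail before the message is actually processed.
    List<String> crashRun(List<String> log) {
        List<String> processed = new ArrayList<>();
        String msg = log.get((int) localOffset);
        localOffset++;           // step 1: iterator updates the local offset
        zkOffset = localOffset;  // step 2: commit syncs local offset to ZK
        return processed;        // step 3: crash; msg was never processed
    }

    // Step 4: the restarted client starts at the committed offset, so the
    // message drawn (but not processed) before the crash is skipped.
    List<String> restartRun(List<String> log) {
        List<String> processed = new ArrayList<>();
        for (long i = zkOffset; i < log.size(); i++) {
            processed.add(log.get((int) i));
        }
        return processed;
    }

    public static void main(String[] args) {
        LossDemo client = new LossDemo();
        List<String> log = Arrays.asList("m0", "m1");
        System.out.println(client.crashRun(log));   // []   -- m0 drawn, never processed
        System.out.println(client.restartRun(log)); // [m1] -- m0 is lost
    }
}
```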

2014-11-07 1:45 GMT+08:00 Guozhang Wang <wa...@gmail.com>:


Re: consumer ack for high-level consumer?

Posted by Guozhang Wang <wa...@gmail.com>.
That is correct.

Guozhang

On Wed, Nov 5, 2014 at 9:18 PM, Chia-Chun Shih <ch...@gmail.com>
wrote:




-- 
-- Guozhang

Re: consumer ack for high-level consumer?

Posted by Chia-Chun Shih <ch...@gmail.com>.
Hi,

Thanks for your response. I just read the source code and found that:

  1) ConsumerIterator$next() uses PartitionTopicInfo$resetConsumeOffset to
update offsets in PartitionTopicInfo objects.
  2) ZookeeperConsumerConnector$commitOffset() gets the latest offsets from
PartitionTopicInfo objects and writes them to ZK.

So, when clients iterate through messages, offsets are updated locally in
PartitionTopicInfo objects, and when ZookeeperConsumerConnector$commitOffset()
is called, the local offsets are synced to ZK. Is that correct?

regards,
Chia-Chun
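As a toy model of that two-part mechanism (the class below is invented for
illustration, only mirroring the roles of PartitionTopicInfo and the
connector's commit as described in this thread, not the real classes):
iterating advances only the local per-partition offsets, and a commit copies
whatever they currently hold into the ZK-side store.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSyncModel {
    // Local consumed offsets per partition (plays the role of PartitionTopicInfo).
    final Map<Integer, Long> local = new HashMap<>();
    // Offsets visible in "ZK"; only updated when commitOffset() runs.
    final Map<Integer, Long> zk = new HashMap<>();

    // Drawing a message via the iterator advances only the local offset.
    void next(int partition) {
        local.merge(partition, 1L, Long::sum);
    }

    // Committing copies the current local offsets to the ZK-side store.
    void commitOffset() {
        zk.putAll(local);
    }

    public static void main(String[] args) {
        OffsetSyncModel model = new OffsetSyncModel();
        model.next(0);
        model.next(0);
        System.out.println(model.zk);  // {}    -- nothing committed yet
        model.commitOffset();
        System.out.println(model.zk);  // {0=2} -- local offsets synced to ZK
    }
}
```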

2014-11-06 0:24 GMT+08:00 Guozhang Wang <wa...@gmail.com>:


Re: consumer ack for high-level consumer?

Posted by Guozhang Wang <wa...@gmail.com>.
Hello,

You can turn off auto.commit.enable and call connector.commitOffsets()
manually after you have processed the data. One thing to remember is that
each commit translates into a ZK (in the future, Kafka) write, and hence
you may not want to commit after processing every single message but only
after each batch of messages.
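A minimal sketch of the batching idea, assuming auto-commit is disabled: the
OffsetCommitter interface below is a stand-in invented so the example is
self-contained without Kafka dependencies (on the high-level consumer's Java
API the real call is ConsumerConnector.commitOffsets()). Committing once per
batch turns N ZK writes into roughly N / batchSize.

```java
import java.util.Collections;
import java.util.List;

public class BatchedCommit {
    // Stand-in for the connector's commit call, so this sketch has no Kafka
    // dependency; the real high-level consumer exposes commitOffsets().
    interface OffsetCommitter {
        void commitOffsets();
    }

    // Process messages, committing once per batch rather than once per
    // message, so each commit (a ZK write) is amortized over batchSize
    // messages. Returns how many commits were issued.
    static int consume(List<String> messages, int batchSize, OffsetCommitter committer) {
        int commits = 0;
        int sinceLastCommit = 0;
        for (String message : messages) {
            // ... process `message` here ...
            sinceLastCommit++;
            if (sinceLastCommit == batchSize) {
                committer.commitOffsets();      // one ZK write per full batch
                commits++;
                sinceLastCommit = 0;
            }
        }
        if (sinceLastCommit > 0) {              // commit any trailing partial batch
            committer.commitOffsets();
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<String> messages = Collections.nCopies(250, "msg");
        int commits = consume(messages, 100, () -> { /* no-op for the demo */ });
        System.out.println(commits); // 3 commits instead of 250
    }
}
```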

Guozhang

On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih <ch...@gmail.com>
wrote:




-- 
-- Guozhang