Posted to users@kafka.apache.org by "Yu, Libo " <li...@citi.com> on 2013/05/08 18:17:07 UTC

a few questions from high level consumer documentation.

Hi,

I read this link https://cwiki.apache.org/KAFKA/consumer-group-example.html
and have a few questions (if not too many).

1 When you say the iterator may block, do you mean hasNext() may block?

2 "Remember, you can only use a single process per Consumer Group."
    Do you mean we can only use a single process on one node of the cluster for a consumer group?
    Or there can be only one process on the whole cluster for a consumer group? Please clarify on this.

3 Why save offset to zookeeper? Is it easier to save it to a local file?

4 When client exits/crashes or leader for a partition is changed, duplicate messages may be replayed. "To help avoid this (replayed duplicate messages), make sure you provide a clean way for your client to exit instead of assuming it can be 'kill -9'd."

a.       For client exit, if the client is receiving data at the time, how to do a clean exit? How can client tell consumer to write offset to zookeeper before exiting?


b.      For client crash, what can client do to avoid duplicate messages when restarted? What I can think of is to read last message from log file and ignore the first few received duplicate messages until receiving the last read message. But is it possible for client to read log file directly?


c.       For the change of the partition leader, is there anything that clients can do to avoid duplicates?

Thanks.



Libo


Re: a few questions from high level consumer documentation.

Posted by Neha Narkhede <ne...@gmail.com>.
Thanks,
Neha
On May 9, 2013 5:28 AM, "Chris Curtin" <cu...@gmail.com> wrote:
>
> On Thu, May 9, 2013 at 12:36 AM, Rob Withers <re...@gmail.com> wrote:
>
> >
> >
> > > -----Original Message-----
> > > From: Chris Curtin [mailto:curtin.chris@gmail.com]
> >
> > > > 1 When you say the iterator may block, do you mean hasNext() may block?
> > > >
> > >
> > > Yes.
> >
> > Is this due to a potential non-blocking fetch (broker/zookeeper returns an
> > empty block if offset is current)?  Yet this blocks the network call of the
> > consumer iterator, do I have that right?  Are there other reasons it could
> > block?  Like the call fails and a backup call is made?
> >
>
> I'll let the Kafka team answer this. I don't know the low level details.
>
It is because the consumer could be at the tail end and new data could
arrive at the server at a later time. The consumer is blocking by default
to handle a continuous stream of data.
> >
> > > > b.      For client crash, what can client do to avoid duplicate messages
> > > > when restarted? What I can think of is to read last message from log
> > > > file and ignore the first few received duplicate messages until
> > > > receiving the last read message. But is it possible for client to read
> > > > log file directly?
> > > >
> > >
> > > If you can't tolerate the possibility of duplicates you need to look at the
> > > Simple Consumer example. There you control the offset storage.
> >
> > Do you have example code that manages only once, even when a consumer for a
> > given partition goes away?
> >
>
> No, but if you look at the Simple Consumer example where the read occurs
> (and the write to System.out) at that point you know the offset you just
> read, so you need to put it somewhere. Using the Simple Consumer Kafka
> leaves all the offset management to you.
>
>
> >
> > What does happen with rebalancing when a consumer goes away?
>
>
> Hmm, I can't find the link to the algorithm right now. Jun or Neha can you?

You can find the algorithm on the design page.
http://kafka.apache.org/07/design.html

>
> > Is this
> > behavior of the high-level consumer group?
>
>
> Yes.
>
>
> > Is there a way to supply one's
> > own simple consumer with only once, within a consumer group that
> > rebalances?
> >
> No. Simple Consumers don't have rebalancing steps. Basically you take
> control of what is requested from which topics and partitions. So you could
> ask for a specific offset in a topic/partition 100 times in a row and Kafka
> will happily return it to you. Nothing is written to ZooKeeper either, you
> control everything.
>
>
>
> >
> > What happens if a producer goes away?
> >
>
> Shouldn't matter to the consumers. The Brokers are what the consumers talk
> to, so if nothing is writing the Broker won't have anything to send.
>
> >
> > thanks much,
> > rob
> >
> >
> >

Re: a few questions from high level consumer documentation.

Posted by David Arthur <mu...@gmail.com>.
On 5/9/13 8:27 AM, Chris Curtin wrote:
> On Thu, May 9, 2013 at 12:36 AM, Rob Withers <re...@gmail.com> wrote:
>
>>
>>> -----Original Message-----
>>> From: Chris Curtin [mailto:curtin.chris@gmail.com]
>>>> 1 When you say the iterator may block, do you mean hasNext() may block?
>>>>
>>> Yes.
>> Is this due to a potential non-blocking fetch (broker/zookeeper returns an
>> empty block if offset is current)?  Yet this blocks the network call of the
>> consumer iterator, do I have that right?  Are there other reasons it could
>> block?  Like the call fails and a backup call is made?
>>
> I'll let the Kafka team answer this. I don't know the low level details.
The iterator will block if there is no more data to consume. The
iterator is actually reading messages from a BlockingQueue which is fed
messages by the fetcher threads. The reason for this is to allow you to
configure blocking with or without a timeout in the ConsumerIterator.
This is reflected in the consumer timeout property (consumer.timeout.ms).
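
The queue-backed design described above can be sketched with the JDK alone.
This is not Kafka's ConsumerIterator; the class name QueueBackedIterator and
its timeoutMs parameter are made up for illustration. A negative timeout
blocks indefinitely, and a non-negative one gives up after that many
milliseconds. For simplicity the sketch returns false from hasNext() on a
timeout, whereas the real high-level consumer is expected to raise a
ConsumerTimeoutException in that case.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer iterator fed by fetcher threads.
final class QueueBackedIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs; // negative: block until a message arrives
    private T next;               // fetched by hasNext(), not yet handed out

    QueueBackedIterator(BlockingQueue<T> queue, long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public boolean hasNext() {
        if (next != null) return true;
        try {
            // take() waits forever; poll() returns null once the timeout expires.
            next = (timeoutMs < 0)
                    ? queue.take()
                    : queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = next;
        next = null;
        return result;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Plays the role of a fetcher thread: delivers one message after a delay.
        Thread fetcher = new Thread(() -> {
            try {
                Thread.sleep(200);
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        Iterator<String> blocking = new QueueBackedIterator<>(queue, -1);
        System.out.println(blocking.next());   // waits for the fetcher, then prints "hello"

        Iterator<String> bounded = new QueueBackedIterator<>(queue, 100);
        System.out.println(bounded.hasNext()); // queue stays empty, so prints "false"
    }
}
```

The point of the sketch is that the blocking happens on the local queue, not
on a network call made by the iterator itself.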
>
>
>>>> b.      For client crash, what can client do to avoid duplicate messages
>>>> when restarted? What I can think of is to read last message from log
>>>> file and ignore the first few received duplicate messages until
>>>> receiving the last read message. But is it possible for client to read
>>>> log file directly?
>>> If you can't tolerate the possibility of duplicates you need to look at the
>>> Simple Consumer example. There you control the offset storage.
>> Do you have example code that manages only once, even when a consumer for a
>> given partition goes away?
>>
> No, but if you look at the Simple Consumer example where the read occurs
> (and the write to System.out) at that point you know the offset you just
> read, so you need to put it somewhere. Using the Simple Consumer Kafka
> leaves all the offset management to you.
>
>
>> What does happen with rebalancing when a consumer goes away?
>
> Hmm, I can't find the link to the algorithm right now. Jun or Neha can you?
It's down at the bottom of the 0.7 design page:
http://kafka.apache.org/07/design.html
>
>
>> Is this
>> behavior of the high-level consumer group?
>
> Yes.
>
>
>> Is there a way to supply one's
>> own simple consumer with only once, within a consumer group that
>> rebalances?
>>
> No. Simple Consumers don't have rebalancing steps. Basically you take
> control of what is requested from which topics and partitions. So you could
> ask for a specific offset in a topic/partition 100 times in a row and Kafka
> will happily return it to you. Nothing is written to ZooKeeper either, you
> control everything.
>
>
>
>> What happens if a producer goes away?
>>
> Shouldn't matter to the consumers. The Brokers are what the consumers talk
> to, so if nothing is writing the Broker won't have anything to send.
>
>> thanks much,
>> rob
>>
>>
>>


Re: a few questions from high level consumer documentation.

Posted by Chris Curtin <cu...@gmail.com>.
On Thu, May 9, 2013 at 12:36 AM, Rob Withers <re...@gmail.com> wrote:

>
>
> > -----Original Message-----
> > From: Chris Curtin [mailto:curtin.chris@gmail.com]
>
> > > 1 When you say the iterator may block, do you mean hasNext() may block?
> > >
> >
> > Yes.
>
> Is this due to a potential non-blocking fetch (broker/zookeeper returns an
> empty block if offset is current)?  Yet this blocks the network call of the
> consumer iterator, do I have that right?  Are there other reasons it could
> block?  Like the call fails and a backup call is made?
>

I'll let the Kafka team answer this. I don't know the low level details.


>
> > > b.      For client crash, what can client do to avoid duplicate messages
> > > when restarted? What I can think of is to read last message from log
> > > file and ignore the first few received duplicate messages until
> > > receiving the last read message. But is it possible for client to read
> > > log file directly?
> > >
> >
> > If you can't tolerate the possibility of duplicates you need to look at the
> > Simple Consumer example. There you control the offset storage.
>
> Do you have example code that manages only once, even when a consumer for a
> given partition goes away?
>

No, but if you look at the Simple Consumer example where the read occurs
(and the write to System.out), at that point you know the offset you just
read, so you need to put it somewhere. With the Simple Consumer, Kafka
leaves all the offset management to you.
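
As a rough sketch of "putting the offset somewhere", the class below keeps
the next offset to process in a local file and skips anything already
handled after a restart. It uses only the JDK and no Kafka classes;
FileOffsetStore, processOnce and the file name are hypothetical. Note the
remaining gap: the handler runs before the offset is saved, so a crash
between the two steps still replays one message. Closing that gap would
need the offset stored together with the processing result.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical client-side offset checkpoint for one topic/partition.
final class FileOffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    // Next offset to process, or 0 when nothing has been saved yet.
    long load() throws IOException {
        if (!Files.exists(file)) return 0L;
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        return Long.parseLong(text.trim());
    }

    // Write to a temporary file, then rename it over the checkpoint so a
    // crash mid-write does not leave a truncated checkpoint behind.
    void save(long nextOffset) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
    }

    // Runs the handler only for offsets not yet recorded as processed.
    boolean processOnce(long offset, Runnable handler) throws IOException {
        if (offset < load()) return false; // replayed duplicate, skip it
        handler.run();
        save(offset + 1);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("offsets").resolve("my-topic-0.offset");
        FileOffsetStore store = new FileOffsetStore(file);
        for (long offset = 0; offset < 3; offset++) {
            final long current = offset;
            store.processOnce(current, () -> System.out.println("processed " + current));
        }
        // Simulated restart: offset 2 is replayed and skipped, offset 3 is new.
        FileOffsetStore restarted = new FileOffsetStore(file);
        System.out.println(restarted.processOnce(2, () -> { })); // prints "false"
        System.out.println(restarted.processOnce(3, () -> { })); // prints "true"
    }
}
```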


>
> What does happen with rebalancing when a consumer goes away?


Hmm, I can't find the link to the algorithm right now. Jun or Neha can you?


> Is this
> behavior of the high-level consumer group?


Yes.


> Is there a way to supply one's
> own simple consumer with only once, within a consumer group that
> rebalances?
>
No. Simple Consumers don't have rebalancing steps. Basically you take
control of what is requested from which topics and partitions. So you could
ask for a specific offset in a topic/partition 100 times in a row and Kafka
will happily return it to you. Nothing is written to ZooKeeper either, you
control everything.



>
> What happens if a producer goes away?
>

Shouldn't matter to the consumers. The Brokers are what the consumers talk
to, so if nothing is writing the Broker won't have anything to send.

>
> thanks much,
> rob
>
>
>

RE: a few questions from high level consumer documentation.

Posted by Rob Withers <re...@gmail.com>.

> -----Original Message-----
> From: Chris Curtin [mailto:curtin.chris@gmail.com]

> > 1 When you say the iterator may block, do you mean hasNext() may block?
> >
> 
> Yes.

Is this due to a potential non-blocking fetch (broker/zookeeper returns an
empty block if offset is current)?  Yet this blocks the network call of the
consumer iterator, do I have that right?  Are there other reasons it could
block?  Like the call fails and a backup call is made?

> > b.      For client crash, what can client do to avoid duplicate messages
> > when restarted? What I can think of is to read last message from log
> > file and ignore the first few received duplicate messages until
> > receiving the last read message. But is it possible for client to read
> > log file directly?
> >
> 
> If you can't tolerate the possibility of duplicates you need to look at the
> Simple Consumer example. There you control the offset storage.

Do you have example code that manages only once, even when a consumer for a
given partition goes away?

What does happen with rebalancing when a consumer goes away?  Is this
behavior of the high-level consumer group?  Is there a way to supply one's
own simple consumer with only once, within a consumer group that rebalances?

What happens if a producer goes away?

thanks much,
rob



Re: a few questions from high level consumer documentation.

Posted by Chris Curtin <cu...@gmail.com>.
I'll try to answer some, the Kafka team will need to answer the others:


On Wed, May 8, 2013 at 12:17 PM, Yu, Libo <li...@citi.com> wrote:

> Hi,
>
> I read this link
> https://cwiki.apache.org/KAFKA/consumer-group-example.html
> and have a few questions (if not too many).
>
> 1 When you say the iterator may block, do you mean hasNext() may block?
>

Yes.


>
> 2 "Remember, you can only use a single process per Consumer Group."
>     Do you mean we can only use a single process on one node of the
> cluster for a consumer group?
>     Or there can be only one process on the whole cluster for a consumer
> group? Please clarify on this.
>

Bug. I'll change it. When I wrote this I misunderstood the rebalancing
step. I missed this reference but fixed the others. Sorry.



> 3 Why save offset to zookeeper? Is it easier to save it to a local file?
>
> 4 When client exits/crashes or leader for a partition is changed,
> duplicate messages may be replayed. "To help avoid this (replayed duplicate
> messages), make sure you provide a clean way for your client to exit
> instead of assuming it can be 'kill -9'd."
>
> a.       For client exit, if the client is receiving data at the time, how
> to do a clean exit? How can client tell consumer to write offset to
> zookeeper before exiting?
>

If you call the shutdown() method on the Consumer it will cleanly stop,
releasing any blocked iterators. In the example it goes to sleep for a few
seconds then cleanly shuts down.
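
To illustrate what a clean exit buys you, here is a JDK-only sketch (no
Kafka classes; GracefulWorker and all of its members are hypothetical). A
worker blocks on a queue the way a consumer thread blocks on its iterator.
shutdown() releases it with a sentinel value, and the worker records its
last processed offset before the thread ends. A JVM shutdown hook calls
shutdown() on a normal exit or SIGTERM. Nothing can run on 'kill -9', which
is why that case can replay messages.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical worker; committedOffset stands in for the offset kept in ZooKeeper.
final class GracefulWorker {
    private static final long STOP = -1L; // sentinel that unblocks the worker
    private final BlockingQueue<Long> incoming = new LinkedBlockingQueue<>();
    private final AtomicLong committedOffset = new AtomicLong(-1L);
    private final Thread thread = new Thread(this::run, "graceful-worker");

    void start() {
        thread.start();
    }

    void deliver(long offset) {
        incoming.add(offset);
    }

    long committedOffset() {
        return committedOffset.get();
    }

    private void run() {
        long lastProcessed = -1L;
        try {
            while (true) {
                long offset = incoming.take(); // blocks while there is no data
                if (offset == STOP) break;
                lastProcessed = offset;        // "process" the message
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            committedOffset.set(lastProcessed); // record progress before exiting
        }
    }

    // Releases the blocked worker and waits until it has recorded its offset.
    void shutdown() {
        incoming.add(STOP);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        GracefulWorker worker = new GracefulWorker();
        worker.start();
        // Also runs on SIGTERM or Ctrl+C; calling shutdown() twice is harmless here.
        Runtime.getRuntime().addShutdownHook(new Thread(worker::shutdown));
        for (long offset = 0; offset < 5; offset++) {
            worker.deliver(offset);
        }
        worker.shutdown();
        System.out.println("committed offset: " + worker.committedOffset()); // prints "committed offset: 4"
    }
}
```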


>
>
> b.      For client crash, what can client do to avoid duplicate messages
> when restarted? What I can think of is to read last message from log file
> and ignore the first few received duplicate messages until receiving the
> last read message. But is it possible for client to read log file directly?
>

If you can't tolerate the possibility of duplicates you need to look at the
Simple Consumer example. There you control the offset storage.


>
>
> c.       For the change of the partition leader, is there anything that
> clients can do to avoid duplicates?
>
> Thanks.
>
>
>
> Libo
>
>

Re: a few questions from high level consumer documentation.

Posted by Jun Rao <ju...@gmail.com>.
For #3, we need to checkpoint offsets to a central place so that if a
consumer fails, another consumer in the same group can pick up from where
it left off.
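
A toy model of why the checkpoint has to be central, using only the JDK. An
in-memory map stands in for ZooKeeper; SharedOffsetStore, GroupMember and
the key layout are invented for illustration and are not Kafka's actual
storage format. Because both members read and write the same store, the
second one resumes where the first stopped. With a file local to the first
member's machine, the second would have nothing to read.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical central offset store shared by every member of a group.
final class SharedOffsetStore {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    void commit(String group, String topic, int partition, long nextOffset) {
        offsets.put(key(group, topic, partition), nextOffset);
    }

    // Next offset for the group, or 0 if the group has never committed.
    long fetch(String group, String topic, int partition) {
        return offsets.getOrDefault(key(group, topic, partition), 0L);
    }
}

// Hypothetical group member that resumes from the group's committed offset.
final class GroupMember {
    private final String group;
    private final SharedOffsetStore store;

    GroupMember(String group, SharedOffsetStore store) {
        this.group = group;
        this.store = store;
    }

    // "Consumes" count messages and returns the offset it started from.
    long consume(String topic, int partition, int count) {
        long start = store.fetch(group, topic, partition);
        store.commit(group, topic, partition, start + count);
        return start;
    }

    public static void main(String[] args) {
        SharedOffsetStore store = new SharedOffsetStore();
        GroupMember first = new GroupMember("group-1", store);
        System.out.println(first.consume("events", 0, 10));  // prints "0"
        // The first member fails; another member of the same group takes over.
        GroupMember second = new GroupMember("group-1", store);
        System.out.println(second.consume("events", 0, 5));  // prints "10"
    }
}
```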

For #4c, leader change doesn't introduce duplicates.

Thanks,

Jun


On Wed, May 8, 2013 at 9:17 AM, Yu, Libo <li...@citi.com> wrote:

> Hi,
>
> I read this link
> https://cwiki.apache.org/KAFKA/consumer-group-example.html
> and have a few questions (if not too many).
>
> 1 When you say the iterator may block, do you mean hasNext() may block?
>
> 2 "Remember, you can only use a single process per Consumer Group."
>     Do you mean we can only use a single process on one node of the
> cluster for a consumer group?
>     Or there can be only one process on the whole cluster for a consumer
> group? Please clarify on this.
>
> 3 Why save offset to zookeeper? Is it easier to save it to a local file?
>
> 4 When client exits/crashes or leader for a partition is changed,
> duplicate messages may be replayed. "To help avoid this (replayed duplicate
> messages), make sure you provide a clean way for your client to exit
> instead of assuming it can be 'kill -9'd."
>
> a.       For client exit, if the client is receiving data at the time, how
> to do a clean exit? How can client tell consumer to write offset to
> zookeeper before exiting?
>
>
> b.      For client crash, what can client do to avoid duplicate messages
> when restarted? What I can think of is to read last message from log file
> and ignore the first few received duplicate messages until receiving the
> last read message. But is it possible for client to read log file directly?
>
>
> c.       For the change of the partition leader, is there anything that
> clients can do to avoid duplicates?
>
> Thanks.
>
>
>
> Libo
>
>