Posted to users@kafka.apache.org by Chen Wang <ch...@gmail.com> on 2014/08/08 01:41:08 UTC

error recovery in multiple thread reading from Kafka with HighLevel api

Folks,
I have a process that starts at a specific time and reads from a specific
topic. I am currently using the High Level API (consumer group) to read from
Kafka, and the process stops once there is nothing left in the topic (by
specifying a timeout). I am most concerned about error recovery in a
multi-threaded context. If one thread dies, will the other running bolt
threads pick up the failed message? Or do I have to start another thread in
order to pick up the failed message? What would be a good practice to ensure
that each message is processed at least once?

Note that all threads are using the same group id.

Thanks,
Chen
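
To make the at-least-once question concrete, here is a minimal sketch in plain Java. It does not use the Kafka client at all: a list stands in for one partition and an int stands in for the committed offset, and the names AtLeastOnceSketch and runOnce are made up for this illustration. It only shows that committing an offset after successful processing lets a restarted reader see the failed message again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation only: a List stands in for one partition and an
// int field stands in for the committed offset. No Kafka API is used.
public class AtLeastOnceSketch {
    private final List<String> partition;
    private int committedOffset = 0;                 // next offset to read after a restart
    private final List<String> delivered = new ArrayList<String>();

    public AtLeastOnceSketch(List<String> partition) {
        this.partition = partition;
    }

    // Reads from the committed offset until the log is exhausted or processing
    // fails. The offset is committed only after a message has been processed.
    // failOn names a message whose processing fails (null means no failure).
    // Returns true when the run reaches the end of the partition.
    public boolean runOnce(String failOn) {
        for (int offset = committedOffset; offset < partition.size(); offset++) {
            String message = partition.get(offset);
            delivered.add(message);                  // the reader has seen the message
            if (message.equals(failOn)) {
                return false;                        // simulated crash: nothing committed
            }
            committedOffset = offset + 1;            // commit after successful processing
        }
        return true;
    }

    public List<String> delivered() { return delivered; }

    public int committedOffset() { return committedOffset; }

    public static void main(String[] args) {
        AtLeastOnceSketch reader = new AtLeastOnceSketch(Arrays.asList("1", "2", "3", "4"));
        reader.runOnce("2");     // first run crashes on message 2
        reader.runOnce(null);    // restart: message 2 is read a second time
        System.out.println(reader.delivered());
    }
}
```

The failed message shows up twice in the delivered list, which is the duplicate a consumer has to tolerate in exchange for not losing it.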

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Guozhang Wang <wa...@gmail.com>.
We do not have a Java implementation of the operational tools yet.

Guozhang
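
Until a Java version of the tool exists, the lag arithmetic itself can be sketched in a few lines. ConsumerOffsetChecker reports, per partition, the committed offset, the log size, and the difference between them as the lag. The snippet below assumes both numbers per partition have already been obtained by some other means (how to fetch them is not shown and depends on the Kafka version), and that a partition without a committed offset is entirely unread. LagSketch and totalLag are hypothetical names, not part of Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: computes consumer lag from offsets that the caller
// has already looked up. It does not talk to Kafka or ZooKeeper.
public class LagSketch {

    // Sums (log end offset - committed offset) over all partitions.
    // Assumption: a partition with no committed offset counts from offset 0.
    public static long totalLag(Map<Integer, Long> logEndOffsets,
                                Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0L, e.getValue() - committed);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> logEnd = new HashMap<Integer, Long>();
        Map<Integer, Long> committed = new HashMap<Integer, Long>();
        logEnd.put(0, 7L);
        committed.put(0, 7L);    // partition 0 is caught up
        logEnd.put(1, 7L);
        committed.put(1, 5L);    // partition 1 is 2 messages behind
        logEnd.put(2, 6L);       // partition 2 has no committed offset yet
        System.out.println("total lag = " + totalLag(logEnd, committed));
    }
}
```

A restart check could then continue reading only when totalLag returns a value greater than 0.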


On Fri, Aug 8, 2014 at 4:29 PM, Chen Wang <ch...@gmail.com>
wrote:

> Guozhang,
> Just curious: do you already have a Java version of the
> ConsumerOffsetChecker
>
> https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
> that I could use in my Storm topology?
> Chen
>
>
> On Fri, Aug 8, 2014 at 2:03 PM, Chen Wang <ch...@gmail.com>
> wrote:
>
> > Ah, my bad. I didn't notice that I had put two auto.commit.interval.ms
> > entries in the config. After fixing that, it now behaves as expected. :-)
> > Thanks again!!
> > Chen
> >
> >
> >
> > On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> >> Chen,
> >>
> >> Your auto.commit.interval.ms is set to 1 sec, which may be too small.
> >> Could you try with larger numbers, like 10000?
> >>
> >> Guozhang
> >>
> >>
> >> On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <ch...@gmail.com>
> >> wrote:
> >>
> >> > Guozhang,
> >> > I just did a simple test, and Kafka does not seem to do what it is
> >> > supposed to do:
> >> > I put 20 messages numbered from 1 to 20 into a topic with 3 partitions,
> >> > and throw a RuntimeException on all the even-numbered messages
> >> > (2, 4, 6, ...).
> >> >
> >> >   while (it.hasNext()) {
> >> >       String message = new String(it.next().message());
> >> >       System.out.println("message received" + message);
> >> >       int messageInt = Integer.parseInt(message);
> >> >       if (messageInt % 2 == 0) {
> >> >           // Simulate a processing failure on every even-numbered message.
> >> >           throw new RuntimeException("message " + message + " failed");
> >> >       }
> >> >   }
> >> >
> >> > My config is like this;
> >> >
> >> >   props.put("zookeeper.connect", a_zookeeper);
> >> >   props.put("group.id", a_groupId);
> >> >   props.put("zookeeper.session.timeout.ms", "4000");
> >> >   props.put("zookeeper.sync.time.ms", "200");
> >> >   props.put("auto.commit.interval.ms", "1000");
> >> >   props.put("consumer.timeout.ms", "6000");
> >> >   props.put("autocommit.interval.ms", "360000");
> >> >   props.put("auto.offset.reset", "smallest");
> >> >
> >> >
> >> > I started 10 threads, but it seems that whenever a thread gets an
> >> > even-numbered message, it crashes. When I restart the threads, they
> >> > start reading from the next message. So in the first batch:
> >> >
> >> > message received1
> >> >
> >> > message received2
> >> >
> >> > Then I start again:
> >> >
> >> > message received3
> >> >
> >> > message received4
> >> >
> >> >
> >> > As you can see, message 2 is not replayed. Is this expected? I ran
> >> > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >> > chen_test_6 --topic test_20 -zkconnect localhost:2182, and its output is
> >> > consistent with the test result (the failed even-numbered messages are
> >> > not retrieved again).
> >> >
> >> > What am I missing here?
> >> >
> >> > Chen
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wa...@gmail.com>
> >> > wrote:
> >> >
> >> > > Chen,
> >> > >
> >> > > You can use the ConsumerOffsetChecker tool.
> >> > >
> >> > > http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Sounds like a good idea! I think I will go with the high-level
> >> > > > consumer then.
> >> > > > Another question about this design: is there a way to check the lag
> >> > > > of a consumer group for a topic? After a machine crashes and
> >> > > > restarts, I want to continue reading from a topic only if the lag is
> >> > > > NOT 0. I know I could rely on the timeout ("consumer.timeout.ms") to
> >> > > > check whether there is still data in the topic, but I am wondering
> >> > > > whether there is a more elegant way.
> >> > > > Thanks much for the help, Guozhang!
> >> > > >
> >> > > > Chen
> >> > > >
> >> > > >
> >> > > > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > With the simple consumer you need to take care of consumer failure
> >> > > > > detection and partition reassignment yourself, but you get more
> >> > > > > flexibility over the offsets.
> >> > > > >
> >> > > > > If every processing error also causes the corresponding consumer
> >> > > > > thread to fail (i.e. it will not be involved in the rebalance and
> >> > > > > hence will not commit offsets), and you can live with duplicate
> >> > > > > data, then you can just enable auto offset commits with, say, a
> >> > > > > 10-second period. We usually use an even larger period, like
> >> > > > > minutes.
> >> > > > >
> >> > > > > Guozhang
> >> > > > >
> >> > > > >
> >> > > > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Maybe I could batch the messages before committing, e.g. commit
> >> > > > > > every 10 seconds. This is what auto commit does anyway, and I
> >> > > > > > could live with duplicate data.
> >> > > > > > What do you think?
> >> > > > > >
> >> > > > > > It seems I would then also need a monitoring daemon that checks
> >> > > > > > the lag and restarts the consumer after machine crashes.
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com>
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Thanks, Guozhang,
> >> > > > > > > So if I switch to SimpleConsumer, will these problems already
> >> > > > > > > be taken care of? I assume I would need to manage all the
> >> > > > > > > offsets myself, including the error recovery logic, right?
> >> > > > > > > Chen
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> Hello Chen,
> >> > > > > > >>
> >> > > > > > >> 1. Manually committing offsets does carry a risk of duplicates.
> >> > > > > > >> Consider the following pattern:
> >> > > > > > >>
> >> > > > > > >> message = consumer.next();
> >> > > > > > >> process(message);
> >> > > > > > >> consumer.commit();
> >> > > > > > >>
> >> > > > > > >> A rebalance can happen between lines 2 and 3, when the message
> >> > > > > > >> has been processed but its offset has not yet been committed.
> >> > > > > > >> If another consumer picks up this partition after the
> >> > > > > > >> rebalance, it may consume this message again. With auto.commit
> >> > > > > > >> turned on, offsets will always be committed before the
> >> > > > > > >> consumers release ownership of partitions during rebalances.
> >> > > > > > >>
> >> > > > > > >> In the 0.9 consumer design, we have fixed this issue by
> >> > > > > > >> introducing the onPartitionDeassigned callback. You can take a
> >> > > > > > >> look at its current API here:
> >> > > > > > >>
> >> > > > > > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> >> > > > > > >>
> >> > > > > > >> 2. Committing offsets too often does have an overhead, since
> >> > > > > > >> each commit goes to ZooKeeper, and ZK is not write-scalable. We
> >> > > > > > >> are also fixing that issue by moving offset management from ZK
> >> > > > > > >> to the Kafka servers. This is already checked in to trunk and
> >> > > > > > >> will be included in the 0.8.2 release.
> >> > > > > > >>
> >> > > > > > >> Guozhang
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com>
> >> > > > > > >> wrote:
> >> > > > > > >>
> >> > > > > > >> > Guozhang,
> >> > > > > > >> > Just to make it clear:
> >> > > > > > >> > Say I have 10 threads with the same consumer group id reading
> >> > > > > > >> > topic T. Auto commit is turned off, and commitOffset is
> >> > > > > > >> > called only when a message is processed successfully.
> >> > > > > > >> > If thread 1 dies while processing a message from partition
> >> > > > > > >> > P1, and the last offset is Offset1, will Kafka ensure that
> >> > > > > > >> > one of the other 9 running threads automatically picks up the
> >> > > > > > >> > messages on partition P1 from Offset1? Will that thread risk
> >> > > > > > >> > reading the same message more than once?
> >> > > > > > >> >
> >> > > > > > >> > Also, I assume committing the offset for each message is a
> >> > > > > > >> > bit heavy. What do you usually do for error handling when
> >> > > > > > >> > reading from Kafka?
> >> > > > > > >> > Thanks much!
> >> > > > > > >> > Chen
> >> > > > > > >> >
> >> > > > > > >> >
> >> > > > > > >> >
> >> > > > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com>
> >> > > > > > >> > wrote:
> >> > > > > > >> >
> >> > > > > > >> > > Yes, in that case you can turn off auto commit and call
> >> > > > > > >> > > commitOffsets manually after processing is finished.
> >> > > > > > >> > > commitOffsets() will only write the offsets of the
> >> > > > > > >> > > partitions that the consumer is currently fetching, so
> >> > > > > > >> > > there is no need to coordinate this operation.
> >> > > > > > >> > >
> >> > > > > > >> > >
> >> > > > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com>
> >> > > > > > >> > > wrote:
> >> > > > > > >> > >
> >> > > > > > >> > > > But with auto commit turned on, I risk losing the failed
> >> > > > > > >> > > > message, right? Should I turn off auto commit and only
> >> > > > > > >> > > > commit the offset when the message is processed
> >> > > > > > >> > > > successfully? But that would require coordination between
> >> > > > > > >> > > > threads in order to know the right time to commit the
> >> > > > > > >> > > > offset.
> >> > > > > > >> > > >
> >> > > > > > >> > > >
> >> > > > > > >> > > >
> >> > > > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com>
> >> > > > > > >> > > > wrote:
> >> > > > > > >> > > >
> >> > > > > > >> > > > > Hello Chen,
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > With the high-level consumer, partition re-assignment is
> >> > > > > > >> > > > > automatic upon consumer failures.
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > Guozhang
> >> > > > > > >> > > > >
> >> > > > > > >> > > > >



-- 
-- Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Guozhang,
Just curious: do you already have a Java version of the
ConsumerOffsetChecker
https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
that I could use in my Storm topology?
Chen



Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Ah, my bad. I didn't notice that I had put two auto.commit.interval.ms
entries in the config. After fixing that, it now behaves as expected. :-)
Thanks again!!
Chen
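
A slip like this is easy to miss because java.util.Properties accepts any key without complaint. As a small sketch (plain Java, no Kafka dependency; ConfigKeyCheck and nearDuplicateKeys are hypothetical names), the helper below flags keys that become identical once the dots are removed. That would catch auto.commit.interval.ms sitting next to autocommit.interval.ms, as in the config quoted below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical sanity check for consumer settings; not part of Kafka.
public class ConfigKeyCheck {

    // Returns pairs of keys that differ only in where the dots are placed,
    // e.g. "auto.commit.interval.ms" vs "autocommit.interval.ms".
    public static List<String> nearDuplicateKeys(Properties props) {
        Map<String, String> seen = new HashMap<String, String>();
        List<String> clashes = new ArrayList<String>();
        // Iterate in sorted order so the result is deterministic.
        for (String key : new TreeSet<String>(props.stringPropertyNames())) {
            String normalized = key.replace(".", "").toLowerCase(Locale.ROOT);
            String earlier = seen.putIfAbsent(normalized, key);
            if (earlier != null) {
                clashes.add(earlier + " / " + key);
            }
        }
        return clashes;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "chen_test_6");
        props.put("auto.commit.interval.ms", "1000");
        props.put("autocommit.interval.ms", "360000");
        System.out.println(nearDuplicateKeys(props));
    }
}
```

Running such a check once at startup, before the consumer is created, is one way to surface a conflicting pair of settings early.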


On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Chen,
>
> Your auto.commit.interval.ms is set to 1 sec, which may be too small.
> Could you try with larger numbers, like 10000?
>
> Guozhang
>
>
> On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <ch...@gmail.com>
> wrote:
>
> > Guozhang,
> > I just did a simple test, and Kafka does not seem to do what it is
> > supposed to do:
> > I put 20 messages numbered from 1 to 20 into a topic with 3 partitions,
> > and throw a RuntimeException on all the even-numbered messages
> > (2, 4, 6, ...).
> >
> >   while (it.hasNext()) {
> >       String message = new String(it.next().message());
> >       System.out.println("message received" + message);
> >       int messageInt = Integer.parseInt(message);
> >       if (messageInt % 2 == 0) {
> >           // Simulate a processing failure on every even-numbered message.
> >           throw new RuntimeException("message " + message + " failed");
> >       }
> >   }
> >
> > My config is like this;
> >
> >   props.put("zookeeper.connect", a_zookeeper);
> >   props.put("group.id", a_groupId);
> >   props.put("zookeeper.session.timeout.ms", "4000");
> >   props.put("zookeeper.sync.time.ms", "200");
> >   props.put("auto.commit.interval.ms", "1000");
> >   props.put("consumer.timeout.ms", "6000");
> >   props.put("autocommit.interval.ms", "360000");
> >   props.put("auto.offset.reset", "smallest");
> >
> >
> > I started 10 threads, but it seems that whenever a thread gets an
> > even-numbered message, it crashes. When I restart the threads, they
> > start reading from the next message. So in the first batch:
> >
> > message received1
> >
> > message received2
> >
> > Then I start again:
> >
> > message received3
> >
> > message received4
> >
> >
> > As you can see, message 2 is not replayed. Is this expected? I ran
> > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> > chen_test_6 --topic test_20 -zkconnect localhost:2182, and its output is
> > consistent with the test result (the failed even-numbered messages are
> > not retrieved again).
> >
> > What am I missing here?
> >
> > Chen
> >
> >
> >
> >
> >
> >
> > On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Chen,
> > >
> > > You can use the ConsumerOffsetChecker tool.
> > >
> > > http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > wrote:
> > >
> > > > Sounds like a good idea! I think I will go with the high-level
> > > > consumer then.
> > > > Another question about this design: is there a way to check the lag
> > > > of a consumer group for a topic? After a machine crashes and
> > > > restarts, I want to continue reading from a topic only if the lag is
> > > > NOT 0. I know I could rely on the timeout ("consumer.timeout.ms") to
> > > > check whether there is still data in the topic, but I am wondering
> > > > whether there is a more elegant way.
> > > > Thanks much for the help, Guozhang!
> > > >
> > > > Chen
> > > >
> > > >
> > > > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > >
> > > > > With the simple consumer you need to take care of consumer failure
> > > > > detection and partition reassignment yourself, but you get more
> > > > > flexibility over the offsets.
> > > > >
> > > > > If every processing error also causes the corresponding consumer
> > > > > thread to fail (i.e. it will not be involved in the rebalance and
> > > > > hence will not commit offsets), and you can live with duplicate
> > > > > data, then you can just enable auto offset commits with, say, a
> > > > > 10-second period. We usually use an even larger period, like
> > > > > minutes.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <
> > chen.apache.solr@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Maybe i could batch the messages before commit.., e.g committing
> > > every
> > > > 10
> > > > > > second.this is what the auto commit does anyway and  I could live
> > > with
> > > > > > duplicate data.
> > > > > > What do u think?
> > > > > >
> > > > > > I would then also seem to need a monitoring daemon to check the
> lag
> > > to
> > > > > > restart the consumer during machine crashes..
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <
> > > chen.apache.solr@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks,Guozhang,
> > > > > > > So if I switch to SimpleConsumer, will these problems be taken
> > care
> > > > of
> > > > > > > already? I would assume that I will need to manage all the
> offset
> > > by
> > > > > > > myself, including the error recovery logic, right?
> > > > > > > Chen
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <
> > wangguoz@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > >> Hello Chen,
> > > > > > >>
> > > > > > >> 1. Manually commit offsets does have the risk of duplicates,
> > > > consider
> > > > > > the
> > > > > > >> following pattern:
> > > > > > >>
> > > > > > >> message = consumer.next();
> > > > > > >> process(message);
> > > > > > >> consumer.commit();
> > > > > > >>
> > > > > > >> the rebalance can happen between line 2 and 3, where the
> message
> > > has
> > > > > > been
> > > > > > >> processed but offset not being committed, if another consumer
> > > picks
> > > > up
> > > > > > >> this
> > > > > > >> partition after the rebalance, it may re-consume this message
> > > again.
> > > > > > With
> > > > > > >> auto.commit turned on, offsets will always be committed before
> > the
> > > > > > >> consumers release ownership of partitions during rebalances.
> > > > > > >>
> > > > > > >> In the 0.9 consumer design, we have fixed this issue by
> > > introducing
> > > > > the
> > > > > > >> onPartitionDeassigned callback, you can take a look at its
> > current
> > > > API
> > > > > > >> here:
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >>
> > > > > > >> 2. Commit offsets too often does have an overhead since it is
> > > going
> > > > to
> > > > > > >> Zookeeper, and ZK is not write-scalable. We are also fixing
> that
> > > > issue
> > > > > > by
> > > > > > >> moving the offset management from ZK to kafka servers. This is
> > > > already
> > > > > > >> checked in trunk, and will be included in 0.8.2 release.
> > > > > > >>
> > > > > > >> Guozhang
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <
> > > > chen.apache.solr@gmail.com
> > > > > >
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Guozhang,
> > > > > > >> > Just to make it clear:
> > > > > > >> > If I have 10 threads with the same consumer group id, read
> the
> > > > topic
> > > > > > T.
> > > > > > >> The
> > > > > > >> > auto commit is turned off, and commitOffset is called only
> > when
> > > > the
> > > > > > >> message
> > > > > > >> > is processed successfully.
> > > > > > >> > If thread 1 dies when processing message from partition P1,
> > and
> > > > the
> > > > > > last
> > > > > > >> > offset is Offset1.   Then kafka will ensure that one of the
> > > other
> > > > > > >> running 9
> > > > > > >> > threads will automatically pick up the message on partition
> P1
> > > > from
> > > > > > >> Offset1
> > > > > > >> > ? will the thread have the risk of reading the same message
> > more
> > > > > than
> > > > > > >> once?
> > > > > > >> >
> > > > > > >> > Also I would assume commit offset for each message is a bit
> > > heavy.
> > > > > > What
> > > > > > >> you
> > > > > > >> > guys usually do for error handling during reading kafka?
> > > > > > >> > Thanks much!
> > > > > > >> > Chen
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <
> > > wangguoz@gmail.com
> > > > >
> > > > > > >> wrote:
> > > > > > >> >
> > > > > > >> > > Yes, in that case you can turn of auto commit and call
> > > > > commitOffsets
> > > > > > >> > > manually after processing is finished. commitOffsets()
> will
> > > only
> > > > > > write
> > > > > > >> > the
> > > > > > >> > > offset of the partitions that the consumer is currently
> > > > fetching,
> > > > > so
> > > > > > >> > there
> > > > > > >> > > is no need to coordinate this operation.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <
> > > > > > chen.apache.solr@gmail.com
> > > > > > >> >
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > But with the auto commit turned on, I am risking off
> > losing
> > > > the
> > > > > > >> failed
> > > > > > >> > > > message, right? should I turn off the auto commit, and
> > only
> > > > > commit
> > > > > > >> the
> > > > > > >> > > > offset when the message is processed successfully..But
> > that
> > > > > would
> > > > > > >> > require
> > > > > > >> > > > the coordination between threads in order to know what
> is
> > > the
> > > > > > right
> > > > > > >> > > timing
> > > > > > >> > > > to commit offset..
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <
> > > > > wangguoz@gmail.com
> > > > > > >
> > > > > > >> > > wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hello Chen,
> > > > > > >> > > > >
> > > > > > >> > > > > With high-level consumer, the partition re-assignment
> is
> > > > > > automatic
> > > > > > >> > upon
> > > > > > >> > > > > consumer failures.
> > > > > > >> > > > >
> > > > > > >> > > > > Guozhang
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <
> > > > > > >> > chen.apache.solr@gmail.com>
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Folks,
> > > > > > >> > > > > >  I have a process started at specific time and read
> > > from a
> > > > > > >> specific
> > > > > > >> > > > > topic.
> > > > > > >> > > > > > I am currently using the High Level API(consumer
> > group)
> > > to
> > > > > > read
> > > > > > >> > from
> > > > > > >> > > > > > kafka(and will stop once there is nothing in the
> topic
> > > by
> > > > > > >> > specifying
> > > > > > >> > > a
> > > > > > >> > > > > > timeout). i am most concerned about error recovery
> in
> > > > > multiple
> > > > > > >> > thread
> > > > > > >> > > > > > context. If one thread dies, will other running bolt
> > > > threads
> > > > > > >> picks
> > > > > > >> > up
> > > > > > >> > > > the
> > > > > > >> > > > > > failed message? Or I have to start another thread in
> > > order
> > > > > to
> > > > > > >> pick
> > > > > > >> > up
> > > > > > >> > > > the
> > > > > > >> > > > > > failed message? What would be  a good practice to
> > ensure
> > > > the
> > > > > > >> > message
> > > > > > >> > > > can
> > > > > > >> > > > > be
> > > > > > >> > > > > > processed at least once?
> > > > > > >> > > > > >
> > > > > > >> > > > > > Note that all threads are using the same group id.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks,
> > > > > > >> > > > > > Chen
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > --
> > > > > > >> > > > > -- Guozhang
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > --
> > > > > > >> > > -- Guozhang
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> -- Guozhang
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Guozhang Wang <wa...@gmail.com>.
Chen,

Your auto.commit.interval.ms is set to 1 sec, which may be too small. Could
you try a larger value, such as 10000 (10 seconds)?

Guozhang
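
As a minimal sketch of the suggestion above, the consumer properties could
carry a single commit-interval entry set to 10000 ms. The class name
ConsumerConfigSketch, the helper buildConsumerProps, and the sample ZooKeeper
address and group id are assumptions for illustration. The property keys are
the ones quoted in this thread, except that the second entry
(autocommit.interval.ms) is left out. Only java.util.Properties is used, so
no Kafka classes are involved.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Hypothetical helper: collects the settings quoted in this thread,
    // with exactly one commit-interval key, set to the suggested 10000 ms.
    static Properties buildConsumerProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "10000");
        props.put("consumer.timeout.ms", "6000");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Sample values only; replace with the real ZooKeeper address and group.
        Properties props = buildConsumerProps("localhost:2181", "example_group");
        System.out.println(props.getProperty("auto.commit.interval.ms"));
    }
}
```

Keeping the settings in one helper makes it easier to spot two entries that
look alike but use different keys.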


On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <ch...@gmail.com>
wrote:

> Guozhang,
> I just ran a simple test, and Kafka does not seem to do what it is supposed
> to do.
> I put 20 messages, numbered 1 to 20, into a topic with 3 partitions, and
> throw a RuntimeException on every even-numbered message (2, 4, 6, ...):
>
>     while (it.hasNext()) {
>         String message = new String(it.next().message());
>         System.out.println("message received" + message);
>         int messageInt = Integer.parseInt(message);
>         if (messageInt % 2 == 0) {
>             // crash on every even-numbered message
>             throw new RuntimeException("message " + message + " failed");
>         }
>     }
>
> My config is like this:
>
>     props.put("zookeeper.connect", a_zookeeper);
>     props.put("group.id", a_groupId);
>     props.put("zookeeper.session.timeout.ms", "4000");
>     props.put("zookeeper.sync.time.ms", "200");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("consumer.timeout.ms", "6000");
>     props.put("autocommit.interval.ms", "360000");
>     props.put("auto.offset.reset", "smallest");
>
> I started 10 threads. Whenever a thread gets an even-numbered message, it
> crashes. When I restart the threads, they start reading from the next
> message. So in the first batch:
>
> message received1
>
> message received2
>
> Then I start again:
>
> message received3
>
> message received4
>
>
> As you can see, message 2 is not replayed. Is this expected? I ran
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> chen_test_6 --topic test_20 -zkconnect localhost:2182, and its output is
> consistent with the test result (the failed even-numbered messages are not
> retrieved again).
>
> What am I missing here?
>
> Chen
>
>



-- 
-- Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Guozhang,
I just ran a simple test, and Kafka does not seem to do what it is supposed
to do.
I put 20 messages, numbered 1 to 20, into a topic with 3 partitions, and
throw a RuntimeException on every even-numbered message (2, 4, 6, ...):

    while (it.hasNext()) {
        String message = new String(it.next().message());
        System.out.println("message received" + message);
        int messageInt = Integer.parseInt(message);
        if (messageInt % 2 == 0) {
            // crash on every even-numbered message
            throw new RuntimeException("message " + message + " failed");
        }
    }

My config is like this:

    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("consumer.timeout.ms", "6000");
    props.put("autocommit.interval.ms", "360000");
    props.put("auto.offset.reset", "smallest");

I started 10 threads. Whenever a thread gets an even-numbered message, it
crashes. When I restart the threads, they start reading from the next
message. So in the first batch:

message received1

message received2

Then I start again:

message received3

message received4


As you can see, message 2 is not replayed. Is this expected? I ran
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
chen_test_6 --topic test_20 -zkconnect localhost:2182, and its output is
consistent with the test result (the failed even-numbered messages are not
retrieved again).

What am I missing here?

Chen
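
One way to reason about the result above is to look at when the offset is
saved relative to when the message is processed. The following is a
self-contained simulation, not Kafka code: the class CommitTimingSketch, the
in-memory counter standing in for a partition, and the rule that an
even-numbered message fails only on its first attempt are all assumptions
made for illustration. It compares saving the offset as soon as a message is
fetched (roughly what a very frequent auto commit can amount to) with saving
it only after processing succeeds.

```java
import java.util.ArrayList;
import java.util.List;

class CommitTimingSketch {
    static final int MESSAGE_COUNT = 20;

    // Returns the messages that were processed successfully.
    // Assumption: each even-numbered message fails on its first attempt only.
    static List<Integer> run(boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        boolean[] alreadyFailed = new boolean[MESSAGE_COUNT + 1];
        int committed = 0; // offset a restarted thread resumes from

        // Each pass of the outer loop models one lifetime of a consumer thread.
        while (committed < MESSAGE_COUNT) {
            int position = committed;
            try {
                while (position < MESSAGE_COUNT) {
                    int message = ++position; // fetching advances the position
                    if (commitBeforeProcessing) {
                        committed = position; // saved before the work is done
                    }
                    if (message % 2 == 0 && !alreadyFailed[message]) {
                        alreadyFailed[message] = true;
                        throw new RuntimeException("message " + message + " failed");
                    }
                    processed.add(message);
                    committed = position; // saved after successful processing
                }
            } catch (RuntimeException e) {
                // The thread "crashes"; the next pass restarts from committed.
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("early commit processed " + run(true).size());  // 10
        System.out.println("late commit processed " + run(false).size());  // 20
    }
}
```

In this simulation the early-commit variant never sees the failed
even-numbered messages again, while the commit-after-processing variant
replays each of them once after the restart.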






On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Chen,
>
> You can use the ConsumerOffsetChecker tool.
>
> http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
>
> Guozhang
>
> --
> -- Guozhang
>

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Guozhang Wang <wa...@gmail.com>.
Chen,

You can use the ConsumerOffsetChecker tool.

http://kafka.apache.org/documentation.html#basic_ops_consumer_lag

Guozhang
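
The lag such a tool reports is, per partition, the log size minus the
group's committed offset. A minimal sketch of a "continue only if the lag is
not 0" check follows. The class LagCheckSketch, its method names, and the
sample numbers are assumptions; in practice the two arrays would be filled
from the ConsumerOffsetChecker output or from ZooKeeper rather than
hard-coded.

```java
class LagCheckSketch {
    // Sums (log size - committed offset) over all partitions of a topic.
    static long totalLag(long[] logSizes, long[] committedOffsets) {
        if (logSizes.length != committedOffsets.length) {
            throw new IllegalArgumentException("one entry per partition is required");
        }
        long lag = 0;
        for (int i = 0; i < logSizes.length; i++) {
            // Guard against a committed offset that is ahead of the log size.
            lag += Math.max(0L, logSizes[i] - committedOffsets[i]);
        }
        return lag;
    }

    // The consumer is worth restarting only while some messages are unread.
    static boolean shouldKeepReading(long[] logSizes, long[] committedOffsets) {
        return totalLag(logSizes, committedOffsets) > 0;
    }

    public static void main(String[] args) {
        long[] logSizes = {7, 7, 6};         // sample values for 3 partitions
        long[] committedOffsets = {7, 5, 6}; // partition 1 is 2 messages behind
        System.out.println(totalLag(logSizes, committedOffsets));          // 2
        System.out.println(shouldKeepReading(logSizes, committedOffsets)); // true
    }
}
```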


On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <ch...@gmail.com>
wrote:

> Sounds like a good idea! I think I will go with the high-level consumer
> then.
> Another question along with this design: is there a way to check the lag
> of a consumer group for a topic? After a machine crashes and restarts, I
> want to continue reading from a certain topic only if the lag is NOT 0. I
> know I could depend on the timeout ("consumer.timeout.ms") to check
> whether there is still data in the topic, but I wonder whether there is a
> more elegant way.
> Thanks much for the help, Guozhang!
>
> Chen



-- 
-- Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Sounds like a good idea! I think I will go with the high-level consumer
then.
Another question related to this design: is there a way to check the
lag of a consumer group on a topic? After a machine crash and restart, I
want to continue reading from a topic only if the lag is not 0. I
know I could rely on the timeout ("consumer.timeout.ms") to check whether
there is still data in the topic, but I am wondering whether there is a more
elegant way.
Thanks much for the help, Guozhang!

Chen


On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Using simple consumer you then need to take care of consumer failure
> detection and partition reassignment yourself. But you would have more
> flexibility of the offsets.
>
> If each time processing incur errors the corresponding consumer thread will
> fail also (i.e. will not be involved in the rebalance and hence commit
> offsets) and you could live with data duplicates, then you can just enable
> auto offset commits with say, 10 secs period. We usually have even larger
> period, like minutes.
>
> Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Guozhang Wang <wa...@gmail.com>.
With the simple consumer you need to take care of consumer failure
detection and partition reassignment yourself, but you get more
flexibility in managing the offsets.

If every processing error also causes the corresponding consumer thread to
fail (i.e. it will not take part in the rebalance and hence will not commit
offsets), and you can live with duplicate data, then you can just enable
auto offset commits with a period of, say, 10 seconds. We usually use an even
larger period, on the order of minutes.
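
A minimal sketch of that setup, assuming the 0.8 high-level consumer property names auto.commit.enable and auto.commit.interval.ms. It only builds a java.util.Properties object; the class name, group id, and ZooKeeper address are placeholders.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer settings; it does not create a consumer.
final class AutoCommitConfigSketch {
    static Properties build(String groupId, String zkConnect, long commitIntervalMs) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("zookeeper.connect", zkConnect);
        // Let the consumer commit offsets periodically on its own.
        props.setProperty("auto.commit.enable", "true");
        // Set the interval exactly once; 10000 ms is the 10-second period discussed above.
        props.setProperty("auto.commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-group", "localhost:2181", 10_000L);
        System.out.println(props.getProperty("auto.commit.interval.ms")); // prints 10000
    }
}
```

The resulting Properties object would then be handed to the consumer's configuration in the usual way.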

Guozhang


On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <ch...@gmail.com>
wrote:

> Maybe i could batch the messages before commit.., e.g committing every 10
> second.this is what the auto commit does anyway and  I could live with
> duplicate data.
> What do u think?
>
> I would then also seem to need a monitoring daemon to check the lag to
> restart the consumer during machine crashes..



-- 
-- Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Maybe I could batch the messages before committing, e.g. committing every 10
seconds. This is what auto commit does anyway, and I could live with
duplicate data.
What do you think?

It seems I would then also need a monitoring daemon that checks the lag and
restarts the consumer after machine crashes.
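
The batching idea can be sketched without any Kafka dependency: process each message, and invoke a commit action only once a fixed interval has elapsed since the last commit. Everything here is hypothetical (the class name, the Runnable standing in for the real commit call, the injected clock). Anything processed after the last commit would be delivered again after a crash, which matches the "can live with duplicates" assumption.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: decides when to commit, based on elapsed time.
final class IntervalCommitter {
    private final long intervalMs;
    private final LongSupplier clock;      // injected so the timing can be tested
    private final Runnable commitAction;   // stand-in for the real offset commit
    private long lastCommitMs;

    IntervalCommitter(long intervalMs, LongSupplier clock, Runnable commitAction) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.commitAction = commitAction;
        this.lastCommitMs = clock.getAsLong();
    }

    // Call after each successfully processed message.
    // Returns true if a commit was triggered by this call.
    boolean afterMessageProcessed() {
        long now = clock.getAsLong();
        if (now - lastCommitMs >= intervalMs) {
            commitAction.run();
            lastCommitMs = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        IntervalCommitter committer = new IntervalCommitter(
                10_000L, System::currentTimeMillis, () -> System.out.println("commit"));
        committer.afterMessageProcessed(); // too early: less than 10 seconds have elapsed
    }
}
```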


On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <ch...@gmail.com>
wrote:

> Thanks,Guozhang,
> So if I switch to SimpleConsumer, will these problems be taken care of
> already? I would assume that I will need to manage all the offset by
> myself, including the error recovery logic, right?
> Chen

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Thanks, Guozhang.
So if I switch to SimpleConsumer, will these problems already be taken care
of? I assume I will need to manage all the offsets myself, including
the error recovery logic, right?
Chen


On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Chen,
>
> 1. Manually commit offsets does have the risk of duplicates, consider the
> following pattern:
>
> message = consumer.next();
> process(message);
> consumer.commit();
>
> the rebalance can happen between line 2 and 3, where the message has been
> processed but offset not being committed, if another consumer picks up this
> partition after the rebalance, it may re-consume this message again. With
> auto.commit turned on, offsets will always be committed before the
> consumers release ownership of partitions during rebalances.
>
> In the 0.9 consumer design, we have fixed this issue by introducing the
> onPartitionDeassigned callback, you can take a look at its current API
> here:
>
>
> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> 2. Commit offsets too often does have an overhead since it is going to
> Zookeeper, and ZK is not write-scalable. We are also fixing that issue by
> moving the offset management from ZK to kafka servers. This is already
> checked in trunk, and will be included in 0.8.2 release.
>
> Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Chen,

1. Committing offsets manually does carry a risk of duplicates. Consider the
following pattern:

message = consumer.next();
process(message);
consumer.commit();

A rebalance can happen between lines 2 and 3, when the message has been
processed but its offset has not yet been committed. If another consumer
picks up this partition after the rebalance, it may consume this message
again. With auto commit turned on, offsets are always committed before the
consumers release ownership of partitions during a rebalance.
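
To make that window concrete, here is a toy simulation with no Kafka dependency. One consumer processes a message and loses the partition before committing; a second consumer resumes from the last committed offset and processes the same message again. The class name, the in-memory log, and the crash point are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the process-then-commit pattern.
final class DuplicateWindowSketch {
    // Processes messages starting at the committed offset.
    // If crashBeforeCommitAt >= 0, the consumer stops after processing that
    // offset but before committing it. Returns the committed offset at stop time.
    static int consume(List<String> log, int committed, int crashBeforeCommitAt,
                       List<String> processed) {
        int offset = committed;
        while (offset < log.size()) {
            processed.add(log.get(offset));   // process(message)
            if (offset == crashBeforeCommitAt) {
                return committed;             // ownership lost before the commit
            }
            offset++;
            committed = offset;               // consumer.commit()
        }
        return committed;
    }

    static List<String> run() {
        List<String> log = Arrays.asList("m0", "m1", "m2");
        List<String> processed = new ArrayList<>();
        int committed = consume(log, 0, 1, processed); // first consumer stops on m1
        consume(log, committed, -1, processed);        // second consumer takes over
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [m0, m1, m1, m2]
    }
}
```

Every message is processed at least once, and m1 is processed twice, so the processing step needs to tolerate duplicates.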

In the 0.9 consumer design, we have fixed this issue by introducing the
onPartitionDeassigned callback. You can take a look at its current API here:

http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html

2. Committing offsets too often does have an overhead, since each commit goes
to ZooKeeper, and ZK does not scale well for writes. We are also fixing that
issue by moving offset management from ZK to the Kafka servers. This is
already checked into trunk and will be included in the 0.8.2 release.

Guozhang


On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <ch...@gmail.com>
wrote:

> Guozhang,
> Just to make it clear:
> If I have 10 threads with the same consumer group id, read the topic T. The
> auto commit is turned off, and commitOffset is called only when the message
> is processed successfully.
> If thread 1 dies when processing message from partition P1, and the last
> offset is Offset1.   Then kafka will ensure that one of the other running 9
> threads will automatically pick up the message on partition P1 from Offset1
> ? will the thread have the risk of reading the same message more than once?
>
> Also I would assume commit offset for each message is a bit heavy. What you
> guys usually do for error handling during reading kafka?
> Thanks much!
> Chen



-- 
-- Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Just did some testing. It seems that the rebalance occurs once
zookeeper.session.timeout.ms has elapsed.
So yes, if one thread dies, the leftover messages will be picked up by the
other threads.
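
The behaviour observed above can be pictured with a simplified range-style
assignment. This is a sketch in plain Python, not Kafka's actual rebalance
code; assign and the thread names are hypothetical. It assumes partitions are
split as evenly as possible over the live threads of a group, and that a dead
thread drops out of the list once its ZooKeeper session has expired.

```python
def assign(partitions, consumers):
    """Split sorted partitions into contiguous ranges, one per live consumer."""
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    start = 0
    for i, consumer in enumerate(consumers):
        # The first `extra` consumers take one additional partition each.
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


before = assign(range(4), ["thread-1", "thread-2"])
# thread-1 dies; after the session timeout only thread-2 is left in the group.
after = assign(range(4), ["thread-2"])
print(before)
print(after)
```

After the rebalance the surviving thread owns the partitions the dead thread
used to read, and continues from whatever offset was last committed for them.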


On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <ch...@gmail.com>
wrote:

> Guozhang,
> Just to make it clear:
> Suppose I have 10 threads with the same consumer group id reading topic T.
> Auto commit is turned off, and commitOffsets is called only when a message
> has been processed successfully.
> If thread 1 dies while processing a message from partition P1, and the last
> offset is Offset1, will Kafka ensure that one of the other 9 running threads
> automatically picks up the messages on partition P1 from Offset1? Does that
> thread risk reading the same message more than once?
>
> Also, I would assume committing the offset for each message is a bit heavy.
> What do you usually do for error handling when reading from Kafka?
> Thanks much!
> Chen

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
Guozhang,
Just to make it clear:
Suppose I have 10 threads with the same consumer group id reading topic T.
Auto commit is turned off, and commitOffsets is called only when a message
has been processed successfully.
If thread 1 dies while processing a message from partition P1, and the last
offset is Offset1, will Kafka ensure that one of the other 9 running threads
automatically picks up the messages on partition P1 from Offset1? Does that
thread risk reading the same message more than once?

Also, I would assume committing the offset for each message is a bit heavy.
What do you usually do for error handling when reading from Kafka?
Thanks much!
Chen
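
The duplicate-read scenario in the question can be sketched as a small
simulation. This is plain Python, not the Kafka client; run_thread and
crash_after are hypothetical names. It assumes the offset is committed only
after a message has been processed, and that the thread dies after processing
a message but before committing it.

```python
from collections import Counter


def run_thread(num_messages, committed, deliveries, crash_after=None):
    """Process offsets from `committed` on, committing after each success.

    Returns the committed offset at the moment the thread stops or dies.
    """
    for offset in range(committed, num_messages):
        deliveries[offset] += 1      # message is handed to the application
        if offset == crash_after:
            return committed         # died after processing, before committing
        committed = offset + 1       # commit only after successful processing
    return committed


deliveries = Counter()
committed = run_thread(4, 0, deliveries, crash_after=1)  # thread 1 dies
committed = run_thread(4, committed, deliveries)         # another thread resumes
duplicates = [o for o, n in sorted(deliveries.items()) if n > 1]
print(duplicates)
```

Under these assumptions no message is skipped, but the one processed just
before the crash is delivered twice, so the processing step should tolerate
repeats.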



On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Yes, in that case you can turn off auto commit and call commitOffsets()
> manually after processing has finished. commitOffsets() only writes the
> offsets of the partitions that the consumer is currently fetching, so there
> is no need to coordinate this operation.

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Guozhang Wang <wa...@gmail.com>.
Yes, in that case you can turn off auto commit and call commitOffsets()
manually after processing has finished. commitOffsets() only writes the
offsets of the partitions that the consumer is currently fetching, so there
is no need to coordinate this operation.
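
The commit-after-processing pattern described here can be sketched without a
Kafka client. The code below is an illustrative Python simulation;
consume_with_late_commit and crash_at are hypothetical names, and a simple
list stands in for one partition.

```python
def consume_with_late_commit(messages, committed, crash_at=None):
    """Process messages from `committed` on; commit only after success."""
    processed = []
    offset = committed
    while offset < len(messages):
        if offset == crash_at:
            # Thread dies while processing: this offset is never committed.
            return processed, committed
        processed.append(messages[offset])
        committed = offset + 1       # manual commit after processing
        offset += 1
    return processed, committed


messages = ["m0", "m1", "m2", "m3"]
first, committed = consume_with_late_commit(messages, 0, crash_at=2)
# A surviving thread resumes from the last committed offset.
second, committed = consume_with_late_commit(messages, committed)
print(first, second)
```

Because the failed message was never committed, the thread that takes over
the partition reads it again, which is what gives at-least-once processing.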


On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <ch...@gmail.com>
wrote:

> But with auto commit turned on, I risk losing the failed message, right?
> Should I turn off auto commit and commit the offset only when the message
> has been processed successfully? But that would require coordination
> between threads to know the right time to commit the offset.



-- 
-- Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Chen Wang <ch...@gmail.com>.
But with auto commit turned on, I risk losing the failed message, right?
Should I turn off auto commit and commit the offset only when the message
has been processed successfully? But that would require coordination
between threads to know the right time to commit the offset.
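
The risk described above can be illustrated with a small simulation. This is
plain Python rather than Kafka code; consume_with_early_commit and crash_at
are hypothetical names. It models the worst case of auto commit, where an
offset has already been committed by the time the thread fails on that
message.

```python
def consume_with_early_commit(messages, committed, crash_at=None):
    """Commit each offset before processing it (worst case of auto commit)."""
    processed = []
    offset = committed
    while offset < len(messages):
        committed = offset + 1       # offset is committed first
        if offset == crash_at:
            # Thread dies before the message is actually processed.
            return processed, committed
        processed.append(messages[offset])
        offset += 1
    return processed, committed


messages = ["m0", "m1", "m2", "m3"]
first, committed = consume_with_early_commit(messages, 0, crash_at=2)
# The replacement thread starts after the already-committed offset.
second, committed = consume_with_early_commit(messages, committed)
lost = [m for m in messages if m not in first + second]
print(lost)
```

In this model the message being handled at the time of the crash is never
processed by anyone, since its offset was committed ahead of the work.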



On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Chen,
>
> With the high-level consumer, partition reassignment is automatic upon
> consumer failure.
>
> Guozhang

Re: error recovery in multiple thread reading from Kafka with HighLevel api

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Chen,

With the high-level consumer, partition reassignment is automatic upon
consumer failure.

Guozhang


On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <ch...@gmail.com>
wrote:

> Folks,
> I have a process that starts at a specific time and reads from a specific
> topic. I am currently using the High Level API (consumer group) to read from
> Kafka (and will stop once there is nothing left in the topic by specifying a
> timeout). I am most concerned about error recovery in a multi-threaded
> context. If one thread dies, will the other running bolt threads pick up the
> failed message? Or do I have to start another thread in order to pick up the
> failed message? What would be a good practice to ensure that each message is
> processed at least once?
>
> Note that all threads are using the same group id.
>
> Thanks,
> Chen
>



-- 
-- Guozhang