Posted to users@kafka.apache.org by Arjun <ar...@socialtwist.com> on 2014/08/22 13:18:51 UTC

doubt regarding high-level consumer committing offset

Hi,

I have a Kafka 0.8.1 setup with 3 producers and 3 consumers, and I use
the high-level consumer. My question is this: I read messages from the
consumer iterator and process each message after reading it. If the
processing throws an exception, I do not commit the offset; otherwise I
commit it.

Example: let's say I have 100 messages in the queue.

   After 90 messages, the 91st message is read, but my processing code
throws an exception, so I have not committed the offset.
   In this case, if I call next() on the consumer iterator, will it give
me the next message, i.e. the 92nd message, or not?
   And if the 92nd message is served, its processing goes smoothly, and
I commit the offset, can I get the 91st message again?


thanks
Arjun Narasimha Kota

Re: doubt regarding high-level consumer committing offset

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Arjun,

Currently there is no easy way to re-consume a message that has already
been returned to you and then caused an exception. What you need to do is
close the current consumer while handling the exception WITHOUT committing
the offset, fix the issue that caused the exception, and then restart the
consumer.

Guozhang
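
To make the behaviour discussed in this thread concrete, here is a toy
in-memory sketch. It is not the Kafka API. It assumes a single partition,
an iterator whose position lives only in memory and advances on every
next(), and a commitOffsets() that stores that position as the point a
restarted consumer resumes from. The class OffsetModel and all of its
methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT the Kafka API: one partition, one consumer.
final class OffsetModel {
    private final List<String> log; // messages; the list index plays the role of the offset
    private int position = 0;       // in-memory read position of the running iterator
    private int committed = 0;      // position stored by the last commit (restart point)

    OffsetModel(List<String> log) {
        this.log = log;
    }

    boolean hasNext() {
        return position < log.size();
    }

    // Hands out the next message and advances the in-memory position,
    // whether or not anything has been committed.
    String next() {
        return log.get(position++);
    }

    // Stores the current in-memory position as the restart point.
    void commitOffsets() {
        committed = position;
    }

    // Models closing the consumer and starting a new one.
    void restart() {
        position = committed;
    }

    // Builds a 100-message log, then reads and commits the first n messages,
    // as if their processing had succeeded.
    static OffsetModel afterSuccessfulReads(int n) {
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            log.add("msg-" + i);
        }
        OffsetModel consumer = new OffsetModel(log);
        for (int i = 0; i < n; i++) {
            consumer.next();
            consumer.commitOffsets();
        }
        return consumer;
    }

    public static void main(String[] args) {
        // Case 1: msg-91 fails, the loop carries on, msg-92 succeeds and commits.
        OffsetModel a = afterSuccessfulReads(90);
        a.next();            // msg-91, processing fails, no commit
        a.next();            // msg-92, processing succeeds
        a.commitOffsets();   // restart point is now past msg-91 as well
        a.restart();
        System.out.println(a.next()); // prints "msg-93"

        // Case 2: msg-91 fails and the consumer is closed without committing.
        OffsetModel b = afterSuccessfulReads(90);
        b.next();            // msg-91, processing fails, no commit
        b.restart();
        System.out.println(b.next()); // prints "msg-91"
    }
}
```

Under these assumptions, committing after the 92nd message moves the
restart point past the 91st, whereas closing without committing and then
restarting delivers the 91st message again.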



Re: doubt regarding high-level consumer committing offset

Posted by Arjun <ar...@socialtwist.com>.
This is how I am doing it:

First I get a map from topic to list of streams:
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
               consumer.createMessageStreams(topicCountMap);

From that I get the stream list for my topic:
       List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

I iterate over the list and, for each stream, create a new thread that
runs the steps below.

In each thread I get the iterator for the stream:
       ConsumerIterator<byte[], byte[]> it = stream.iterator();
and iterate over that stream:
       while (it.hasNext()) { // Blocking call.
           MessageAndMetadata<byte[], byte[]> message = it.next();
           try {
               handleMessage(message);
               // Reached only when processing succeeded.
               consumer.commitOffsets();
           } catch (Throwable e) {
               // No commit here, but the loop moves on to the next message.
               logger.error("Failed to process", e);
           }
       }

In 0.7, this used to get stuck when there was an error; we would fix the
problem and restart the app, and processing then went on fine. Is this no
longer the case? As explained earlier, while testing we found that the
consumer goes on to consume the next message, and if that message is
processed successfully, all the previous messages are gone. They are
still in the queue, but we do not know their offsets, and I have just
read that we cannot fetch them even if we do have the offsets.
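
One possible way to get the stop-on-error behaviour back is to leave the
loop on the first failure instead of logging and continuing. The sketch
below is written against plain java.util types so that it runs on its
own. StopOnFailureLoop and run are hypothetical names, and the commit and
shutdown callbacks are placeholders: in the setup described above they
would presumably wrap consumer.commitOffsets() and consumer.shutdown().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of Kafka.
final class StopOnFailureLoop {

    // Processes messages until the first failure.
    // Returns true if the iterator was drained, false if a failure stopped the loop.
    static <T> boolean run(Iterator<T> it,
                           Consumer<T> handler,
                           Runnable commit,
                           Runnable shutdown) {
        while (it.hasNext()) {
            T message = it.next();
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // Do not commit: the failed message stays after the restart point.
                shutdown.run();
                return false;
            }
            // Commit only after the message was handled successfully.
            commit.run();
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> processed = new ArrayList<>();
        int[] commits = {0};
        int[] shutdowns = {0};

        boolean drained = run(
                Arrays.asList(1, 2, 3, 4, 5).iterator(),
                m -> {
                    if (m == 3) {
                        throw new IllegalStateException("cannot process " + m);
                    }
                    processed.add(m);
                },
                () -> commits[0]++,
                () -> shutdowns[0]++);

        System.out.println(drained);      // prints "false"
        System.out.println(processed);    // prints "[1, 2]"
        System.out.println(commits[0]);   // prints "2"
        System.out.println(shutdowns[0]); // prints "1"
    }
}
```

Two caveats on this sketch. It assumes auto.commit.enable is set to false
in the consumer configuration; as far as I know it defaults to true in
0.8, in which case offsets are also committed periodically regardless of
the explicit call. And with one thread per stream, commitOffsets() on a
shared connector is, to my understanding, not limited to the calling
thread's stream, so a failure in one thread may need to stop the others
before any of them commits.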

Please let me know whether my understanding is correct.

Thanks
Arjun Narasimha Kota




