Posted to users@kafka.apache.org by Arjun <ar...@socialtwist.com> on 2014/08/22 13:18:51 UTC
doubt regarding High level consumer commiting offset
Hi,
I have a Kafka 0.8.1 setup with 3 producers and 3 consumers, and I use
the high-level consumer. My question is this: I read messages from the
consumer iterator and then process them. If processing throws an
exception, I do not commit the offset; otherwise I commit it.
Example: say I have 100 messages in the queue.
After 90 messages, the 91st message is read, but while processing it (in
my code) an exception is thrown, so I have not committed the offset.
In this case, if I call next() on the consumer iterator, will it give me
the next message, i.e. the 92nd, or not?
And if the 92nd message is served, its processing goes smoothly, and I
commit the offset, can I get the 91st message again?
thanks
Arjun Narasimha Kota
Re: doubt regarding High level consumer commiting offset
Posted by Guozhang Wang <wa...@gmail.com>.
Hello Arjun,
Currently there is no easy way to re-consume a message that has already
been returned to you and then caused an exception. What you need to do is
close the current consumer when handling the exception WITHOUT committing
the offset, fix the issue that caused the exception, and then restart the
consumer.
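The stop-and-restart approach can be sketched with a small in-memory model. This is not the Kafka API: the class StopOnFailureSketch, its committed field, and runConsumer are hypothetical names, and a single partition is assumed. In the real 0.8 high-level consumer the corresponding calls would be commitOffsets() and shutdown() on the ConsumerConnector, and auto-commit would presumably need to be disabled (auto.commit.enable=false) so that nothing is committed behind your back.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a single-partition consumer; not the Kafka API.
public class StopOnFailureSketch {
    // Last committed position, stored outside the consumer (ZooKeeper plays this role in 0.8).
    public static int committed = 0;

    // Consumes from the committed position and stops at the first failure WITHOUT committing.
    // Returns the messages that were processed successfully in this run.
    public static List<Integer> runConsumer(int totalMessages, int poisonMessage) {
        List<Integer> processed = new ArrayList<>();
        int position = committed;          // a restarted consumer resumes from the committed offset
        while (position < totalMessages) {
            int message = position + 1;    // messages are numbered 1..totalMessages
            position++;                    // reading a message advances the in-memory position
            if (message == poisonMessage) {
                return processed;          // "shut down": leave without committing
            }
            processed.add(message);
            committed = position;          // commit only after successful processing
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> first = runConsumer(100, 91);   // processing fails on message 91
        System.out.println("first run processed " + first.size() + ", committed = " + committed);
        List<Integer> second = runConsumer(100, -1);  // after the fix, nothing fails
        System.out.println("second run starts at message " + second.get(0));
    }
}
```

In this model the first run commits up to message 90 and stops, so the second run starts again at message 91. Nothing is skipped because no commit happens after the failure.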
Guozhang
On Fri, Aug 22, 2014 at 7:59 AM, Arjun <ar...@socialtwist.com> wrote:
> This is how I am doing it.
>
> First I get a map from topic to stream list:
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
>
> From that I get the stream list for my topic:
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
> I iterate over the list, and for each stream I create a new thread and
> run the stream in that thread.
>
> In each thread I get the iterator for the stream:
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
> Then I iterate on that stream:
> while (it.hasNext()) { // Blocking call.
>     MessageAndMetadata<byte[], byte[]> message = it.next();
>     try {
>         handleMessage(message);
>         consumer.commitOffsets(); // commit only after successful processing
>     } catch (Throwable e) {
>         logger.error("Failed to process", e);
>     }
> }
>
> In 0.7, this used to get stuck when there was an error; we would fix the
> problem and restart the app, and processing then went on fine. Is this no
> longer the case? As explained earlier, while testing we found that the
> consumer goes on to consume the next message, and if that message is
> processed successfully, all the previous messages are gone. They are still
> in the queue, but we do not even know their offsets to fetch them, and I
> just read that we cannot fetch them even if we have the offsets.
>
> Please let me know whether my understanding is correct.
>
> Thanks
> Arjun Narasimha Kota
Re: doubt regarding High level consumer commiting offset
Posted by Arjun <ar...@socialtwist.com>.
This is how I am doing it.
First I get a map from topic to stream list:
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
From that I get the stream list for my topic:
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
I iterate over the list, and for each stream I create a new thread and
run the stream in that thread.
In each thread I get the iterator for the stream:
ConsumerIterator<byte[], byte[]> it = stream.iterator();
Then I iterate on that stream:
while (it.hasNext()) { // Blocking call.
    MessageAndMetadata<byte[], byte[]> message = it.next();
    try {
        handleMessage(message);
        consumer.commitOffsets(); // commit only after successful processing
    } catch (Throwable e) {
        logger.error("Failed to process", e);
    }
}
In 0.7, this used to get stuck when there was an error; we would fix the
problem and restart the app, and processing then went on fine. Is this no
longer the case? As explained earlier, while testing we found that the
consumer goes on to consume the next message, and if that message is
processed successfully, all the previous messages are gone. They are
still in the queue, but we do not even know their offsets to fetch them,
and I just read that we cannot fetch them even if we have the offsets.
Please let me know whether my understanding is correct.
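The behaviour described above can be reproduced with a small in-memory model of the loop. This is a sketch under assumptions, not the Kafka API: SkippedMessageSketch and its methods are hypothetical names, a single partition is assumed, and the model assumes that next() always advances the in-memory position and that commitOffsets() records everything consumed so far.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition model of the log-and-continue loop; not the Kafka API.
public class SkippedMessageSketch {
    private int position = 0;   // in-memory position, advanced by every next()
    private int committed = 0;  // last committed position

    // Returns message number 1, 2, 3, ... and advances the position.
    public int next() { return ++position; }

    // Commits everything consumed so far, including messages whose processing failed.
    public void commitOffsets() { committed = position; }

    public int committedOffset() { return committed; }

    // Mirrors the loop in the message: a failure is recorded and the loop carries on.
    public static List<Integer> logAndContinue(SkippedMessageSketch consumer, int total, int poison) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            int message = consumer.next();
            try {
                if (message == poison) {
                    throw new IllegalStateException("cannot process " + message);
                }
                consumer.commitOffsets();
            } catch (RuntimeException e) {
                failed.add(message);   // stands in for logger.error(...)
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        SkippedMessageSketch consumer = new SkippedMessageSketch();
        List<Integer> failed = logAndContinue(consumer, 92, 91);
        System.out.println("failed: " + failed + ", committed: " + consumer.committedOffset());
    }
}
```

With 92 messages and a failure on message 91, the model ends with a committed position of 92 even though message 91 was never processed. Under these assumptions a restart would resume after message 92, which matches what we saw in testing.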
Thanks
Arjun Narasimha Kota