You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by "Maier, Dr. Andreas" <an...@asideas.de> on 2014/03/07 10:38:15 UTC

Are offsets unique, immutable identifiers for a message in a topic?

Hi,

I have the following problem:
My Kafka consumer is consuming messages, but the processing of the message
might fail. I do not want to
retry until success, but instead want to quickly consume the next message.
However at a later time I might still want to reprocess the failed
messages.
So I though about storing a list of offsets of the messages that have
failed in the first try
for later processing.
But that would only make sense, if the offsets are unique, immutable
identifiers for a message within a topic. Since Kafka deletes messages or
compactifies the log after some time,
I was wondering if this is really the case?
If not, how could I then uniquely identify a message within a topic, so
that a consumer knows from
where to start consuming again?

Thank you,
Andreas Maier

AS ideAS Engineering
Axel-Springer-Straße 65
10888 Berlin
Mobil: +49 (0) 151 ­ 730 26 414
andreas.maier@asideas.de



Axel Springer ideAS Engineering GmbH
Ein Unternehmen der Axel Springer SE
Sitz Berlin, Amtsgericht Charlottenburg, HRB 138466 B
Geschäftsführer: Daniel Keller, Niels Matusch







Re: Are offsets unique, immutable identifiers for a message in a topic?

Posted by Martin Kleppmann <mk...@linkedin.com>.
On 7 Mar 2014, at 14:11, "Maier, Dr. Andreas" <an...@asideas.de> wrote:
>> In your case, it sounds like time-based retention with a fairly long
>> retention period is the way to go. You could potentially store the
>> offsets of messages to retry in a separate Kafka topic.
> 
> I was also thinking about doing that. However, what do I do, if I have
> again some errors when processing the offsets from that Kafka topic?
> Since I cannot delete the offsets of messages from the Kafka topic that
> have been processed successfully, I would have to create another
> Kafka topic to again store the remaining offsets and then maybe another
> one and then another on and so on.

You might be interested to have a look at what Samza does: http://samza.incubator.apache.org/learn/documentation/0.7.0/ -- it's a stream processing framework that builds on Kafka's features. It still processes messages sequentially per partition, so it doesn't do the per-message retry that you describe, but it does use a separate Kafka topic for checkpointing state and recovering from failure. (It doesn't require a cascade of topics.)

> That seems awkward to me.
> Wouldn't it be better to simply have a mutable list of offsets, read from
> that list and if a message was successfully processed,
> remove the offset from the list. By that one could immediately see from
> the length of the list how many messages still needs to be processed.
> Since Kafka topics are append only they don't seem to be a good fit for
> this kind of logic.

Indeed. If you want per-message acknowledgement and redelivery, perhaps something like RabbitMQ or ActiveMQ is a better fit for your use case. Kafka's design is optimised for very high-throughput sequential processing of messages, whereas RabbitMQ is better for "job queue" use cases where you want to retry individual messages out-of-order.

Martin


Re: Are offsets unique, immutable identifiers for a message in a topic?

Posted by "Maier, Dr. Andreas" <an...@asideas.de>.
Am 07.03.14 11:43 schrieb "Martin Kleppmann" unter
<mk...@linkedin.com>:

>Almost right: offsets are unique, immutable identifiers for a message
>within a topic-partition. Each partition has its own sequence of offsets,
>but a (topic, partition, offset) triple uniquely and persistently
>identifies a particular message.
>
>For log retention you have essentially two options: to discard messages
>older than some threshold (which can be a few weeks if you have enough
>disk space, giving you plenty of time to recover from failures), or to
>keep only the newest message for a given key and discard older messages
>with the same key 
>(http://kafka.apache.org/081/documentation.html#compaction). When using
>time-based retention, when a segment of the log expires, offsets within
>that segment or before become unavailable. When using compaction, "holes"
>appear in the sequence of offsets where old messages were discarded.

Ok, thank you Martin. That makes it clear.

>
>In your case, it sounds like time-based retention with a fairly long
>retention period is the way to go. You could potentially store the
>offsets of messages to retry in a separate Kafka topic.

I was also thinking about doing that. However, what do I do, if I have
again some errors when processing the offsets from that Kafka topic?
Since I cannot delete the offsets of messages from the Kafka topic that
have been processed successfully, I would have to create another
Kafka topic to again store the remaining offsets and then maybe another
one and then another on and so on.
That seems awkward to me.
Wouldn't it be better to simply have a mutable list of offsets, read from
that list and if a message was successfully processed,
remove the offset from the list. By that one could immediately see from
the length of the list how many messages still needs to be processed.
Since Kafka topics are append only they don't seem to be a good fit for
this kind of logic.

Andreas

>
>
>On 7 Mar 2014, at 09:38, "Maier, Dr. Andreas" <an...@asideas.de>
>wrote:
>
>> Hi,
>> 
>> I have the following problem:
>> My Kafka consumer is consuming messages, but the processing of the
>>message
>> might fail. I do not want to
>> retry until success, but instead want to quickly consume the next
>>message.
>> However at a later time I might still want to reprocess the failed
>> messages.
>> So I though about storing a list of offsets of the messages that have
>> failed in the first try
>> for later processing.
>> But that would only make sense, if the offsets are unique, immutable
>> identifiers for a message within a topic. Since Kafka deletes messages
>>or
>> compactifies the log after some time,
>> I was wondering if this is really the case?
>> If not, how could I then uniquely identify a message within a topic, so
>> that a consumer knows from
>> where to start consuming again?
>> 
>> Thank you,
>> Andreas Maier
>> 
>> AS ideAS Engineering
>> Axel-Springer-Straße 65
>> 10888 Berlin
>> Mobil: +49 (0) 151 ­ 730 26 414
>> andreas.maier@asideas.de
>> 
>> 
>> 
>> Axel Springer ideAS Engineering GmbH
>> Ein Unternehmen der Axel Springer SE
>> Sitz Berlin, Amtsgericht Charlottenburg, HRB 138466 B
>> Geschäftsführer: Daniel Keller, Niels Matusch
>> 
>> 
>> 
>> 
>> 
>> 
>


Re: Are offsets unique, immutable identifiers for a message in a topic?

Posted by Martin Kleppmann <mk...@linkedin.com>.
Almost right: offsets are unique, immutable identifiers for a message within a topic-partition. Each partition has its own sequence of offsets, but a (topic, partition, offset) triple uniquely and persistently identifies a particular message.

For log retention you have essentially two options: to discard messages older than some threshold (which can be a few weeks if you have enough disk space, giving you plenty of time to recover from failures), or to keep only the newest message for a given key and discard older messages with the same key (http://kafka.apache.org/081/documentation.html#compaction). When using time-based retention, when a segment of the log expires, offsets within that segment or before become unavailable. When using compaction, "holes" appear in the sequence of offsets where old messages were discarded.

In your case, it sounds like time-based retention with a fairly long retention period is the way to go. You could potentially store the offsets of messages to retry in a separate Kafka topic.

Martin


On 7 Mar 2014, at 09:38, "Maier, Dr. Andreas" <an...@asideas.de> wrote:

> Hi,
> 
> I have the following problem:
> My Kafka consumer is consuming messages, but the processing of the message
> might fail. I do not want to
> retry until success, but instead want to quickly consume the next message.
> However at a later time I might still want to reprocess the failed
> messages.
> So I though about storing a list of offsets of the messages that have
> failed in the first try
> for later processing.
> But that would only make sense, if the offsets are unique, immutable
> identifiers for a message within a topic. Since Kafka deletes messages or
> compactifies the log after some time,
> I was wondering if this is really the case?
> If not, how could I then uniquely identify a message within a topic, so
> that a consumer knows from
> where to start consuming again?
> 
> Thank you,
> Andreas Maier
> 
> AS ideAS Engineering
> Axel-Springer-Straße 65
> 10888 Berlin
> Mobil: +49 (0) 151 ­ 730 26 414
> andreas.maier@asideas.de
> 
> 
> 
> Axel Springer ideAS Engineering GmbH
> Ein Unternehmen der Axel Springer SE
> Sitz Berlin, Amtsgericht Charlottenburg, HRB 138466 B
> Geschäftsführer: Daniel Keller, Niels Matusch
> 
> 
> 
> 
> 
>