Posted to users@kafka.apache.org by Stefan Miklosovic <mi...@gmail.com> on 2015/07/16 13:07:01 UTC

Idea how to ensure exactly once message delivery without external storage

Hi,

In the old consumer, I get just a simple stream of messages, one by
one, and if I detect that something went wrong, I destroy my consumer
immediately without committing, so once I restart the consumer, I get
the same messages again because they are delivered to me from the last
committed offset (if I understand that correctly).
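
A rough sketch of that flow with the old high-level consumer could look like
this (just an illustration: auto-commit is disabled, the topic, group id and
process() helper are placeholders, and where exactly commitOffsets() is called
is up to the application; here it is after every message to keep it short):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my-group");
props.put("auto.commit.enable", "false");

ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

Map<String, List<KafkaStream<byte[], byte[]>>> streams =
    connector.createMessageStreams(Collections.singletonMap("myTopic", 1));
ConsumerIterator<byte[], byte[]> it = streams.get("myTopic").get(0).iterator();

while (it.hasNext()) {
    MessageAndMetadata<byte[], byte[]> message = it.next();
    try {
        process(message);            // application-specific processing
        connector.commitOffsets();   // commit only after the message was handled
    } catch (Exception e) {
        connector.shutdown();        // destroy the consumer without committing;
        break;                       // on restart the same messages come back
    }
}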

While this can work, it only gives me an at-least-once delivery
guarantee, and that is not good in my case. I need an exactly-once
guarantee.

While looking into the new consumer, I noticed that there is the
possibility to "rewind" within a partition.

My new algorithm is something like this:

TopicPartition myPartition;

consumer.subscribe(myTopic);

ConsumerRecords<String, String> records = consumer.poll(0);

for (ConsumerRecord<String, String> record : records) {

    processRecord(record);

    processedMessages++;

    if (failure) {
        long offsetOfLastProcessedRecord = record.offset();

        // this will effectively rewind me back so I get the messages
        // which are not processed yet
        consumer.seek(myPartition, offsetOfLastProcessedRecord - processedMessages);

        // here I commit the position of the last processed record, so on the
        // next poll I should get the messages which were polled before but
        // stayed unprocessed because of the error
        consumer.commit(Collections.singletonMap(myPartition, offsetOfLastProcessedRecord),
                        CommitType.SYNC);
    }
}

Does this approach make sense?

-- 
Stefan Miklosovic

Re: Idea how to ensure exactly once message delivery without external storage

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Hey Stefan,

I think if you follow the one-commit-per-message approach, you will be
able to achieve exactly-once semantics. However, this would be very
expensive, and everything would have to be synchronous to make it work.
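
For reference, a commit-per-message loop would look roughly like this (a
sketch only, reusing the consumer, processRecord() and the commit/CommitType
names from Stefan's post; the final released API may name these differently):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        // synchronously commit the next offset to consume before touching the
        // next record; one broker round trip per message is what makes this expensive
        consumer.commit(
            Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                record.offset() + 1),
            CommitType.SYNC);
    }
}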

In that sense, de-duplication on the client side seems a more favorable
option to me. That implies there has to be some way to persist the
delivered offsets on the client side so we can still de-duplicate in the
crash-then-resume case.
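
For example (a sketch only; the file path and the loadOffsets()/saveOffsets()
helpers are made up for illustration, and consumer/processRecord() are reused
from Stefan's post):

// Remember the highest processed offset per partition in a small local file,
// so that after a crash-then-resume we can silently skip re-delivered records.
Map<TopicPartition, Long> lastProcessed = loadOffsets("processed-offsets.txt");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        Long last = lastProcessed.get(tp);
        if (last != null && record.offset() <= last) {
            continue;                       // duplicate from before the crash
        }
        processRecord(record);
        lastProcessed.put(tp, record.offset());
        // note: for true exactly-once the offset write and the side effect of
        // processRecord() would still need to happen atomically
        saveOffsets("processed-offsets.txt", lastProcessed);
    }
}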

Thanks,

Jiangjie (Becket) Qin

On 7/16/15, 9:25 AM, "Jason Gustafson" <ja...@confluent.io> wrote:

>Hey Stefan,
>
>I only see a commit in the failure case. Were you planning to use
>auto-commits otherwise? You'd probably want to handle all commits directly
>or you'd always be left guessing. But even if you did, I think the main
>problem is that your process could fail before a needed commit is sent to
>the broker. After it resumes, it wouldn't know that a commit had been
>pending and might reprocess some messages.
>
>I think some Kafka devs have been thinking about exactly once semantics,
>but I don't think there's anything solid yet.
>
>-Jason
>
>On Thu, Jul 16, 2015 at 4:07 AM, Stefan Miklosovic <mi...@gmail.com>
>wrote:
>
>> Hi,
>>
>> In the old consumer, I have got just a simple stream of messages, one
>> by one and if I detected something was wrong, I would destroy my
>> consumer immediately without commit so once I restart consumer, I will
>> get the same messages once again because they will be delivered to me
>> from the last offset committed (if I understand that correctly).
>>
>> While this can work, I have at-least-once delivery guarantee and that
>> is not good in my case. I need to have exactly-once guarantee.
>>
>> While looking into new consumer, I noticed that there is the
>> possibility to kind of "rewind" in a partition.
>>
>> My new algorithm is something like this:
>>
>> Partition myPartition;
>>
>> consumer.subscribe(myTopic);
>>
>> ConsumerRecords = consumer.poll(0);
>>
>> for (Record record: ConsumerRecords) {
>>
>>     processRecord(record);
>>
>>     processedMessages++;
>>
>>     if (failure) {
>>         int offsetOfLastProcessedRecord = record.offset();
>>
>>         // this will effectively rewind me back so I get messages
>> which are not processed yet
>>
>>         consumer.seek(myPartition, offsetOfLastProcessedRecord -
>> processedMessages);
>>
>>         // here i commit the position of the lastly processed record
>> so on the next poll
>>         // i should get messages which were polled before but stayed
>> unprocessed because of the
>>         // error
>>         consumer.commit(map<partition, offsetOfLastProcessedRecord>,
>> CommitType.SYNC);
>>     }
>> }
>>
>> Does this approach make sense?
>>
>> --
>> Stefan Miklosovic
>>


Re: Idea how to ensure exactly once message delivery without external storage

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Stefan,

I only see a commit in the failure case. Were you planning to use
auto-commits otherwise? You'd probably want to handle all commits directly
or you'd always be left guessing. But even if you did, I think the main
problem is that your process could fail before a needed commit is sent to
the broker. After it resumes, it wouldn't know that a commit had been
pending and might reprocess some messages.
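
A sketch of that window, reusing the names from the original post
(illustrative only):

processRecord(record);          // the side effect has already happened

// <-- a crash right here: the record was processed, but the commit below was
//     never sent, so after a restart it will be polled and processed again

consumer.commit(
    Collections.singletonMap(myPartition, record.offset() + 1),
    CommitType.SYNC);           // only now is the progress durable on the broker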

I think some Kafka devs have been thinking about exactly once semantics,
but I don't think there's anything solid yet.

-Jason

On Thu, Jul 16, 2015 at 4:07 AM, Stefan Miklosovic <mi...@gmail.com>
wrote:

> Hi,
>
> In the old consumer, I have got just a simple stream of messages, one
> by one and if I detected something was wrong, I would destroy my
> consumer immediately without commit so once I restart consumer, I will
> get the same messages once again because they will be delivered to me
> from the last offset committed (if I understand that correctly).
>
> While this can work, I have at-least-once delivery guarantee and that
> is not good in my case. I need to have exactly-once guarantee.
>
> While looking into new consumer, I noticed that there is the
> possibility to kind of "rewind" in a partition.
>
> My new algorithm is something like this:
>
> Partition myPartition;
>
> consumer.subscribe(myTopic);
>
> ConsumerRecords = consumer.poll(0);
>
> for (Record record: ConsumerRecords) {
>
>     processRecord(record);
>
>     processedMessages++;
>
>     if (failure) {
>         int offsetOfLastProcessedRecord = record.offset();
>
>         // this will effectively rewind me back so I get messages
> which are not processed yet
>
>         consumer.seek(myPartition, offsetOfLastProcessedRecord -
> processedMessages);
>
>         // here i commit the position of the lastly processed record
> so on the next poll
>         // i should get messages which were polled before but stayed
> unprocessed because of the
>         // error
>         consumer.commit(map<partition, offsetOfLastProcessedRecord>,
> CommitType.SYNC);
>     }
> }
>
> Does this approach make sense?
>
> --
> Stefan Miklosovic
>