Posted to users@kafka.apache.org by Sergi Vladykin <se...@gmail.com> on 2019/10/30 19:29:58 UTC

Exactly once transactions

Hi!

I am investigating the possibilities of "exactly once" Kafka transactions
for the consume-transform-produce pattern. As far as I understand, the
logic must be the following (in pseudo-code):

var cons = createKafkaConsumer(MY_CONSUMER_GROUP_ID);
cons.subscribe(TOPIC_A);
for (;;) {
    var recs = cons.poll();
    for (var part : recs.partitions()) {
        var partRecs = recs.records(part);
        // One transactional producer per input partition.
        var prod = getOrCreateProducerForPart(MY_TX_ID_PREFIX + part);
        prod.beginTransaction();
        sendAllRecs(prod, TOPIC_B, partRecs);
        // Commit the consumed offsets within the same transaction; the
        // committed position is the offset of the next record to read.
        prod.sendOffsetsToTransaction(
                singletonMap(part, new OffsetAndMetadata(lastRecOffset(partRecs) + 1)),
                MY_CONSUMER_GROUP_ID);
        prod.commitTransaction();
    }
}

Is this the right approach?

I ask because it looks to me like there is a possible race here, where
the same record from topic A can be committed to topic B more than once:
if a rebalance happens after our thread polls a record but before it
creates the producer, another thread will read and commit the same
record; after that, our thread will wake up, create its producer
(fencing the other one), and successfully commit the same record a
second time.

Can anyone please explain how to do "exactly once" in Kafka right?
Examples would be very helpful.

Sergi

Re: Exactly once transactions

Posted by Sergi Vladykin <se...@gmail.com>.
Thanks a lot for all the suggestions!

The Streams API is not really what I need, but I looked into the Streams
sources and found that it initializes transactional producers in
ConsumerRebalanceListener.onPartitionsAssigned, which is called from
within KafkaConsumer.poll before records are fetched.

This approach looks like it solves the race I was talking about.
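
Roughly, in code, the fix looks like this: producer setup moves into the
rebalance callback, so it runs inside poll() before any records from the
newly assigned partitions are returned. (getOrCreateProducerForPart and
closeProducerForPart are the hypothetical helpers from my pseudo-code,
not Kafka API.)

cons.subscribe(List.of(TOPIC_A), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> parts) {
        // Runs inside poll(), before records for these partitions are
        // returned: create the producers here, fencing any stale owner.
        for (TopicPartition part : parts) {
            // The helper must call initTransactions() on a new producer,
            // which fences an older producer using the same id.
            getOrCreateProducerForPart(MY_TX_ID_PREFIX + part);
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> parts) {
        // Close producers for partitions we no longer own.
        for (TopicPartition part : parts) {
            closeProducerForPart(MY_TX_ID_PREFIX + part);
        }
    }
});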

Sergi

Re: Exactly once transactions

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I would recommend reading the Kafka Streams KIP about EOS:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics

Fencing is the most critical part of the implementation. Kafka Streams
basically uses a dedicated `transactional.id` per input topic-partition.
Hence, if a rebalance happens and a partition is re-assigned, it is
ensured that only one "instance" of a consumer-producer pair can commit
transactions successfully, because the "new producer" uses the same
associated `transactional.id` as the "original producer".
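
For illustration, creating such a fenced producer might look roughly
like this (broker address and id prefix are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;

static KafkaProducer<byte[], byte[]> producerFor(TopicPartition part) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // A stable id per input partition: whoever owns the partition owns
    // the id, so an instance taking over after a rebalance fences the
    // previous owner when it calls initTransactions().
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-" + part);
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
            props, new ByteArraySerializer(), new ByteArraySerializer());
    producer.initTransactions(); // registers the id and bumps its epoch
    return producer;
}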

There is actually a KIP in progress that will make using transactions
simpler, as it basically improves fencing:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
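
Under KIP-447, a single shared producer can commit offsets for all its
partitions, with fencing driven by the consumer's group metadata instead
of a per-partition `transactional.id`. In the releases where it later
landed (2.5+), that looks roughly like:

// One shared transactional producer; the consumer group metadata
// carries the fencing information.
prod.sendOffsetsToTransaction(offsetsToCommit, cons.groupMetadata());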


But I agree with Alex: if you can, it's recommended to use the Streams
API, which solves those problems for you. There is no need to
re-implement it from scratch.



Hope this helps.


-Matthias

Re: Exactly once transactions

Posted by Alex Brekken <br...@gmail.com>.
Sergi, have you looked at using the Kafka Streams API? If you're just
consuming from a topic, transforming the data, and then writing it out
to a sink topic, you can probably do that relatively easily using the
Streams DSL. There is no need to manually subscribe, poll, or commit
offsets. Then, if you want exactly-once guarantees, you just set the
processing.guarantee property to "exactly_once" and you're all set.
However, since it sounds like your application isn't stateful, I think
your only concern is producing duplicate messages to the sink topic,
right? (You don't have any internal state that could get messed up in
the event of crashes, network failures, etc.) Do you have control over
who/what consumes the sink topic? If so, can you make that consumer
tolerant of duplicate messages? Exactly once works well in my
experience, but there is overhead involved, so only use it if you
need it. :)
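
For example, a complete exactly-once pass-through topology is only a few
lines. (The application id, broker address, and the transform are
placeholders.)

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-transform-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// The one property that turns on transactional, exactly-once processing:
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("TOPIC_A")
       .mapValues(value -> transform(value)) // your transformation goes here
       .to("TOPIC_B");

new KafkaStreams(builder.build(), props).start();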

Alex

Re: Exactly once transactions

Posted by Kidong Lee <my...@gmail.com>.
Hi,

It may not be for your case, but I have implemented an example of a Kafka
transaction:

https://medium.com/@mykidong/kafka-transaction-56f022af1b0c

In this example, offsets are saved to an external DB.
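
The core of the pattern, very roughly (hypothetical schema and helper
names; the article has the full version):

// Resume from the offset recorded in the database, not from Kafka.
long stored = readStoredOffset(db, part);        // hypothetical helper
consumer.assign(List.of(part));
consumer.seek(part, stored + 1);

// The transformed result and the consumed offset are committed in one
// database transaction, so a crash either replays the batch or skips
// it, but never half-applies it.
db.setAutoCommit(false);
for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
    insertResult(db, transform(rec.value()));    // hypothetical helper
    upsertOffset(db, part, rec.offset());        // hypothetical helper
}
db.commit();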

- Kidong

Re: Exactly once transactions

Posted by Sergi Vladykin <se...@gmail.com>.
Ok, so what is the advice? Not to use Kafka transactions ever, because
they are unusable in real life?
Can you please provide a recipe for making it work in a simple scenario:
no databases, just two topics, no admin actions.

Sergi

Re: Exactly once transactions

Posted by Jörn Franke <jo...@gmail.com>.
Please note that for exactly-once it is not sufficient to simply set an option. The producer and consumer must individually make sure that they process each message only once. For instance, the consumer can crash and may then reprocess already handled messages, or the producer might crash and write the same message twice to a database, etc.
All of this might also happen due to a backup and restore, or through a manual admin action.
Those are not "edge" scenarios. In operations they can happen quite often, especially in a containerized infrastructure.
You have to consider this for all messaging solutions (not only Kafka) in your technical design.
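
For example, one common mitigation on the database side is to make the
write idempotent, keyed by a stable identity, so that replays become
no-ops (hypothetical table, PostgreSQL upsert syntax):

import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// The (topic, partition, offset) triple uniquely identifies a record,
// so a replayed duplicate simply hits the conflict clause.
static void writeIdempotently(Connection db, ConsumerRecord<String, String> rec)
        throws Exception {
    String sql = "INSERT INTO events (topic, part, off, payload) VALUES (?, ?, ?, ?) "
               + "ON CONFLICT (topic, part, off) DO NOTHING";
    try (PreparedStatement st = db.prepareStatement(sql)) {
        st.setString(1, rec.topic());
        st.setInt(2, rec.partition());
        st.setLong(3, rec.offset());
        st.setString(4, rec.value());
        st.executeUpdate();
    }
}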
