Posted to users@kafka.apache.org by Peter Cipov <pc...@twilio.com.INVALID> on 2021/03/09 11:06:10 UTC

Re: Abort transaction semantics

Hello

Thanks for the advice. I have looked into the examples and spotted yet
another place that is not clear to me: resetToLastCommittedPositions (in
case of abort). I am wondering what happens if the process crashes while
this method is running, so that the consumer is never reset to the last
committed offsets. Let's say the process is killed at that instant. In my
case this will schedule a new process that creates a new consumer (assume
the rebalance assigns it the same partitions and nobody else touched them).
In such a case, will the offsets be shifted or not?

I am also wondering at which point in the transaction flow the consumer
offsets are committed to Kafka.

Thank you

Peter

On Fri, Feb 19, 2021 at 9:23 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Peter,
>
> Note that when you upgrade from 2.4 to later versions in Kafka, your error
> handling could be modified and simplified a bit as well. You can read the
> example code in KIP-691 as a reference:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
>
>
> Guozhang
>
> On Fri, Feb 19, 2021 at 1:23 AM Peter Cipov <pc...@twilio.com.invalid> wrote:
>
> > This was really helpful.
> >
> > Thank you
> >
> > On Thu, Feb 18, 2021 at 8:08 PM Boyang Chen <re...@gmail.com> wrote:
> >
> > > Thanks for the question. I think Gary provided an excellent answer.
> > > Additionally, you could check out the code example
> > > <https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java#L132>
> > > for EOS (exactly-once semantics), which shows you how to reset the
> > > state while aborting ongoing transactions.
> > >
> > > On Thu, Feb 18, 2021 at 11:01 AM Gary Russell <gr...@vmware.com> wrote:
> > >
> > > > You have to perform seeks (using the consumer) to the lowest unprocessed
> > > > offset for each partition returned by the poll, before the next poll.
> > > > ________________________________
> > > > From: Peter Cipov <pc...@twilio.com.INVALID>
> > > > Sent: Thursday, February 18, 2021 1:20 PM
> > > > To: users@kafka.apache.org <us...@kafka.apache.org>
> > > > Subject: Abort transaction semantics
> > > >
> > > > Hello
> > > > I have a question regarding aborting transactions in Kafka client 2.4.1.
> > > >
> > > > Let's take the following code:
> > > >
> > > > ... proper transactional producer and consumer creation, consumer
> > > > enable.auto.commit = false
> > > >
> > > > producer.initTransactions();
> > > >
> > > > while (true) {
> > > >   records = consumer.poll();
> > > >   logRecordOffsets(records);
> > > >   producer.beginTransaction();
> > > >   try {
> > > >     doMagic();
> > > >   } catch (Exception e) {
> > > >     producer.abortTransaction();
> > > >     continue;
> > > >   }
> > > >   producer.sendOffsetsToTransaction(..);
> > > >   producer.commitTransaction();
> > > > }
> > > >
> > > > When doMagic throws for some reason, abort is called and the code starts
> > > > again from the beginning of the loop with a poll.
> > > >
> > > > Our assumption was that the next poll would start from the same offsets,
> > > > but as we saw from the logs, this is not the case. What we observed is
> > > > that the offsets are shifted and messages are lost; they are never
> > > > retried.
> > > >
> > > > What are the semantics of abort? We could not figure this out from the
> > > > documentation.
> > > > What is the recommended approach for retrying?
> > > >
> > > > Thank you
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
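Gary's advice, and the behavior described in the original question, can be illustrated without a broker. The sketch below is a hypothetical, stdlib-only toy model (the `Rec` and `ToyConsumer` classes are invented for illustration and are not Kafka API): `poll` advances an in-memory position past every record it returns, whatever happens to the producer transaction afterwards, so aborting alone rewinds nothing. `rewind` seeks each partition in the failed batch back to the lowest offset that batch contained. With the real 2.4 client, the equivalent would presumably be calling `consumer.seek(tp, records.records(tp).get(0).offset())` for each `tp` in `records.partitions()` right after `producer.abortTransaction()` and before the next poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A fetched record in the toy model (hypothetical type, not ConsumerRecord).
final class Rec {
    final int partition;
    final long offset;
    final String value;

    Rec(int partition, long offset, String value) {
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical in-memory stand-in for a consumer's fetch positions.
// There is no broker, no group and no transaction here.
final class ToyConsumer {
    // Partition -> values; a value's index in the list is its offset.
    private final Map<Integer, List<String>> log = new TreeMap<>();
    // Partition -> next offset to fetch; this is what poll() moves forward.
    private final Map<Integer, Long> position = new TreeMap<>();

    void append(int partition, String value) {
        log.computeIfAbsent(partition, p -> new ArrayList<>()).add(value);
        position.putIfAbsent(partition, 0L);
    }

    // Returns up to maxPerPartition records per partition and moves the
    // position past them. It cannot know whether a later transaction aborts.
    List<Rec> poll(int maxPerPartition) {
        List<Rec> batch = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> e : log.entrySet()) {
            int p = e.getKey();
            List<String> values = e.getValue();
            long pos = position.get(p);
            long end = Math.min(values.size(), pos + maxPerPartition);
            for (long o = pos; o < end; o++) {
                batch.add(new Rec(p, o, values.get((int) o)));
            }
            position.put(p, end);
        }
        return batch;
    }

    void seek(int partition, long offset) {
        position.put(partition, offset);
    }

    // Gary's advice: after an abort, seek every partition that appeared in the
    // failed batch back to the lowest offset it had, before the next poll.
    void rewind(List<Rec> failedBatch) {
        Map<Integer, Long> lowest = new TreeMap<>();
        for (Rec r : failedBatch) {
            lowest.merge(r.partition, r.offset, Long::min);
        }
        for (Map.Entry<Integer, Long> e : lowest.entrySet()) {
            seek(e.getKey(), e.getValue());
        }
    }
}
```

With four values in partition 0, a second `poll(2)` without `rewind` returns offsets 2 and 3, which matches the "shifted" offsets seen in the logs; calling `rewind` on the failed batch first makes the next poll return that batch again.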

Re: Abort transaction semantics

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Peter,

resetToLastCommittedPositions is meant for cases where the app is still up
and running, as a convenient way to continue processing. If the app has
crashed, then upon restarting it resets its position by reading the
committed offsets, which correspond to the last successful transaction and
hence do not include the last failed transaction right before the crash.

Or you can think about it another way: whenever a transaction needs to be
aborted, instead of following the example we provide, one can take a more
extreme approach, namely "always call System.exit(1) immediately, and then
restart the app" (:P). That basically falls into the crash scenario as well
and is still correct; you just pay a bigger performance hit from crashing
and restarting the app.
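The restart case described above, which also answers where the offsets get committed, can be sketched with another hypothetical toy model (`ToyOffsetStore` and its methods are invented for illustration and are not Kafka API). In the real client, the offsets passed to `sendOffsetsToTransaction` are part of the transaction: they become the group's committed offsets only if `commitTransaction` succeeds, and they are discarded if the transaction aborts, including when a crashed producer's open transaction is later aborted by the coordinator (on transaction timeout, or when a new producer with the same `transactional.id` calls `initTransactions`). A freshly started consumer therefore resumes after the last committed transaction, and a running app can get the same result by seeking each assigned partition to what `consumer.committed(...)` returns. The `fallback` argument is an assumption standing in for whatever `auto.offset.reset` would choose when nothing has been committed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a group's committed offsets under transactions.
// Not Kafka API: stage/commit/abort only mirror the roles of
// sendOffsetsToTransaction, commitTransaction and abortTransaction.
final class ToyOffsetStore {
    // Offsets visible to any consumer in the group (what a restart reads).
    private final Map<Integer, Long> committed = new HashMap<>();
    // Offsets sent inside the currently open transaction, not yet visible.
    private final Map<Integer, Long> pending = new HashMap<>();

    // Role of sendOffsetsToTransaction: offsets are only staged here.
    void stage(Map<Integer, Long> offsets) {
        pending.putAll(offsets);
    }

    // Role of commitTransaction: staged offsets become committed together.
    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    // Role of abortTransaction (or the coordinator aborting a crashed
    // producer's open transaction): staged offsets are dropped.
    void abort() {
        pending.clear();
    }

    // Where a restarted consumer, or a running one that resets to committed
    // positions, starts reading. `fallback` stands in for auto.offset.reset.
    long startingOffset(int partition, long fallback) {
        return committed.getOrDefault(partition, fallback);
    }
}
```

After one committed transaction that sent offset 2 and one aborted transaction that sent offset 4, `startingOffset(0, 0L)` returns 2, so the records from the failed transaction are read again after a restart. This is why a crash in the middle of resetToLastCommittedPositions does not lose them.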


Guozhang



-- 
-- Guozhang