Posted to users@kafka.apache.org by Piotr Nowojski <pi...@gmail.com> on 2017/06/16 13:21:28 UTC

Kafka 0.11 transactions API question

Hi,

I'm looking into Kafka's transactions API as proposed in KIP-98. I've read
both this KIP-98 document and I looked into the code that is on the master
branch. I would like to use it to implement some two phase commit mechanism
on top of the Kafka's transactions, that would allow me to tie multiple
systems (some of them might not be Kafka) in one transaction.

Maybe I'm missing something, but the problem is I don't see a way to
implement it using the proposed Kafka transactions API. Even if I have just
two processes writing to Kafka topics, I don't know how I can guarantee
that if one's transaction is committed, the other will also eventually be
committed. This is because if the first KafkaProducer successfully commits,
but the second one fails before committing its data, then after a restart
the second one's "initTransactions" call will (according to my
understanding of the API) abort the previously non-completed transaction.
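To make the abort-on-restart behaviour concrete, here is a toy in-memory model of it. This is NOT Kafka's actual coordinator; all class and method names below are illustrative only, chosen to mirror the producer-facing calls:

```java
// Toy model: a coordinator keyed by transactional.id, where a restarted
// producer's initTransactions() aborts any transaction the previous
// incarnation left open. Purely illustrative, not Kafka internals.
import java.util.HashMap;
import java.util.Map;

class TxState { boolean open; boolean committed; }

class ToyCoordinator {
    private final Map<String, TxState> byTransactionalId = new HashMap<>();

    // Models the described initTransactions() semantics: any transaction
    // left open by a previous incarnation of this id is aborted.
    void initTransactions(String transactionalId) {
        TxState s = byTransactionalId.get(transactionalId);
        if (s != null && s.open) {
            s.open = false;                 // aborted; its data is discarded
        }
    }

    void begin(String transactionalId) {
        TxState s = new TxState();
        s.open = true;
        byTransactionalId.put(transactionalId, s);
    }

    void commit(String transactionalId) {
        TxState s = byTransactionalId.get(transactionalId);
        s.open = false;
        s.committed = true;
    }

    boolean committed(String transactionalId) {
        TxState s = byTransactionalId.get(transactionalId);
        return s != null && s.committed;
    }
}
```

In this model, a producer that crashes after begin() but before commit() can never get its data committed: the restart path goes through initTransactions(), which discards the open transaction.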

Usually transactional systems expose an API like this
<http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>.
Namely, there is a known identifier for a transaction, and you can pre-commit
it (the void prepare(...) method in the aforementioned example) and then
commit it, or you can abort the transaction. Usually pre-commit involves
flushing data to some temporary files, and commit moves those files to the
final directory. In case of a machine/process failure before "pre-commit",
we can just roll back all transactions from all of the processes. However,
once every process acknowledges that it completed "pre-commit", each
process should call "commit". If some process fails at that stage, then
after restarting it I would expect to be able to restore its
"pre-committed" transaction (having remembered the transaction's id) and
re-attempt to commit it - which should be guaranteed to eventually succeed.
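The pre-commit-to-temp-file pattern described above can be sketched as follows. The class and method names are illustrative (they are not from Kafka or any real transaction manager); the key property is that commit() is an atomic rename, so re-running it after a crash is safe:

```java
// Sketch of the prepare/commit/abort pattern: prepare() durably writes
// data to a temp file, commit() atomically moves it into place, abort()
// discards it. Illustrative names only.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class TwoPhaseFileWriter {
    private final Path tempFile;
    private final Path finalFile;

    TwoPhaseFileWriter(Path dir, String txId) {
        this.tempFile = dir.resolve(txId + ".tmp");
        this.finalFile = dir.resolve(txId + ".out");
    }

    // Pre-commit: durably write the data, still invisible to readers.
    void prepare(byte[] data) throws IOException {
        Files.write(tempFile, data);
    }

    // Commit: an atomic rename either fully happens or not at all, and
    // the method is idempotent - re-running after a crash is harmless.
    void commit() throws IOException {
        if (Files.exists(tempFile)) {
            Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
        }
    }

    // Abort: roll back by discarding the prepared data.
    void abort() throws IOException {
        Files.deleteIfExists(tempFile);
    }
}
```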

In other words, it seems to me like the missing features of this API for me
are:
1. the possibility to resume transactions after a machine/process crash. At
least I would expect to be able to commit "flushed"/"pre-committed" data
for such transactions.
2. making sure that committing an already committed transaction doesn't
break anything

Or maybe there is some other way to integrate Kafka into such two phase
commit system that I'm missing?

Thanks, Piotrek

Re: Kafka 0.11 transactions API question

Posted by "Matthias J. Sax" <ma...@confluent.io>.
As Michał said, it's not designed for this use case.

Kafka's transactions are not the same thing as DB transactions: if you
break it down, they allow for atomic (multi-partition) writes, but no
2-phase commit.

Also, a transaction is "owned" by a single thread (i.e., producer) and
cannot be "shared" by multiple - which seems to be what you would actually
like to have.

Maybe it would be possible to extend the current exactly-once feature, but
at the moment there are no plans for it. The main use case is stream
processing with the Streams API.


-Matthias

On 6/16/17 8:59 AM, Piotr Nowojski wrote:
> But isn't it a low hanging fruit at this moment? Isn't it just an API
> limitation, and wouldn't the backend for transactions support it with
> only minor changes to the API (i.e., do not automatically fail dangling
> transactions on Producer restart)? Flushing is already there, so that
> _should_ handle the pre-commit. Again, maybe I'm missing something and
> for sure I am not familiar with Kafka's internals.
> 
> Piotrek
> 
> 2017-06-16 15:47 GMT+02:00 Michal Borowiecki
> <michal.borowiecki@openbet.com <ma...@openbet.com>>:
> 
>     I don't think KIP-98 is as ambitious as to provide support for
>     distributed transactions (2 phase commit).
> 
>     It would be great if I was wrong though :P
> 
>     Cheers,
> 
>     Michał


Re: Kafka 0.11 transactions API question

Posted by Piotr Nowojski <pi...@gmail.com>.
It could work as you have described, but indeed only for one "local-only"
transaction. However, that's not the case for me. It wouldn't even work
with two Kafka producers. If I could get away with just one single Kafka
producer, I probably wouldn't need a distributed system to solve such a
use case.

Maybe we are making a digression here, but as I stated earlier, in most
other distributed systems "pre-commit" involves only flushing the data to
temporary files, and the actual "commit" phase only moves the temporary
files to the target directory. And you need to flush the data regardless
of whether you want transactions or not. Moving files in the commit phase
also looks the same in two-phase commits as it does in single-phase
commits. Thus, implementing those "pre-commit" and "commit" phases
properly shouldn't add any additional overhead.

By the way, I don't want to implement fully atomic distributed
transactions; I just would like to be able to achieve exactly-once
semantics for events processed in my system. Without those two-phase
commits, I can not achieve that efficiently with two Kafka producers
(i.e., without buffering and serializing all the events in some permanent
storage before writing them to the target Kafka topic).

Thanks again for your responses! :)

Piotrek




Re: Kafka 0.11 transactions API question

Posted by Michal Borowiecki <mi...@openbet.com>.
I'm not sure if I understood correctly, but if you want to integrate a
single Kafka producer transaction (or any transaction manager that only
supports local transactions) into a distributed transaction, I think you
can do so as long as all other involved transaction managers support
2-phase commit. In other words, you can include one (and only one)
local-only transaction in a distributed transaction.

The steps the distributed transaction coordinator would have to take to 
commit the transaction would be:

 1. call prepare on each of the transaction participants that support
    2-phase commit
 2. if any of them fails, abort all transactions, otherwise, proceed
 3. call commit on the one transaction participant that does not support
    2-phase commit (the Kafka producer in this case)
 4. if that fails, abort all transactions, otherwise, proceed
 5. call commit on all the transaction participants that support 2-phase
    commit (since prepare on these succeeded they should not refuse to
    commit at this point)
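The five steps above can be sketched as a coordinator loop with simulated participants. Participant, LocalOnlyParticipant and Coordinator are illustrative stand-ins, not real transaction-manager APIs:

```java
// Sketch of the coordinator algorithm: N participants that support
// 2-phase commit, plus exactly one local-only participant (e.g. a
// Kafka producer) that can only commit-or-abort. Illustrative only.
import java.util.List;

interface Participant {
    boolean prepare();   // phase 1: vote to commit
    void commit();
    void abort();
}

// E.g. a Kafka producer: no prepare(), only commit-or-abort.
interface LocalOnlyParticipant {
    boolean commit();    // returns false if the commit failed
    void abort();
}

class Coordinator {
    // Returns true iff the distributed transaction committed.
    static boolean run(List<Participant> twoPhase, LocalOnlyParticipant localOnly) {
        // 1. call prepare on every 2PC participant
        for (Participant p : twoPhase) {
            if (!p.prepare()) {              // 2. any failure -> abort all
                twoPhase.forEach(Participant::abort);
                localOnly.abort();
                return false;
            }
        }
        // 3. commit the single local-only participant first
        if (!localOnly.commit()) {           // 4. failure -> abort the rest
            twoPhase.forEach(Participant::abort);
            return false;
        }
        // 5. prepared participants must not refuse to commit now
        twoPhase.forEach(Participant::commit);
        return true;
    }
}
```

Note that step 3 works only because the prepared participants are still abortable in step 4; that is exactly why at most one local-only participant fits into the scheme.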

So as to your concern about getting "clearance" (I take it as the
equivalent of the "prepare" call) from the Kafka producer, you don't
really need it IMO: if commit fails on the Kafka producer, you can still
abort the remaining transactions.

Of course you can't do that if you have more than one transaction that 
doesn't support 2-phase commit in play.

Having said that, the advice these days seems to be to design 
distributed systems for eventual consistency, as using distributed 
transactions, while tempting, often leads to resource exhaustion as 
transaction managers have to go the extra mile to ensure they can commit 
any transaction that had prepare return successfully.

Just my 5c. I may be wrong in any of the above, please point it out if so.

Cheers,

Michał




Re: Kafka 0.11 transactions API question

Posted by Piotr Nowojski <pi...@gmail.com>.
Sorry for responding to my own message, but when I sent an original
message/question I was not subscribed to this mailing list and now I can
not respond to Matthias answer directly.

I don't want to share a transaction between multiple Producers
threads/processes, I just would like to resume an interrupted transaction
after a machine crash.

Let me try to phrase the problem differently:

From the perspective of a producer that writes to Kafka, we have the
following situation:

We integrate the producer with a transaction in another system. A number of
records should go together atomically (a transaction). Before committing
the transaction, we frequently need to ask for a "clearance" status, and if
we get the "go ahead" we want to commit the transaction.

Unfortunately, as soon as we get that "clearance", we cannot reproduce the
records any more (they are dropped from the original data stream).
If something fails between the "go ahead" and the commit, we need to
retry the transaction, so we need to come up with all the records again. As
a result, we have to persist the records before we start the write
transaction. That is a painful overhead to pay, and a lot of additional
operational complexity.

The simplest way to support that pattern without extra overhead would be if
we could "resume" a transaction:

  - Each transaction has a unique Transaction ID
  - If a crash of the producer occurs, the transaction is NOT aborted
automatically.
  - Instead, the restarted producer process reconnects to the transaction
and decides to commit it or abort it.
  - The transaction timeout aborts the transaction after a while of
inactivity.
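A toy in-memory model of these proposed semantics might look like the following. None of these names exist in KIP-98 or the Kafka client; this is purely a sketch of the desired behaviour:

```java
// Toy model of the wished-for semantics: a crashed producer can reconnect
// to its transaction by ID and still commit it, commit is idempotent, and
// only the timeout aborts an idle transaction. Hypothetical API.
import java.util.HashMap;
import java.util.Map;

class ResumableTxStore {
    enum State { OPEN, COMMITTED, ABORTED }
    private final Map<String, State> byTxId = new HashMap<>();

    void begin(String txId) { byTxId.put(txId, State.OPEN); }

    // After a crash the transaction is still OPEN, not auto-aborted;
    // the restarted process resumes it by ID...
    State resume(String txId) { return byTxId.get(txId); }

    // ...and commit is idempotent: re-committing an already committed
    // transaction changes nothing.
    void commit(String txId) {
        if (byTxId.get(txId) == State.ABORTED)
            throw new IllegalStateException("already aborted");
        byTxId.put(txId, State.COMMITTED);
    }

    // Only the transaction timeout aborts a still-open transaction.
    void abortIfExpired(String txId, long ageMs, long timeoutMs) {
        if (byTxId.get(txId) == State.OPEN && ageMs > timeoutMs)
            byTxId.put(txId, State.ABORTED);
    }
}
```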

Maybe this could be easily supported?

Thanks, Piotrek

2017-06-16 17:59 GMT+02:00 Piotr Nowojski <pi...@gmail.com>:

> But isn't it a low hanging fruit at this moment? Isn't that just an API
> limitation and wouldn't the backend for transactions support it with only
> minor changes to the API (do not fail automatically dangling transactions
> on Producer restart)? Flushing is already there so that _should_ handle the
> pre-commit. Again, maybe I'm missing something and for sure I am not
> familiar with Kafka's internals.
>
> Piotrek
>
> 2017-06-16 15:47 GMT+02:00 Michal Borowiecki <
> michal.borowiecki@openbet.com>:
>
>> I don't think KIP-98 is as ambitious as to provide support for
>> distributed transactions (2 phase commit).
>>
>> It would be great if I was wrong though :P
>>
>> Cheers,
>>
>> Michał
>>
>> On 16/06/17 14:21, Piotr Nowojski wrote:
>>
>> Hi,
>>
>> I'm looking into Kafka's transactions API as proposed in KIP-98. I've read
>> both this KIP-98 document and I looked into the code that is on the master
>> branch. I would like to use it to implement some two phase commit mechanism
>> on top of the Kafka's transactions, that would allow me to tie multiple
>> systems (some of them might not be Kafka) in one transaction.
>>
>> Maybe I'm missing something, but the problem is I don't see a way to
>> implement it using the proposed Kafka transactions API. Even if I have just
>> two processes writing to Kafka topics, I don't know how I can guarantee
>> that if one's transaction is committed, the other will also eventually be
>> committed. This is because if the first KafkaProducer successfully commits, but
>> the second one fails before committing its data, after restart the second
>> one's "initTransactions" call will (according to my understanding of the
>> API) abort the previously uncompleted transaction.
>>
>> Usually transactional systems expose an API like this <http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>.
>> Namely, there is a known identifier for a transaction, and you can pre-commit
>> it (the void prepare(...) method in the aforementioned example) and then commit
>> or abort this transaction. Usually pre-commit involves flushing
>> stuff to some temporary files, and commit moves those files to the final
>> directory. In case of machine/process failure, if it happened before
>> "pre-commit", we can just roll back all transactions from all of the
>> processes. However, once every process acknowledges that it completed
>> "pre-commit", each process should call "commit". If some process fails at
>> that stage, after restarting this process I would expect to be able to
>> restore its "pre-committed" transaction (having remembered the transaction's
>> id) and re-attempt to commit it - which should be guaranteed to eventually
>> succeed.
>>
>> In other words, it seems to me like the missing features of this API
>> are:
>> 1. the possibility to resume transactions after a machine/process crash. At least
>> I would expect to be able to commit "flushed"/"pre-committed" data for such
>> transactions.
>> 2. making sure that committing an already committed transaction doesn't break
>> anything
>>
>> Or maybe there is some other way to integrate Kafka into such two phase
>> commit system that I'm missing?
>>
>> Thanks, Piotrek
>>
>>
>>
>> --
>> Michal Borowiecki
>> Senior Software Engineer L4
>> T: +44 208 742 1600
>> +44 203 249 8448
>> E: michal.borowiecki@openbet.com
>> W: www.openbet.com
>> OpenBet Ltd
>> Chiswick Park Building 9
>> 566 Chiswick High Rd
>> London
>> W4 5XT
>> UK
>> This message is confidential and intended only for the addressee. If you
>> have received this message in error, please immediately notify the
>> postmaster@openbet.com and delete it from your system as well as any
>> copies. The content of e-mails as well as traffic data may be monitored by
>> OpenBet for employment and security purposes. To protect the environment
>> please do not print this e-mail unless necessary. OpenBet Ltd. Registered
>> Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT,
>> United Kingdom. A company registered in England and Wales. Registered no.
>> 3134634. VAT no. GB927523612
>>
>
>

Re: Kafka 0.11 transactions API question

Posted by Piotr Nowojski <pi...@gmail.com>.
But isn't it low-hanging fruit at this point? Isn't that just an API
limitation, and wouldn't the transaction backend support it with only
minor changes to the API (do not automatically fail dangling transactions
on producer restart)? Flushing is already there, so that _should_ handle the
pre-commit. Again, maybe I'm missing something, and I'm certainly not
familiar with Kafka's internals.
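
To make that concrete, here is a toy, Kafka-independent sketch of the
two-phase commit flow I'm describing. The Participant interface and the
coordinator are my own illustration, not anything from Kafka; for a Kafka
participant, prepare() would map to flush() and commit() to a (currently
impossible) resumable commitTransaction():

```java
import java.util.List;

// Toy two-phase commit coordinator over generic participants.
public class TwoPhaseCommit {

    public interface Participant {
        void prepare();   // phase 1: durably stage the data (pre-commit)
        void commit();    // phase 2: must be retryable and idempotent
        void abort();
    }

    public static void run(List<Participant> participants) {
        // Phase 1: if any prepare fails, abort everyone.
        // Nothing is visible yet, so a global rollback is safe.
        try {
            for (Participant p : participants) p.prepare();
        } catch (RuntimeException e) {
            for (Participant p : participants) p.abort();
            throw e;
        }
        // Phase 2: once all have prepared, each commit must eventually
        // succeed, so we retry on failure. This is the step a Kafka 0.11
        // producer cannot support, because a restarted producer's
        // initTransactions() aborts the open transaction.
        for (Participant p : participants) {
            boolean committed = false;
            while (!committed) {
                try {
                    p.commit();
                    committed = true;
                } catch (RuntimeException retryable) {
                    // simulate a restart, then retry the commit
                }
            }
        }
    }
}
```

The key point is in phase 2: after every participant has acknowledged
"pre-commit", commit is retried until it succeeds, which only works if a
restarted participant can re-join its pre-committed transaction.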

Piotrek

2017-06-16 15:47 GMT+02:00 Michal Borowiecki <mi...@openbet.com>:

> I don't think KIP-98 is as ambitious as to provide support for distributed
> transactions (2 phase commit).
>
> It would be great if I was wrong though :P
>
> Cheers,
>
> Michał
>
> On 16/06/17 14:21, Piotr Nowojski wrote:
>
> Hi,
>
> I'm looking into Kafka's transactions API as proposed in KIP-98. I've read
> both this KIP-98 document and I looked into the code that is on the master
> branch. I would like to use it to implement some two phase commit mechanism
> on top of the Kafka's transactions, that would allow me to tie multiple
> systems (some of them might not be Kafka) in one transaction.
>
> Maybe I'm missing something, but the problem is I don't see a way to
> implement it using the proposed Kafka transactions API. Even if I have just
> two processes writing to Kafka topics, I don't know how I can guarantee
> that if one's transaction is committed, the other will also eventually be
> committed. This is because if the first KafkaProducer successfully commits, but
> the second one fails before committing its data, after restart the second
> one's "initTransactions" call will (according to my understanding of the
> API) abort the previously uncompleted transaction.
>
> Usually transactional systems expose an API like this <http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>.
> Namely, there is a known identifier for a transaction, and you can pre-commit
> it (the void prepare(...) method in the aforementioned example) and then commit
> or abort this transaction. Usually pre-commit involves flushing
> stuff to some temporary files, and commit moves those files to the final
> directory. In case of machine/process failure, if it happened before
> "pre-commit", we can just roll back all transactions from all of the
> processes. However, once every process acknowledges that it completed
> "pre-commit", each process should call "commit". If some process fails at
> that stage, after restarting this process I would expect to be able to
> restore its "pre-committed" transaction (having remembered the transaction's
> id) and re-attempt to commit it - which should be guaranteed to eventually
> succeed.
>
> In other words, it seems to me like the missing features of this API
> are:
> 1. the possibility to resume transactions after a machine/process crash. At least
> I would expect to be able to commit "flushed"/"pre-committed" data for such
> transactions.
> 2. making sure that committing an already committed transaction doesn't break
> anything
>
> Or maybe there is some other way to integrate Kafka into such two phase
> commit system that I'm missing?
>
> Thanks, Piotrek
>
>
>

Re: Kafka 0.11 transactions API question

Posted by Michal Borowiecki <mi...@openbet.com>.
I don't think KIP-98 is as ambitious as to provide support for 
distributed transactions (2 phase commit).

It would be great if I was wrong though :P

Cheers,

Michał


On 16/06/17 14:21, Piotr Nowojski wrote:
> Hi,
>
> I'm looking into Kafka's transactions API as proposed in KIP-98. I've read
> both this KIP-98 document and I looked into the code that is on the master
> branch. I would like to use it to implement some two phase commit mechanism
> on top of the Kafka's transactions, that would allow me to tie multiple
> systems (some of them might not be Kafka) in one transaction.
>
> Maybe I'm missing something, but the problem is I don't see a way to
> implement it using the proposed Kafka transactions API. Even if I have just
> two processes writing to Kafka topics, I don't know how I can guarantee
> that if one's transaction is committed, the other will also eventually be
> committed. This is because if the first KafkaProducer successfully commits, but
> the second one fails before committing its data, after restart the second
> one's "initTransactions" call will (according to my understanding of the
> API) abort the previously uncompleted transaction.
>
> Usually transactional systems expose an API like this
> <http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>.
> Namely, there is a known identifier for a transaction, and you can pre-commit
> it (the void prepare(...) method in the aforementioned example) and then commit
> or abort this transaction. Usually pre-commit involves flushing
> stuff to some temporary files, and commit moves those files to the final
> directory. In case of machine/process failure, if it happened before
> "pre-commit", we can just roll back all transactions from all of the
> processes. However, once every process acknowledges that it completed
> "pre-commit", each process should call "commit". If some process fails at
> that stage, after restarting this process I would expect to be able to
> restore its "pre-committed" transaction (having remembered the transaction's
> id) and re-attempt to commit it - which should be guaranteed to eventually
> succeed.
>
> In other words, it seems to me like the missing features of this API
> are:
> 1. the possibility to resume transactions after a machine/process crash. At least
> I would expect to be able to commit "flushed"/"pre-committed" data for such
> transactions.
> 2. making sure that committing an already committed transaction doesn't break
> anything
>
> Or maybe there is some other way to integrate Kafka into such two phase
> commit system that I'm missing?
>
> Thanks, Piotrek
>
