Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2021/04/16 14:07:50 UTC

2-phase commit and kafka

Hello folks

So, AFAIK, data loss with exactly-once will happen if:

   - a transaction is started on Kafka,
   - the pre-commit is done (Kafka is prepared for the commit),
   - the commit fails (Kafka went down, a network issue, or whatever), so
     Kafka is left with an uncommitted transaction,
   - the pipe is down for, say, n minutes while the Kafka transaction
     timeout is m minutes, where m < n,
   - the pipe restarts, tries to commit the (by now aborted) transaction,
     and fails, and thus we have data loss (see the sketch right after this
     list).
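
To make the scenario concrete, here is roughly the sequence of Kafka
transactional producer calls I have in mind. This is only a sketch of the
plain Kafka API, not Flink's actual sink code; the broker address, topic,
transactional id, and timeout value are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pipe-sink-0");    // placeholder id
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // transaction.timeout.ms: how long the broker waits before aborting an
        // unfinished transaction from this producer (example value: 15 minutes).
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();      // register the transactional id
            producer.beginTransaction();      // start a transaction on Kafka
            producer.send(new ProducerRecord<>("out-topic", "key", "value"));
            producer.flush();                 // "pre-commit": data is on the broker, not yet visible
            // If the pipe dies here and stays down longer than transaction.timeout.ms,
            // the broker aborts the transaction; a later commitTransaction() for it
            // fails, and the buffered records are lost.
            producer.commitTransaction();     // "commit": visible to read_committed consumers
        }
    }
}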

Thus it is imperative that transaction.max.timeout.ms on the Kafka brokers
is set to a high value (like n hours), greater than the SLA for downtime of
the pipe. In other words, we have to ensure that the pipe is restarted
before the producer's transaction.timeout.ms (which the broker caps at
transaction.max.timeout.ms) expires.
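
For concreteness, the two timeouts live in different places. A minimal
sketch with example values follows; the broker address and the numbers are
placeholders, and how the properties reach the sink depends on the Flink
version in use.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        // Broker side (server.properties), the upper bound any client may request:
        //   transaction.max.timeout.ms=86400000    (24 hours; the broker default is 900000)
        //
        // Client/Flink side: must be <= the broker's transaction.max.timeout.ms and
        // should comfortably exceed the worst-case downtime of the pipe.
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "3600000");   // 1 hour, example
        // ... pass producerProps to the exactly-once Kafka producer/sink of your Flink version.
        System.out.println(producerProps);
    }
}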

The impulse is to make transaction.max.timeout.ms high (24 hours). The only
implication is what happens if we start a brand new pipeline on the same
topics while they still have unresolved transactions, mostly because of the
extended timeout of a previous pipe. I would assume we are delayed then,
given that Kafka will stall subsequent transactions from being visible to
the consumer because of that one outstanding transaction?
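
On the visibility point: my understanding is that a read_committed consumer
cannot read past the first still-open transaction in a partition, so one
dangling transaction holds back everything written after it. A minimal
consumer sketch with placeholder broker/topic/group names, just to show
where that isolation level is set:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "downstream-pipe");      // placeholder
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only committed transactional records are returned; an open (or dangling)
        // transaction blocks delivery of everything behind it in the partition.
        props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("out-topic"));            // placeholder
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.println(r.offset() + " " + r.value()));
        }
    }
}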

And if that is the case, then understandably we have to abort those
dangling transactions before the 24-hour timeout. While there is probably a
way to do that, does Flink help here, e.g., a property that tells it to
abort such a transaction on Kafka, given the above?
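
For what it's worth, one plain-Kafka way I know of to abort a dangling
transaction (independent of whatever Flink does internally, which the
replies discuss): start a producer with the same transactional.id;
initTransactions() fences the old producer and aborts any transaction it
left open. A sketch with placeholder values:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class AbortDanglingTxSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pipe-sink-0");  // id owning the dangling transaction (placeholder)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // initTransactions() bumps the producer epoch for this transactional.id,
            // which aborts any transaction the previous incarnation left uncommitted.
            producer.initTransactions();
        }
    }
}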

Again, I might have totally misunderstood the whole mechanics; if so,
apologies, and I would appreciate some clarification.


Thanks.

Re: 2-phase commit and kafka

Posted by Vishal Santoshi <vi...@gmail.com>.
Thank you for the clarification.  That is what I was thinking.

Regards.


Re: 2-phase commit and kafka

Posted by Arvid Heise <ar...@apache.org>.
Hi Vishal,

No, AFAIK Flink only cancels transactions on restore from a checkpoint or
savepoint (manual or automatic). That's what I meant with:

> they may linger for a longer time if you stop an application entirely (for
> example for an upgrade).
>

Your upgraded application will hopefully be up before Kafka transactions
time out.

However, my statement was a bit misleading: if you gracefully stop the
Flink application (stop with drain), then Flink should also close all
transactions. You'd only get lingering transactions when you cancel the
Flink application and start from scratch (e.g., due to an error in the
business logic). But then you usually also recreate the topic, which
hopefully also removes all pending transactions.


Re: 2-phase commit and kafka

Posted by Vishal Santoshi <vi...@gmail.com>.
Thanks for the feedback, and glad I am on the right track.

> Outstanding transactions should be automatically aborted on restart by
> Flink.

Let me make sure I understand this:

1. A Flink pipe is cancelled and has dangling Kafka transactions.
2. A new Flink pipe (not restored from a checkpoint or savepoint) is
started, which is essentially the same pipe as 1 but does not restore.
Would the dangling Kafka transactions be aborted?

If yes, how does it work? As in, how does the new pipe know which
transactions to abort? Does it ask Kafka for pending transactions and know
which ones belong to the first pipe (maybe because they share some id
derived from the name of the pipe, or something else)?

Thanks again,

Vishal




Re: 2-phase commit and kafka

Posted by Arvid Heise <ar...@apache.org>.
Hi Vishal,

I think you pretty much nailed it.

Outstanding transactions should be automatically aborted on restart by
Flink. Flink (re)uses a pool of transaction ids, such that all possible
transactions by Flink are canceled on restart.

I guess the biggest downside of using a large transaction timeout is that
other clients might leak transactions for a longer period of time or that
they may linger for a longer time if you stop an application entirely (for
example for an upgrade).
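
To illustrate the idea with plain Kafka (this is not Flink's actual code,
and the id scheme below is hypothetical): if the transactional ids come
from a known, bounded pool, a restarted job can walk that pool and call
initTransactions() on each id, which aborts whatever the previous run left
open under it.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class AbortTransactionPoolSketch {
    public static void main(String[] args) {
        String prefix = "my-sink";   // hypothetical prefix; Flink derives ids from its own naming
        int subtasks = 4;            // example parallelism
        int poolSize = 5;            // example pool size per subtask

        Properties base = new Properties();
        base.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        base.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        base.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        for (int subtask = 0; subtask < subtasks; subtask++) {
            for (int slot = 0; slot < poolSize; slot++) {
                Properties props = new Properties();
                props.putAll(base);
                props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                        prefix + "-" + subtask + "-" + slot);
                try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                    producer.initTransactions(); // aborts anything this id left open
                }
            }
        }
    }
}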
