Posted to user@flink.apache.org by Niels van Kaam <ni...@vankaam.net> on 2018/03/09 13:49:30 UTC

"Close()" aborts last transaction in TwoPhaseCommitSinkFunction

Hi,

I'm working on a custom sink implementation which I would like to use with
exactly-once semantics. To that end I have extended the
TwoPhaseCommitSinkFunction class, as described in this recent post:
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

I have some integration tests which run jobs using the custom sink with a
finite dataset (a RichSourceFunction with a "finite" run method). The tests
fail because of missing data, and I noticed that this is due to the last
transaction being aborted.
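For illustration, a finite test source of this shape might look roughly like
the sketch below. All names are hypothetical, and a minimal stand-in interface
replaces Flink's SourceContext so the snippet is self-contained:

```java
// Sketch only: a stand-in for Flink's SourceContext, so the example compiles
// on its own (a real source would implement
// org.apache.flink.streaming.api.functions.source.RichSourceFunction<T>).
interface SourceContext<T> {
    void collect(T element);
}

// A "finite" source: run() emits a bounded number of records and then
// returns. When run() returns, the job finishes and close() is called on
// all operators -- including the sink, which aborts its open transaction.
class FiniteSource {
    private volatile boolean running = true;
    private final int limit;

    FiniteSource(int limit) {
        this.limit = limit;
    }

    void run(SourceContext<Integer> ctx) {
        for (int i = 0; i < limit && running; i++) {
            ctx.collect(i);
        }
        // Returning here ends the stream; records buffered by the sink since
        // the last completed checkpoint have no further chance to be committed.
    }

    void cancel() {
        running = false;
    }
}
```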

Looking at the source code, that makes sense: the close() implementation of
TwoPhaseCommitSinkFunction calls abort on the current transaction:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java


I could override this behaviour and perform a commit instead, but then I
would be committing without having received the checkpoint-completed
notification, and thus not properly maintaining the exactly-once guarantees.

Is it possible (and if so, how) to have end-to-end exactly-once guarantees
when dealing with (sometimes) finite jobs?

Thanks!
Niels

Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction

Posted by Niels van Kaam <ni...@vankaam.net>.
Thank you! I already have a custom source function, so adding the hacky
solution would not be too much effort.

Looking forward to the "proper" solution!

Niels

On Fri, Mar 9, 2018, 16:00 Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> Short answer: no, at the moment a clean shutdown is not implemented for
> streaming jobs, but it is on our to-do list.
>
> Hacky answer: you could implement some custom code that waits for at
> least one completed checkpoint after the last input data. But that would
> require modifying the source function, or at least wrapping it, and there
> might be some corner cases that I haven’t thought about.
>
> Piotrek

Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Short answer: no, at the moment a clean shutdown is not implemented for streaming jobs, but it is on our to-do list.

Hacky answer: you could implement some custom code that waits for at least one completed checkpoint after the last input data. But that would require modifying the source function, or at least wrapping it, and there might be some corner cases that I haven’t thought about.

Piotrek
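A rough sketch of that wrapping idea, reduced to plain Java: the Source
interface and the class name below are hypothetical stand-ins, and a real
wrapper would implement Flink's SourceFunction and CheckpointListener
interfaces instead.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the wrapped source (a real implementation
// would wrap a SourceFunction<T> from flink-streaming-java).
interface Source {
    void run();
}

// Wraps a finite source: after the inner run() returns, block until at
// least one checkpoint has completed, so the 2PC sink's final transaction
// can be committed before the job shuts down.
class CheckpointAwaitingSource implements Source {
    private final Source inner;
    private final CountDownLatch checkpointCompleted = new CountDownLatch(1);

    CheckpointAwaitingSource(Source inner) {
        this.inner = inner;
    }

    @Override
    public void run() {
        inner.run(); // emit all records of the finite source
        try {
            // Keep the source "alive" until a checkpoint-complete
            // notification arrives.
            checkpointCompleted.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat as cancellation
        }
    }

    // Mirrors CheckpointListener#notifyCheckpointComplete(long).
    void notifyCheckpointComplete(long checkpointId) {
        checkpointCompleted.countDown();
    }
}
```

This only covers the happy path; as noted above, corner cases (e.g. a failure
between the last record and the awaited checkpoint) are not handled here.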
