Posted to user@flink.apache.org by Kamil Dziublinski <ka...@gmail.com> on 2017/04/25 17:54:10 UTC

Fault tolerance & idempotency on window functions

Hi guys,

I have a Flink streaming job that reads from Kafka, creates some statistics
increments, and stores them in HBase (using normal puts).
I'm using a fold function here with a window of a few seconds.
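
To make this concrete, the pipeline looks roughly like the sketch below.
Event, StatsIncrement, EventSchema and HBasePutSink stand in for my own
classes (imports omitted), and the exact Kafka connector version may differ:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Event> events =
        env.addSource(new FlinkKafkaConsumer010<>("events", new EventSchema(), kafkaProps));

    events
        .keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) {
                return e.getTenant() + "|" + e.getUser();   // tenant, user combination
            }
        })
        .timeWindow(Time.seconds(5))                        // window of a few seconds
        .fold(new StatsIncrement(), new FoldFunction<Event, StatsIncrement>() {
            @Override
            public StatsIncrement fold(StatsIncrement acc, Event e) {
                return acc.add(e);                          // accumulate the counters
            }
        })
        .addSink(new HBasePutSink());                       // writes normal HBase Puts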

My tests showed me that restoring state with window functions does not work
quite the way I expected.
I thought that if my window function emits an aggregated object to a sink,
and that object fails in the sink, the write to HBase would be replayed. So
even if it actually got written to HBase but Flink thought it didn't (for
instance, during a network problem), I could be sure of idempotent writes. I
wanted to enforce that by using the timestamp of the first event that went
into the window's aggregation.
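
What I had in mind for the idempotent write, roughly (the row key layout and
the getters on the aggregated object are just an illustration):

    // In the sink: build the Put so that replaying the same window produces
    // exactly the same row key and simply overwrites the same cells.
    byte[] rowKey = Bytes.toBytes(stats.getTenant() + "|" + stats.getUser()
            + "|" + stats.getFirstEventTimestamp());
    Put put = new Put(rowKey);
    put.addColumn(Bytes.toBytes("s"), Bytes.toBytes("count"),
            Bytes.toBytes(stats.getCount()));
    table.put(put);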

Now correct me if I'm wrong, but it seems that in the case of a failure (even
if it happens in the sink) the whole flow is replayed from the last
checkpoint, which means that my window function might emit the aggregated
object in a different form. For instance, it might contain not only the
tuples that failed but also other ones, which would break my idempotency here
and I might end up with higher counters than I should have.

Do you have any suggestions on how to solve or work around such a problem in
Flink?

Thanks,
Kamil.

Re: Fault tolerance & idempotency on window functions

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
When keying, keep in mind that Kafka and Flink might use a different scheme for hashing. For example, Flink also applies a murmur hash on the hash code retrieved from the key and then has some internal logic for assigning that hash to a key group (the internal unit of key partitioning). I don’t know what Kafka does internally for hashing.
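
If you want to check where a given key ends up on the Flink side, you can
experiment with the internal KeyGroupRangeAssignment utility (not a public,
stable API, so treat this only as a way to poke around):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    Object key = "tenantA|user42";   // whatever your KeySelector returns
    int maxParallelism = 128;        // Flink's max parallelism setting
    int parallelism = 4;             // actual operator parallelism

    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, maxParallelism, parallelism);
    // There is no built-in guarantee that this subtask index matches the Kafka
    // partition that the producer chose for the same key.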

Also keep in mind that even with event time, the events are not ordered by event time. So the event that arrives first does not necessarily have the lowest timestamp. Using event-time just means that we wait for the watermark to trigger window computation.
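
For the switch to event time you would set the time characteristic and assign
timestamps/watermarks on the source stream, for example with a
bounded-out-of-orderness extractor (the 10 seconds here are only an example
bound, and Event/getEventTimestamp() are placeholders for your own types):

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event e) {
                return e.getEventTimestamp();   // event time read from the record itself
            }
        });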

Regarding state size, if you don’t use merging windows (for example, session windows) then the only state that is kept for a purged window is a cleanup timer that is set for “end of window + allowed lateness”. That is, the state size does not increase with increasing allowed lateness if you purge. This could still fit into the heap state backend and you don’t necessarily need to consider RocksDB.
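
If you do want to compare the two at some point, switching backends is only a
matter of configuration, roughly (the checkpoint paths are placeholders):

    // Heap-based keyed state, checkpointed to a file system:
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

    // Or, should the keyed state ever outgrow the heap:
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));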

Best,
Aljoscha
> On 29. Apr 2017, at 10:19, Kamil Dziublinski <ka...@gmail.com> wrote:
> 
> Big thanks for replying Aljoscha, I spent quite some time thinking about how to solve this problem and came to some conclusions. It would be cool if you could verify whether my logic is correct.
> 
> I decided to partition the data in Kafka in the same way as I partition my window with keyBy. It's a tenant, user combination (I would still use a hash of it in the Kafka producer), and I will switch processing to event time (currently it is processing time). Then during replay I could be 100% sure that the first element will always be first, and the watermark for triggering the window would also come at the same moment. This gives me idempotent writes of this batched object to HBase.
> 
> And for late events (by configuring lateness on the window itself) I would configure the trigger to fire & purge, so that it doesn't hold fired data. This way, if a late event arrives, I could fire it with a different timestamp, treating it in HBase as a totally separate increment that does not override my previous data.
> The reason I want to purge data on firing is that I would need an allowed lateness on the window of at least 2 months, so holding all data after firing for 2 months would be too costly.
> An additional question here: is there any cost to having a very high allowed lateness (like 2 months) if we configure the trigger to fire & purge? Is there any additional state or metadata that Flink needs to maintain that would take much memory from the cluster? Would I have to consider RocksDB here for state, or could FS state still work?
> 
> On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> Yes, your analysis is correct: Flink will not retry for individual elements but will restore from the latest consistent checkpoint in case of failure. This also means that you can get different window results based on which element arrives first, i.e. you have a different timestamp on your output in that case.
> 
> One simple mitigation for the timestamp problem is to use the largest timestamp of elements within a window instead of the first timestamp. This will be stable across restores even if the order of arrival of elements changes. You can still get problems when it comes to late data and window triggering, if you cannot guarantee that your watermark is 100 % correct, though. I.e. it might be that, upon restore, an element with an even larger timestamp arrives late that was not considered when doing the first processing that failed.
> 
> Best,
> Aljoscha
> > On 25. Apr 2017, at 19:54, Kamil Dziublinski <kamil.dziublinski@gmail.com> wrote:
> >
> > Hi guys,
> >
> > I have a Flink streaming job that reads from Kafka, creates some statistics increments, and stores them in HBase (using normal puts).
> > I'm using a fold function here with a window of a few seconds.
> >
> > My tests showed me that restoring state with window functions does not work quite the way I expected.
> > I thought that if my window function emits an aggregated object to a sink, and that object fails in the sink, the write to HBase would be replayed. So even if it actually got written to HBase but Flink thought it didn't (for instance, during a network problem), I could be sure of idempotent writes. I wanted to enforce that by using the timestamp of the first event that went into the window's aggregation.
> >
> > Now correct me if I'm wrong, but it seems that in the case of a failure (even if it happens in the sink) the whole flow is replayed from the last checkpoint, which means that my window function might emit the aggregated object in a different form. For instance, it might contain not only the tuples that failed but also other ones, which would break my idempotency here and I might end up with higher counters than I should have.
> >
> > Do you have any suggestions on how to solve or work around such a problem in Flink?
> >
> > Thanks,
> > Kamil.
> >
> >
> 


Re: Fault tolerance & idempotency on window functions

Posted by Kamil Dziublinski <ka...@gmail.com>.
Big thanks for replying Aljoscha, I spent quite some time thinking about how
to solve this problem and came to some conclusions. It would be cool if you
could verify whether my logic is correct.

I decided to partition the data in Kafka in the same way as I partition my
window with keyBy. It's a tenant, user combination (I would still use a hash
of it in the Kafka producer), and I will switch processing to event time
(currently it is processing time). Then during replay I could be 100% sure
that the first element will always be first, and the watermark for
triggering the window would also come at the same moment. This gives me
idempotent writes of this batched object to HBase.
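
On the producer side that means keying the Kafka records by the same
tenant|user string that I use in keyBy, something like this (serialize() is
just a placeholder for my serializer):

    // Same key as the Flink keyBy, so all events of one tenant/user go to one
    // partition and keep their relative order when the topic is replayed.
    String key = event.getTenant() + "|" + event.getUser();
    producer.send(new ProducerRecord<>("events", key, serialize(event)));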

And for late events (by configuring lateness on the window itself) I would
configure the trigger to fire & purge, so that it doesn't hold fired data.
This way, if a late event arrives, I could fire it with a different
timestamp, treating it in HBase as a totally separate increment that does
not override my previous data.
The reason I want to purge data on firing is that I would need an allowed
lateness on the window of at least 2 months, so holding all data after
firing for 2 months would be too costly.
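
In code the window definition would then look roughly like this
(TenantUserKeySelector, StatsFoldFunction, StatsIncrement and HBasePutSink
are placeholders for my own classes, and the 2 months are expressed as days):

    events
        .keyBy(new TenantUserKeySelector())
        .timeWindow(Time.seconds(5))
        .allowedLateness(Time.days(60))                         // keep accepting late events
        .trigger(PurgingTrigger.of(EventTimeTrigger.create()))  // fire & purge the contents
        .fold(new StatsIncrement(), new StatsFoldFunction())
        .addSink(new HBasePutSink());
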
An additional question here: is there any cost to having a very high allowed
lateness (like 2 months) if we configure the trigger to fire & purge? Is
there any additional state or metadata that Flink needs to maintain that
would take much memory from the cluster? Would I have to consider RocksDB
here for state, or could FS state still work?

On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> Yes, your analysis is correct: Flink will not retry for individual
> elements but will restore from the latest consistent checkpoint in case of
> failure. This also means that you can get different window results based on
> which element arrives first, i.e. you have a different timestamp on your
> output in that case.
>
> One simple mitigation for the timestamp problem is to use the largest
> timestamp of elements within a window instead of the first timestamp. This
> will be stable across restores even if the order of arrival of elements
> changes. You can still get problems when it comes to late data and window
> triggering, if you cannot guarantee that your watermark is 100 % correct,
> though. I.e. it might be that, upon restore, an element with an even larger
> timestamp arrives late that was not considered when doing the first
> processing that failed.
>
> Best,
> Aljoscha
> > On 25. Apr 2017, at 19:54, Kamil Dziublinski <
> kamil.dziublinski@gmail.com> wrote:
> >
> > Hi guys,
> >
> > I have a Flink streaming job that reads from Kafka, creates some
> statistics increments, and stores them in HBase (using normal puts).
> > I'm using a fold function here with a window of a few seconds.
> >
> > My tests showed me that restoring state with window functions does not
> work quite the way I expected.
> > I thought that if my window function emits an aggregated object to a
> sink, and that object fails in the sink, the write to HBase would be
> replayed. So even if it actually got written to HBase but Flink thought it
> didn't (for instance, during a network problem), I could be sure of
> idempotent writes. I wanted to enforce that by using the timestamp of the
> first event that went into the window's aggregation.
> >
> > Now correct me if I'm wrong, but it seems that in the case of a failure
> (even if it happens in the sink) the whole flow is replayed from the last
> checkpoint, which means that my window function might emit the aggregated
> object in a different form. For instance, it might contain not only the
> tuples that failed but also other ones, which would break my idempotency
> here and I might end up with higher counters than I should have.
> >
> > Do you have any suggestions on how to solve or work around such a
> problem in Flink?
> >
> > Thanks,
> > Kamil.
> >
> >
>
>

Re: Fault tolerance & idempotency on window functions

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Yes, your analysis is correct: Flink will not retry for individual elements but will restore from the latest consistent checkpoint in case of failure. This also means that you can get different window results based on which element arrives first, i.e. you have a different timestamp on your output in that case.

One simple mitigation for the timestamp problem is to use the largest timestamp of elements within a window instead of the first timestamp. This will be stable across restores even if the order of arrival of elements changes. You can still get problems when it comes to late data and window triggering, if you cannot guarantee that your watermark is 100 % correct, though. I.e. it might be that, upon restore, an element with an even larger timestamp arrives late that was not considered when doing the first processing that failed.
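
In the fold that would just mean carrying the maximum along, roughly (the
accumulator getters/setters are placeholders):

    // Inside the FoldFunction: track the largest event timestamp seen so far in
    // the window instead of the first one, so a replay produces the same value
    // regardless of arrival order.
    public StatsIncrement fold(StatsIncrement acc, Event e) {
        acc.setMaxTimestamp(Math.max(acc.getMaxTimestamp(), e.getEventTimestamp()));
        return acc.add(e);
    }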

Best,
Aljoscha
> On 25. Apr 2017, at 19:54, Kamil Dziublinski <ka...@gmail.com> wrote:
> 
> Hi guys,
> 
> I have a Flink streaming job that reads from Kafka, creates some statistics increments, and stores them in HBase (using normal puts).
> I'm using a fold function here with a window of a few seconds.
> 
> My tests showed me that restoring state with window functions does not work quite the way I expected.
> I thought that if my window function emits an aggregated object to a sink, and that object fails in the sink, the write to HBase would be replayed. So even if it actually got written to HBase but Flink thought it didn't (for instance, during a network problem), I could be sure of idempotent writes. I wanted to enforce that by using the timestamp of the first event that went into the window's aggregation.
> 
> Now correct me if I'm wrong, but it seems that in the case of a failure (even if it happens in the sink) the whole flow is replayed from the last checkpoint, which means that my window function might emit the aggregated object in a different form. For instance, it might contain not only the tuples that failed but also other ones, which would break my idempotency here and I might end up with higher counters than I should have.
> 
> Do you have any suggestions on how to solve or work around such a problem in Flink?
> 
> Thanks,
> Kamil.
> 
>