Posted to user@flink.apache.org by Daniel Santos <ds...@cryptolab.net> on 2016/11/03 18:43:16 UTC

Flink Time Window State

Hello,

I have a question that has been bugging me.
Let's say we have a Kafka source.
Checkpointing is enabled, with an interval of 5 seconds.
We use the FsStateBackend (on HDFS).

Now imagine we have a tumbling window of 10 minutes.

For simplicity, say that we are counting all elements
arriving in each 10-minute window, with something like this:

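import org.apache.flink.api.common.functions.FoldFunction

// Adds one to the accumulator for every element; Event is our record type.
class Count extends FoldFunction[Event, Long] {

    override def fold(accumulator: Long, value: Event): Long = {
      accumulator + 1
    }

}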

So we have:

source
      .window(<Tumbling>)
      .apply(0L, new Count(), <WindowFunction>)  // 0L: the accumulator is a Long

Suppose 10 events arrive in the first 2 minutes, and then we stop the
stream/task/job (or it fails) and it is restarted. What will the
state of the fold function be?
Will it be 10 and resume from there? Or will it be 0?

This is important to know: imagine we have a 1-day window
and the job fails mid-day. How will it resume?

Best Regards


Re: Flink Time Window State

Posted by Ufuk Celebi <uc...@apache.org>.
The goal is to release 1.2 before the end of this year. For this to happen, the first release candidate would need to be available by the end of November or the beginning of December.

There is an ongoing discussion here: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Schedule-and-Scope-for-Flink-1-2-td14062.html#a14306

– Ufuk

On 7 November 2016 at 14:05:27, Daniel Santos (dsantos@cryptolab.net) wrote:
> Hi,
>  
> Thank you, Ufuk.
>
> Hmm, out of curiosity:
> Is there any idea when 1.2 will be released?
>  
> Best Regards,
> Daniel Santos


Re: Flink Time Window State

Posted by Daniel Santos <ds...@cryptolab.net>.
Hi, 

Thank you, Ufuk.

Hmm, out of curiosity:
Is there any idea when 1.2 will be released?

Best Regards, 
Daniel Santos 

On November 7, 2016 12:45:51 PM GMT+00:00, Ufuk Celebi <uc...@apache.org> wrote:
>On 7 November 2016 at 13:06:16, Daniel Santos (dsantos@cryptolab.net) wrote:
>> I believe that, on versions before 1.2, the job won't start from the
>> last savepoint. Is that correct? Will it start afresh instead?
>
>Yes, with 1.2 you will be able to take a savepoint and then resume from
>that savepoint with different parallelism. :-)
>
>– Ufuk

Re: Flink Time Window State

Posted by Ufuk Celebi <uc...@apache.org>.
On 7 November 2016 at 13:06:16, Daniel Santos (dsantos@cryptolab.net) wrote:
> I believe that, on versions before 1.2, the job won't start from the
> last savepoint. Is that correct? Will it start afresh instead?

Yes, with 1.2 you will be able to take a savepoint and then resume from that savepoint with different parallelism. :-)
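Rescaling from a savepoint works because, starting with 1.2, keyed state is split into a fixed number of key groups, bounded by the job's maximum parallelism. A change of parallelism then only reassigns whole key groups to subtasks. The plain-Java sketch below models that assignment. It is simplified and hypothetical: Flink applies an extra murmur hash to `hashCode()` before taking the modulus, the range formula is assumed to mirror Flink's contiguous range assignment, and a maximum parallelism of 128 is only an example value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of key-group based state assignment (not Flink code).
class KeyGroupRescalingSketch {
    // A key always maps to the same key group, whatever the current parallelism.
    // Simplified: Flink additionally murmur-hashes key.hashCode() first.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are handed out to subtasks in contiguous ranges.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // All key groups (and therefore all keyed state) one subtask owns at a given parallelism.
    static List<Integer> keyGroupsOf(int subtask, int maxParallelism, int parallelism) {
        List<Integer> groups = new ArrayList<>();
        for (int g = 0; g < maxParallelism; g++) {
            if (subtaskFor(g, maxParallelism, parallelism) == subtask) {
                groups.add(g);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        int group = keyGroupFor("user-42", maxParallelism);
        // The key group stays the same; only the subtask that owns it may change.
        System.out.println("key group: " + group);
        System.out.println("subtask at parallelism 2: " + subtaskFor(group, maxParallelism, 2));
        System.out.println("subtask at parallelism 4: " + subtaskFor(group, maxParallelism, 4));
        // Going from 2 to 4 subtasks splits each old range in two.
        System.out.println(keyGroupsOf(0, maxParallelism, 2).size()); // 64
        System.out.println(keyGroupsOf(1, maxParallelism, 4).size()); // 32
    }
}
```

The key-to-key-group mapping does not depend on the parallelism. In this model, state written at one parallelism can therefore be split or merged along key-group boundaries when the job resumes at another.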

– Ufuk



Re: Flink Time Window State

Posted by Daniel Santos <ds...@cryptolab.net>.
Hi,

Thank you very much.

I see. OK, that makes sense.

I believe there is a catch with parallelism.

Say one takes a savepoint and then changes the parallelism.

I believe that, on versions before 1.2, the job won't start from the last savepoint.
Is that correct? Will it start afresh instead?

Best Regards,
Daniel Santos

On 11/04/2016 05:54 PM, Aljoscha Krettek wrote:
> Hi,
> the state of the window is kept by the WindowOperator (which uses the 
> state descriptor you mentioned to access the state). The FoldFunction 
> does not itself keep the state but is only used to update the state 
> inside the WindowOperator, if you will.
>
> When you say restart, are you talking about stopping the job manually
> and then restarting? In that case I expect the state to be reset.
> Flink will perform checkpoints of the state so that it can recover in
> case of failures; these checkpoints, however, don't survive stopping a
> job. If you want to persist the state across stopping/restarting, you
> should look into savepoints:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html
>
> Cheers,
> Aljoscha


Re: Flink Time Window State

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
the state of the window is kept by the WindowOperator (which uses the state
descriptor you mentioned to access the state). The FoldFunction does not
itself keep the state but is only used to update the state inside the
WindowOperator, if you will.
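The split described here, with a stateless fold function and state owned by the operator, can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Flink's WindowOperator. It uses `java.util.function.BiFunction` in place of Flink's `FoldFunction` and assumes non-negative timestamps and a zero window offset for the tumbling-window arithmetic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified model of a folding window operator (not Flink code).
class FoldWindowOperatorSketch<T, R> {
    private final long windowSizeMillis;
    private final R initialValue;
    // Stateless: it only maps (accumulator, element) to a new accumulator.
    private final BiFunction<R, T, R> foldFunction;
    // The actual state: one accumulator per window, keyed by window start.
    // This is the part a checkpoint would need to persist.
    private final Map<Long, R> accumulators = new HashMap<>();

    FoldWindowOperatorSketch(long windowSizeMillis, R initialValue, BiFunction<R, T, R> foldFunction) {
        this.windowSizeMillis = windowSizeMillis;
        this.initialValue = initialValue;
        this.foldFunction = foldFunction;
    }

    // Tumbling-window assignment, assuming non-negative timestamps and no offset.
    long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // The operator looks up the window's accumulator, applies the fold, and stores the result.
    void processElement(T element, long timestampMillis) {
        long start = windowStart(timestampMillis);
        R current = accumulators.getOrDefault(start, initialValue);
        accumulators.put(start, foldFunction.apply(current, element));
    }

    R accumulatorFor(long windowStart) {
        return accumulators.getOrDefault(windowStart, initialValue);
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // Equivalent of the Count fold: ignore the element, add one.
        FoldWindowOperatorSketch<String, Long> op =
                new FoldWindowOperatorSketch<>(tenMinutes, 0L, (acc, event) -> acc + 1);
        // Ten events within the first two minutes of the first window.
        for (int i = 0; i < 10; i++) {
            op.processElement("event-" + i, i * 12_000L);
        }
        System.out.println(op.accumulatorFor(0L)); // prints 10
    }
}
```

In this model `foldFunction` has no fields, so nothing in it needs to be saved. Restoring the `accumulators` map is enough to continue a window where it left off, which is why a plain, non-rich fold function is sufficient.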

When you say restart, are you talking about stopping the job manually and
then restarting? In that case I expect the state to be reset. Flink will
perform checkpoints of the state so that it can recover in case of
failures; these checkpoints, however, don't survive stopping a job. If you
want to persist the state across stopping/restarting, you should look into
savepoints:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html
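The difference between recovering from a failure and stopping and restarting the job can be modelled as follows. This is a deliberately simplified, hypothetical sketch, not Flink code. It assumes, as described above, that checkpoints are discarded when the job is stopped, while savepoints are kept until the user removes them. Replaying the source after recovery is left out, and the savepoint path is a made-up string.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical lifecycle model; windowCount stands in for the window's fold accumulator.
class JobLifecycleSketch {
    private final Deque<Long> checkpoints = new ArrayDeque<>();   // owned by the running job
    private final Map<String, Long> savepoints = new HashMap<>(); // survive job stops
    private long windowCount = 0;

    void processEvent() { windowCount++; }

    void checkpoint() { checkpoints.push(windowCount); }

    void savepoint(String path) { savepoints.put(path, windowCount); }

    long windowCount() { return windowCount; }

    // Failure: the job restores its latest completed checkpoint.
    // (Re-reading the events that arrived after that checkpoint is not modelled.)
    void failAndRecover() {
        windowCount = checkpoints.isEmpty() ? 0L : checkpoints.peek();
    }

    // Manual stop: the job's checkpoints go away with it.
    void stop() {
        checkpoints.clear();
        windowCount = 0;
    }

    // New submission: empty state, unless it is started from a savepoint.
    void start(String savepointPathOrNull) {
        if (savepointPathOrNull == null) {
            windowCount = 0;
            return;
        }
        Long restored = savepoints.get(savepointPathOrNull);
        if (restored == null) {
            throw new IllegalArgumentException("unknown savepoint: " + savepointPathOrNull);
        }
        windowCount = restored;
    }

    public static void main(String[] args) {
        JobLifecycleSketch job = new JobLifecycleSketch();
        for (int i = 0; i < 10; i++) job.processEvent();
        job.checkpoint();
        job.savepoint("savepoint-1"); // hypothetical path
        job.failAndRecover();
        System.out.println("after failure: " + job.windowCount()); // 10
        job.stop();
        job.start(null);
        System.out.println("restart without savepoint: " + job.windowCount()); // 0
        job.stop();
        job.start("savepoint-1");
        System.out.println("restart from savepoint: " + job.windowCount()); // 10
    }
}
```

The middle case corresponds to the behaviour observed in the tests reported in this thread: the accumulator starts again from the initial value.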

Cheers,
Aljoscha

On Fri, 4 Nov 2016 at 16:40 Daniel Santos <ds...@cryptolab.net> wrote:

>
> Hello Aljoscha,
>
> Thank you for your reply.
>
> But I believe, from reading the docs, that a user function has to be a
> Rich Function if it wants to have state.
> However, a Rich Function cannot be used as the FoldFunction of a window.
>
> For instance, looking at the Flink 1.1.3 source, which is the version I'm
> currently using, we find the following snippet in the class
> WindowedStream.java:
>
> "
>     public <R> SingleOutputStreamOperator<R> apply(R initialValue,
>             FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
>             TypeInformation<R> resultType) {
>         if (foldFunction instanceof RichFunction) {
>             throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
>         }
>         if (windowAssigner instanceof MergingWindowAssigner) {
>             throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
>         }
> ...
> "
>
> Now I can see that the window operator creates a FoldingStateDescriptor
> and, as you said, uses the APIs you described.
> However, I can't see how everything fits together.
> For instance, the Count class described here can only extend
> FoldFunction, not RichFoldFunction, so how does Flink keep track of the
> accumulator?
> From my tests, it seems that it does not.
>
> Every time the program/stream/job is restarted, the accumulator starts
> from the initial value.
>
> Kind Regards,
> Daniel Santos

Re: Flink Time Window State

Posted by Daniel Santos <ds...@cryptolab.net>.
Hello Aljoscha,

Thank you for your reply.

But I believe, from reading the docs, that a user function has to be a
Rich Function if it wants to have state.
However, a Rich Function cannot be used as the FoldFunction of a window.

For instance, looking at the Flink 1.1.3 source, which is the version I'm
currently using, we find the following snippet in the class
WindowedStream.java:

"
     public <R> SingleOutputStreamOperator<R> apply(R initialValue,
             FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
             TypeInformation<R> resultType) {
         if (foldFunction instanceof RichFunction) {
             throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
         }
         if (windowAssigner instanceof MergingWindowAssigner) {
             throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
         }
...
"

Now I can see that the window operator creates a FoldingStateDescriptor
and, as you said, uses the APIs you described.
However, I can't see how everything fits together.
For instance, the Count class described here can only extend
FoldFunction, not RichFoldFunction, so how does Flink keep track of
the accumulator?
From my tests, it seems that it does not.

Every time the program/stream/job is restarted, the accumulator starts
from the initial value.

Kind Regards,
Daniel Santos


On 11/04/2016 11:01 AM, Aljoscha Krettek wrote:
> Hi Daniel,
> Flink will checkpoint the state of all operations (in your case to
> HDFS). Flink has several APIs for dealing with state in user functions:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html
> The window operator also internally uses these APIs.
>
> Let me know if you need anything more.
>
> Cheers,
> Aljoscha


Re: Flink Time Window State

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Daniel,
Flink will checkpoint the state of all operations (in your case to HDFS).
Flink has several APIs for dealing with state in user functions:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html
The window operator also internally uses these APIs.
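For the count to be correct after a failure, the window state and the source's read position must be captured in the same snapshot, so that both roll back to the same point. A minimal plain-Java model of that idea follows. It is a hypothetical sketch, not Flink's checkpointing algorithm. It assumes a replayable source: the `List` stands in for a Kafka partition and the index stands in for its offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one replayable source feeding one counting window (not Flink code).
class ConsistentSnapshotSketch {
    // What a checkpoint records: source position and window accumulator, taken together.
    static final class Snapshot {
        final int sourceOffset;
        final long windowCount;

        Snapshot(int sourceOffset, long windowCount) {
            this.sourceOffset = sourceOffset;
            this.windowCount = windowCount;
        }
    }

    private final List<String> log; // stands in for a Kafka partition
    private int sourceOffset = 0;   // next record to read
    private long windowCount = 0;   // the Count fold's accumulator
    private Snapshot lastCheckpoint = new Snapshot(0, 0L);

    ConsistentSnapshotSketch(List<String> log) {
        this.log = log;
    }

    // Reads up to n records; Count ignores the record's content and just adds one.
    void run(int n) {
        for (int i = 0; i < n && sourceOffset < log.size(); i++) {
            sourceOffset++;
            windowCount++;
        }
    }

    void checkpoint() {
        lastCheckpoint = new Snapshot(sourceOffset, windowCount);
    }

    // Offset and count both go back to the last checkpoint, so the records after it
    // are re-read and each record is reflected in the count exactly once.
    void failAndRecover() {
        sourceOffset = lastCheckpoint.sourceOffset;
        windowCount = lastCheckpoint.windowCount;
    }

    long windowCount() { return windowCount; }

    int sourceOffset() { return sourceOffset; }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < 10; i++) events.add("event-" + i);
        ConsistentSnapshotSketch job = new ConsistentSnapshotSketch(events);
        job.run(7);
        job.checkpoint();     // offset 7, count 7
        job.run(3);           // offset 10, count 10
        job.failAndRecover(); // back to offset 7, count 7
        job.run(10);          // only 3 records are left to re-read
        System.out.println(job.windowCount()); // prints 10
    }
}
```

If only the count were restored and the source kept its position, the three records after the checkpoint would be lost. If only the source were rewound, they would be counted twice.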

Let me know if you need anything more.

Cheers,
Aljoscha
