Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2018/03/19 23:49:14 UTC

ListCheckpointed function - what happens prior to restoreState() being called?

Hi all,

If I implement ListCheckpointed in a function, is there a guarantee that open() is called before restoreState()?

Asking because it doesn’t seem to be the case, and I didn’t notice this being described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/task_lifecycle.html

If so, then is it normally best practice to also implement the CheckpointedFunction interface, so that the initializeState() method is called before the restore?

In Flink test code, I don’t see this typically being done.

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: ListCheckpointed function - what happens prior to restoreState() being called?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

ListCheckpointed.restoreState() is part of the operator state
initialization, but you are right that it is not explicitly mentioned in
the docs.
That documentation page is rather about the internals and does not directly
link to the public API.

I think you can also initialize the variable in restoreState() if you don't
want to do it in the constructor. That method is always called when the
operator is started.

Best,
Fabian

2018-03-21 19:55 GMT+01:00 Ken Krugler <kk...@transpac.com>:

> Hi Fabian,
>
> On Mar 20, 2018, at 6:38 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Ken,
>
> The documentation page describes that first the state is restored /
> initialized and then the function's open() method is called [1].
>
>
> Yes, thanks - my question was about ListCheckpointed.restoreState(),
> which doesn’t seem to be described on that page.
>
> I had a look at the code and it looks like the docs are correct [2]
>
>
> Thanks for the gentle push to look at the code :)
>
> I see that in
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java#L173,
> if CheckpointedFunction is implemented then initializeState() is called,
> otherwise if ListCheckpointed is implemented then restoreState() is called.
> Essentially they are equivalent, and it’s one or the other.
>
> So the net-net is that I need to make sure my state variables are
> allocated in my constructor, not in my open method.
>
> What tripped me up is that my state variables are transient, as I don’t
> want them serialized. Years of Cascading coding have made this a reflex
> action, where you then set up all of your transient variables in the open()
> call.
>
> But that’s not correct in this case. So they have to be allocated in the
> constructor.
>
> Regards,
>
> — Ken
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/task_lifecycle.html#normal-execution
> [2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L296
>
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
>
>

Re: ListCheckpointed function - what happens prior to restoreState() being called?

Posted by Ken Krugler <kk...@transpac.com>.
Hi Fabian,

> On Mar 20, 2018, at 6:38 AM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Ken,
> 
> The documentation page describes that first the state is restored / initialized and then the function's open() method is called [1].

Yes, thanks - my question was about ListCheckpointed.restoreState(), which doesn’t seem to be described on that page.

> I had a look at the code and it looks like the docs are correct [2]

Thanks for the gentle push to look at the code :)

I see that in https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java#L173, if CheckpointedFunction is implemented then initializeState() is called, otherwise if ListCheckpointed is implemented then restoreState() is called. Essentially they are equivalent, and it’s one or the other.
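
To make the "one or the other" shape concrete, here is a minimal sketch of that dispatch. The interfaces InitializableFn and ListRestorableFn, and the classes RestoreDispatch and RestoreDispatchDemo, are hypothetical stand-ins invented for this illustration. They are not Flink's interfaces, and the sketch does not model any extra conditions the real StreamingFunctionUtils code may apply.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Flink's interfaces.
interface InitializableFn {
    void initializeState(boolean restored);
}

interface ListRestorableFn<T> {
    void restoreState(List<T> state);
}

final class RestoreDispatch {
    // Either/or dispatch: the first matching interface wins, so a function
    // that implements both stand-ins only ever sees initializeState().
    static <T> String tryRestore(Object fn, boolean restored, List<T> state) {
        if (fn instanceof InitializableFn) {
            ((InitializableFn) fn).initializeState(restored);
            return "initializeState";
        }
        if (fn instanceof ListRestorableFn) {
            @SuppressWarnings("unchecked")
            ListRestorableFn<T> listFn = (ListRestorableFn<T>) fn;
            listFn.restoreState(state);
            return "restoreState";
        }
        return "none";
    }
}

final class RestoreDispatchDemo {
    // Implements both stand-ins and records which callback was reached.
    static final class Both implements InitializableFn, ListRestorableFn<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void initializeState(boolean restored) {
            calls.add("initializeState");
        }

        @Override
        public void restoreState(List<String> state) {
            calls.add("restoreState");
        }
    }

    public static void main(String[] args) {
        Both both = new Both();
        RestoreDispatch.tryRestore(both, true, new ArrayList<String>());
        System.out.println(both.calls); // prints [initializeState]
    }
}
```

If the real code has this shape, implementing both interfaces would not add a second callback ahead of restoreState(); the list-based restore would simply never be reached.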

So the net-net is that I need to make sure my state variables are allocated in my constructor, not in my open method.

What tripped me up is that my state variables are transient, as I don’t want them serialized. Years of Cascading coding have made this a reflex action, where you then set up all of your transient variables in the open() call.

But that’s not correct in this case. So they have to be allocated in the constructor.
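
One caveat worth sketching, under the assumption that the function instance is Java-serialized between construction and execution: a transient field assigned in the constructor is not carried across that round trip, so it may still be null when restoreState() runs. A defensive variant is to allocate lazily from a single helper that both restoreState() and open() call. CountingFn and TransientStateDemo below are hypothetical names for this illustration and do not use any Flink classes; the serialization round trip only stands in for shipping the function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a checkpointed user function; not a Flink class.
class CountingFn implements Serializable {
    private static final long serialVersionUID = 1L;

    // Transient: skipped by Java serialization, so it is null after a round trip.
    private transient List<Long> buffer;

    CountingFn() {
        buffer = new ArrayList<>(); // not carried over if the instance is serialized
    }

    // Single allocation point, safe to call from either lifecycle method.
    private List<Long> buffer() {
        if (buffer == null) {
            buffer = new ArrayList<>();
        }
        return buffer;
    }

    // May run before open(), as discussed in this thread.
    void restoreState(List<Long> state) {
        buffer().addAll(state);
    }

    // Allocates only if restoreState() did not already do so.
    void open() {
        buffer();
    }

    boolean isAllocated() {
        return buffer != null;
    }

    List<Long> snapshotState() {
        return new ArrayList<>(buffer());
    }
}

final class TransientStateDemo {
    // Java-serialization round trip; checked exceptions are wrapped for brevity.
    static CountingFn roundTrip(CountingFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CountingFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CountingFn shipped = roundTrip(new CountingFn());
        System.out.println(shipped.isAllocated());            // prints false
        shipped.restoreState(Collections.singletonList(42L)); // restore first
        shipped.open();                                       // then open
        System.out.println(shipped.snapshotState());          // prints [42]
    }
}
```

With this shape it does not matter whether restoreState() runs before open() or not at all; whichever is called first allocates the buffer.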

Regards,

— Ken

> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/task_lifecycle.html#normal-execution
> [2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L296
> 

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378


Re: ListCheckpointed function - what happens prior to restoreState() being called?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

The documentation page describes that first the state is restored /
initialized and then the function's open() method is called [1].
I had a look at the code and it looks like the docs are correct [2]

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/task_lifecycle.html#normal-execution
[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L296
