Posted to user@flink.apache.org by Peter Ertl <pe...@gmx.net> on 2017/08/03 10:11:14 UTC
state inside functions
Hi,
Can someone elaborate on when I should make properties transient / non-transient within operators (e.g. map / flatMap / reduce)?
I see these two possibilities:
(1) initialize a non-transient property from the constructor
(2) initialize a transient property inside a Rich*Function (e.g. RichMapFunction) when open(Configuration parameters) is invoked
On what criteria should I choose (1) or (2)?
How is this related to checkpointing / rebalancing?
Thanks in advance
Peter
Re: state inside functions
Posted by Timo Walther <tw...@apache.org>.
Hi,
if you would like to adjust the configuration of your streaming job
dynamically, a good approach might be to treat the configuration as a
stream itself.
The connect() API can be used to connect a main stream with a control
stream. In either case, the configuration should be kept in state if it
needs to be present after a restore. Otherwise, you need to implement
logic in which the operator queries the latest configuration from some
external system, which could become a bottleneck.
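To make the control-stream idea concrete, here is a minimal plain-Java model, not Flink code. The hypothetical ConfigurableEnricher has two callbacks that mirror the two inputs of a connected stream (in Flink that role would be played by a two-input function such as a CoFlatMapFunction). The map configState stands in for operator state that a checkpoint would persist, and snapshot()/restore() are hypothetical helpers that imitate checkpointing and recovery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an operator that receives its configuration as a second (control) input.
class ConfigurableEnricher {
    // Stands in for operator state; in Flink this would be registered state so it is checkpointed.
    private Map<String, String> configState = new HashMap<>();

    // Called for each element of the control stream (the second input of connect()).
    void processControl(String key, String value) {
        configState.put(key, value);
    }

    // Called for each element of the main stream; uses the latest configuration seen so far.
    String processElement(String event) {
        String prefix = configState.getOrDefault("prefix", "default");
        return prefix + ":" + event;
    }

    // Imitates taking a checkpoint: copy the state out of the operator.
    Map<String, String> snapshot() {
        return new HashMap<>(configState);
    }

    // Imitates restoring a fresh operator instance from a checkpoint.
    void restore(Map<String, String> snapshot) {
        configState = new HashMap<>(snapshot);
    }
}

public class ControlStreamDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        ConfigurableEnricher op = new ConfigurableEnricher();
        out.add(op.processElement("a"));   // no configuration received yet
        op.processControl("prefix", "v2"); // configuration changes at runtime
        out.add(op.processElement("b"));
        Map<String, String> checkpoint = op.snapshot();

        // After a failure a new instance is created; the config survives only because it was in state.
        ConfigurableEnricher restored = new ConfigurableEnricher();
        restored.restore(checkpoint);
        out.add(restored.processElement("c"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [default:a, v2:b, v2:c]
    }
}
```

The point of the model is the last step: the restored instance never saw the control message, yet it uses "v2" because the configuration travelled through state rather than through the constructor.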
Regards,
Timo
In reply to vishalovercome, 16.12.20 22:07 (message reproduced in the next post).
Re: state inside functions
Posted by vishalovercome <vi...@moengage.com>.
When running in HA mode or taking savepoints, if we pass configuration as
constructor arguments, it seems that changing the configuration at a later
time doesn't work, because the older configuration is restored from state.
How can we pass configuration while keeping the flexibility to change the
values at a later date?
I've started another discussion with many more questions -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: state inside functions
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Peter,
function objects (such as an instance of a class that extends MapFunction)
that are used to construct a plan are serialized using Java serialization
and shipped to the workers for execution.
Therefore, function classes must be Serializable. In general it is
recommended to configure function objects via the constructor. However, if
you have a member property that does not implement Serializable, you should
use a RichFunction, make the property transient, and initialize it in
open().
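The two options can be sketched in plain Java, under the assumption that Java serialization of the function object is the relevant step. EnrichFunction is a hypothetical stand-in for a RichMapFunction (its open() takes no arguments, unlike Flink's open(Configuration)), ExpensiveClient is a hypothetical non-serializable member, and ship() imitates shipping the function to a worker with an ObjectOutputStream/ObjectInputStream round trip.

```java
import java.io.*;

// Hypothetical non-serializable helper, e.g. a client created per task.
class ExpensiveClient {
    private final String endpoint;
    ExpensiveClient(String endpoint) { this.endpoint = endpoint; }
    String call(String value) { return endpoint + "/" + value; }
}

// Hypothetical stand-in for a Flink RichMapFunction: Serializable, with an open() hook.
class EnrichFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Option (1): plain configuration passed via the constructor and shipped with the function.
    private final String endpoint;

    // Option (2): non-serializable member, marked transient and created in open() on the worker.
    private transient ExpensiveClient client;

    EnrichFunction(String endpoint) { this.endpoint = endpoint; }

    void open() { client = new ExpensiveClient(endpoint); }

    String map(String value) { return client.call(value); }

    boolean hasClient() { return client != null; }
}

public class ShipFunctionDemo {
    // Imitates shipping a function to a worker: a Java serialization round trip.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        EnrichFunction onWorker = ship(new EnrichFunction("http://config-host"));
        System.out.println(onWorker.hasClient()); // false: the transient field was not shipped
        onWorker.open();
        System.out.println(onWorker.map("x"));    // http://config-host/x
    }
}
```

The constructor argument survives the round trip, while the transient client arrives as null and is rebuilt in open(). Had client been non-transient and initialized in the constructor, writeObject would have thrown a NotSerializableException.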
Alternatively, you can also override Java's serialization/deserialization
methods and implement custom de/serialization logic.
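A sketch of that alternative, assuming the standard java.io hooks: a private readObject(ObjectInputStream) method runs during deserialization, so it can recreate a transient member without any open() lifecycle. SplitFunction and Tokenizer are hypothetical names used only for illustration; a matching private writeObject could be added if the written form also needed customizing.

```java
import java.io.*;

// Hypothetical helper that does not implement Serializable.
class Tokenizer {
    private final String delimiter;
    Tokenizer(String delimiter) { this.delimiter = delimiter; }
    String[] split(String line) { return line.split(delimiter); }
}

// Hypothetical function class that rebuilds its non-serializable member while being deserialized.
class SplitFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String delimiter;        // written by default serialization
    private transient Tokenizer tokenizer; // skipped by default serialization

    SplitFunction(String delimiter) {
        this.delimiter = delimiter;
        this.tokenizer = new Tokenizer(delimiter);
    }

    // Custom deserialization hook, invoked by ObjectInputStream.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();                    // restores 'delimiter'
        this.tokenizer = new Tokenizer(delimiter); // recreates the transient member
    }

    int countTokens(String line) { return tokenizer.split(line).length; }
}

public class CustomSerializationDemo {
    // Serializes and deserializes the function, as would happen when it is shipped.
    static SplitFunction roundTrip(SplitFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SplitFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SplitFunction copy = roundTrip(new SplitFunction(","));
        System.out.println(copy.countTokens("a,b,c")); // 3
    }
}
```

Compared with the open() approach, this keeps the member usable immediately after deserialization, at the cost of tying initialization to Java serialization details.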
Best, Fabian
(Quoting Nico Kruber's message of 2017-08-03 16:00 GMT+02:00, reproduced in full in the next post.)
Re: state inside functions
Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Peter,
there's no need to worry about transient members, as the operator itself is not
serialized; only the state is, depending on the state backend.
If you want your state to be recovered from checkpoints, you should implement the
open() method and initialise your state there, as in your point (2) and as
described in [1].
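In Flink itself, keyed state on a keyed stream is typically obtained in open(Configuration) via getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class)). The plain-Java model below only imitates that split: FakeStateBackend, ValueHandle and CountingFunction are hypothetical names, the handle is a transient member created in open(), and the counted value lives in the backend, which is what a checkpoint would capture.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state backend: values live outside the function object.
class FakeStateBackend {
    private final Map<String, Long> values = new HashMap<>();

    // Returns a handle bound to the named state, similar in spirit to registering a state descriptor.
    ValueHandle getState(String name) { return new ValueHandle(values, name); }

    // Imitates a checkpoint: an independent copy of all state values.
    FakeStateBackend copy() {
        FakeStateBackend c = new FakeStateBackend();
        c.values.putAll(values);
        return c;
    }
}

// Hypothetical handle to a single named value in the backend.
class ValueHandle {
    private final Map<String, Long> store;
    private final String name;
    ValueHandle(Map<String, Long> store, String name) { this.store = store; this.name = name; }
    Long value() { return store.get(name); }
    void update(Long v) { store.put(name, v); }
}

// Hypothetical counting function: only the handle is a member, created in open().
class CountingFunction {
    private transient ValueHandle count;

    void open(FakeStateBackend backend) { count = backend.getState("count"); }

    long map(String ignored) {
        Long current = count.value();
        long next = (current == null ? 0L : current) + 1;
        count.update(next);
        return next;
    }
}

public class OpenStateDemo {
    static long runWithRestore() {
        FakeStateBackend backend = new FakeStateBackend();
        CountingFunction f = new CountingFunction();
        f.open(backend);
        f.map("a");
        f.map("b");
        FakeStateBackend checkpoint = backend.copy(); // holds count = 2

        // Failure: a brand-new function instance is opened against the restored state.
        CountingFunction recovered = new CountingFunction();
        recovered.open(checkpoint);
        return recovered.map("c"); // continues from the checkpointed value
    }

    public static void main(String[] args) {
        System.out.println(runWithRestore()); // 3
    }
}
```

Nothing in the function object itself needs to survive the failure; recovery works because open() reconnects the new instance to the restored state.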
If you want to re-scale your job, you have to take a savepoint and resume from
it with a different parallelism [2]. Be sure to set a maximum parallelism (per
job or per operator) and to assign UIDs to your operators, as described
in [3].
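Why the maximum parallelism matters can be seen from how keyed state is partitioned. As far as I know, Flink hashes each key into one of maxParallelism key groups and assigns contiguous ranges of key groups to parallel instances using keyGroup * parallelism / maxParallelism, so rescaling moves whole key groups. The sketch below models that arithmetic; it is a simplification (Flink additionally applies a murmur hash to key.hashCode(), whereas plain Math.floorMod is used here), and KeyGroupDemo is a hypothetical name. In a real job the value would be set with env.setMaxParallelism(...) or per operator, and UIDs with .uid("...").

```java
// Simplified model of how keyed state is spread across parallel instances via key groups.
public class KeyGroupDemo {
    // Simplification: Flink also applies a murmur hash to key.hashCode() before the modulo.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges of key groups are assigned to operator instances.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // must stay the same across savepoint and restore
        for (String key : new String[] {"alice", "bob", "carol"}) {
            int kg = keyGroupFor(key, maxParallelism);
            System.out.printf("%s -> key group %d -> instance %d of 2, instance %d of 4%n",
                    key, kg,
                    operatorIndexFor(kg, maxParallelism, 2),
                    operatorIndexFor(kg, maxParallelism, 4));
        }
    }
}
```

A key's group never changes as long as maxParallelism stays fixed; only the mapping from groups to instances changes with the parallelism. Changing maxParallelism would reshuffle keys across groups, which is why it has to be chosen before the first savepoint.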
Nico
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/savepoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/production_ready.html
(In reply to Peter Ertl's message of Thursday, 3 August 2017 12:11:14 CEST, quoted in full at the top of this thread.)