Posted to user@flink.apache.org by Peter Ertl <pe...@gmx.net> on 2017/08/03 10:11:14 UTC

state inside functions

Hi,

Can someone elaborate on when I should make properties transient or non-transient within operators (e.g. map / flatMap / reduce)?

I see these two possibilities:

(1) initialize a non-transient property from the constructor
(2) initialize a transient property inside a Rich*Function (e.g. RichMapFunction) when open(Configuration parameters) is invoked

On what criteria should I choose (1) or (2)?

How is this related to checkpointing / rebalancing?

Thanks in advance
Peter

Re: state inside functions

Posted by Timo Walther <tw...@apache.org>.
Hi,

if you would like to dynamically adjust the configuration of your streaming
job, it might be a good approach to treat the configuration as a
stream itself.

The connect() API can be used to connect a main stream with a control
stream. In any case, the configuration should be persisted in state if it
must still be present after a restore. Otherwise, you need to implement
logic in which the operator queries the latest configuration from some
external system, which could become a bottleneck.
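Below is a rough plain-Java sketch of the control-stream idea. It assumes a single parallel instance and no Flink runtime. In Flink, the two callbacks would roughly correspond to flatMap1()/flatMap2() of a CoFlatMapFunction applied to the result of connect(). The class ThresholdFilter, its methods, and the "threshold" key are hypothetical. Here the current configuration sits in a plain field, so, as noted above, a real job would still have to keep it in Flink state for it to survive a restore.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a co-function over connected (main, control) streams.
class ThresholdFilter {
    // Current configuration, starting from the value passed to the constructor.
    // In a real job it would have to be kept in Flink state to be present after a restore.
    private int threshold;

    ThresholdFilter(int initialThreshold) {
        this.threshold = initialThreshold;
    }

    // Control-stream element (cf. CoFlatMapFunction.flatMap2): update the configuration.
    void onControl(Map<String, String> update) {
        String value = update.get("threshold");
        if (value != null) {
            threshold = Integer.parseInt(value);
        }
    }

    // Main-stream element (cf. CoFlatMapFunction.flatMap1): emit values above the threshold.
    void onEvent(int value, List<Integer> out) {
        if (value > threshold) {
            out.add(value);
        }
    }
}

class ControlStreamDemo {
    public static void main(String[] args) {
        ThresholdFilter filter = new ThresholdFilter(10);
        List<Integer> out = new ArrayList<>();
        filter.onEvent(5, out);   // dropped: 5 <= 10
        filter.onEvent(15, out);  // emitted
        filter.onControl(Collections.singletonMap("threshold", "2"));
        filter.onEvent(5, out);   // emitted now: 5 > 2
        System.out.println(out);  // [15, 5]
    }
}
```

The point of the pattern is that a configuration change is just another element, so it arrives in order with the data and needs no external lookup per record.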

Regards,
Timo


On 16.12.20 22:07, vishalovercome wrote:
> When running in HA mode or taking savepoints, if we pass configuration as
> constructor arguments, then it seems as though changing configuration at a
> later time doesn't work as it uses state to restore older configuration. How
> can we pass configuration while having the flexibility to change the values
> at a later date?
> 
> I've started another discussion with many more questions -
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 


Re: state inside functions

Posted by vishalovercome <vi...@moengage.com>.
When running in HA mode or taking savepoints, if we pass configuration as
constructor arguments, then it seems as though changing configuration at a
later time doesn't work as it uses state to restore older configuration. How
can we pass configuration while having the flexibility to change the values
at a later date?

I've started another discussion with many more questions -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html




Re: state inside functions

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Peter,

function objects (such as an instance of a class that implements MapFunction)
that are used to construct a plan are serialized using Java serialization
and shipped to the workers for execution.
Therefore, function classes must be Serializable. In general, it is
recommended to configure function objects via the constructor. However, if
you have a member property that does not implement Serializable, you should
use a RichFunction, make the property transient, and initialize it in
open().
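Below is a plain-Java sketch of what this means. It uses only JDK serialization to mimic the function being shipped to a worker, so there is no real Flink runtime. EnrichFunction and its open() method stand in for a RichMapFunction. LookupClient is a hypothetical non-serializable member, and the Shipping helper is also hypothetical. The constructor-supplied endpoint travels with the function. The transient client arrives as null and is rebuilt in open(). The variant that keeps the client in a non-transient field fails to serialize.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical client that, like many connection objects, does not implement Serializable.
class LookupClient {
    private final String endpoint;
    LookupClient(String endpoint) { this.endpoint = endpoint; }
    String lookup(String key) { return endpoint + "/" + key; }
}

// Plain-Java stand-in for a RichMapFunction: configured via the constructor,
// with the non-serializable member kept transient and created in open().
class EnrichFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String endpoint;          // serializable configuration, shipped with the function
    private transient LookupClient client;  // skipped by serialization, rebuilt in open()

    EnrichFunction(String endpoint) { this.endpoint = endpoint; }

    void open() { client = new LookupClient(endpoint); }

    String map(String key) { return client.lookup(key); }

    boolean hasClient() { return client != null; }
}

// Variant with the client in a non-transient field: it cannot be shipped.
class BrokenEnrichFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final LookupClient client;
    BrokenEnrichFunction(String endpoint) { this.client = new LookupClient(endpoint); }
}

final class Shipping {
    // Serializes and deserializes an object, mimicking how a function is shipped to workers.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // True if the object survives the round trip (e.g. no NotSerializableException).
    static boolean survivesShipping(Object obj) {
        try {
            roundTrip(obj);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        EnrichFunction shipped = roundTrip(new EnrichFunction("http://lookup"));
        System.out.println(shipped.hasClient());  // false: transient field arrives as null
        shipped.open();
        System.out.println(shipped.map("42"));    // http://lookup/42
        System.out.println(survivesShipping(new BrokenEnrichFunction("http://lookup"))); // false
    }
}
```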

Alternatively, you can implement Java's serialization hooks (writeObject()
and readObject()) with custom de/serialization logic.
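Here is a plain-Java sketch of that alternative, using the standard writeObject()/readObject() hooks of java.io.Serializable. Tokenizer, SplitFunction and SerDe are hypothetical names. The member is still declared transient, because defaultWriteObject() would otherwise try to serialize it. It is then rebuilt inside readObject(), so no separate open() call is needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.regex.Pattern;

// Hypothetical helper that does not implement Serializable.
class Tokenizer {
    private final Pattern pattern;
    Tokenizer(String delimiter) { this.pattern = Pattern.compile(Pattern.quote(delimiter)); }
    int count(String line) { return pattern.split(line).length; }
}

// The transient member is rebuilt by Java's readObject() hook during deserialization.
class SplitFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String delimiter;
    private transient Tokenizer tokenizer;

    SplitFunction(String delimiter) {
        this.delimiter = delimiter;
        this.tokenizer = new Tokenizer(delimiter);
    }

    // Equivalent to the default behaviour; custom output would be written here.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();  // writes the non-transient fields (delimiter)
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();                     // restores delimiter
        this.tokenizer = new Tokenizer(delimiter);  // rebuilds the transient member
    }

    int countTokens(String line) { return tokenizer.count(line); }
}

final class SerDe {
    // Serialization round trip, as when a function object is shipped to a worker.
    static SplitFunction copy(SplitFunction fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SplitFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SplitFunction shipped = copy(new SplitFunction(","));
        System.out.println(shipped.countTokens("a,b,c"));  // 3
    }
}
```

Compared with open(), this ties the initialization to deserialization itself. That can be convenient, but it runs outside any Flink lifecycle method.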

Best, Fabian



2017-08-03 16:00 GMT+02:00 Nico Kruber <ni...@data-artisans.com>:

> Hi Peter,
> there's no need to worry about transient members as the operator itself is
> not serialized - only the state itself, depending on the state back-end.
>
> If you want your state to be recovered by checkpoints you should implement
> the open() method and initialise your state there as in your point (2) and
> as described in [1].
>
> If you want to re-scale your job, you have to take a savepoint and may
> resume from there with a different parallelism [2] but be sure to set a
> maximum parallelism (per job / or operator) and set UUIDs for operators as
> described in [3].
>
>
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/savepoints.html
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/production_ready.html
>

Re: state inside functions

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Peter,
there's no need to worry about transient members as the operator itself is not 
serialized - only the state itself, depending on the state back-end.

If you want your state to be recovered by checkpoints, you should implement the
open() method and initialise your state there, as in your point (2) and as
described in [1].
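Below is a toy model of why state obtained in open() survives a restart while an ordinary field does not. ToyStateBackend and CountingFunction are hypothetical stand-ins, not Flink's API. In a real job on a keyed stream, the handle would come from something like getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class)) inside open(). Flink itself would restore the backend from the checkpoint before calling open().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified state backend: named values that can be
// snapshotted (like a checkpoint) and used to build a restored backend.
class ToyStateBackend {
    private final Map<String, Long> values = new HashMap<>();

    ToyStateBackend() {}

    ToyStateBackend(Map<String, Long> snapshot) { values.putAll(snapshot); }

    Map<String, Long> snapshot() { return new HashMap<>(values); }

    Long get(String name) { return values.get(name); }

    void put(String name, long value) { values.put(name, value); }
}

// Counting function: the counter lives in registered state, not only in a plain field.
class CountingFunction {
    private ToyStateBackend state;  // handle obtained in open(), cf. getRuntimeContext().getState(...)
    private long plainFieldCount;   // ordinary field: not part of any snapshot

    void open(ToyStateBackend backend) { this.state = backend; }

    long map(String ignored) {
        plainFieldCount++;
        Long current = state.get("count");
        long next = (current == null ? 0L : current) + 1;
        state.put("count", next);
        return next;
    }

    long plainFieldCount() { return plainFieldCount; }
}

class StateRestoreDemo {
    public static void main(String[] args) {
        ToyStateBackend backend = new ToyStateBackend();
        CountingFunction first = new CountingFunction();
        first.open(backend);
        first.map("a");
        first.map("b");
        Map<String, Long> checkpoint = backend.snapshot();

        // "Restart": a fresh function instance, opened against the restored backend.
        CountingFunction restarted = new CountingFunction();
        restarted.open(new ToyStateBackend(checkpoint));
        System.out.println(restarted.map("c"));          // 3: continues from the checkpoint
        System.out.println(restarted.plainFieldCount()); // 1: the plain field started over
    }
}
```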

If you want to re-scale your job, you have to take a savepoint and may resume
from there with a different parallelism [2], but be sure to set a maximum
parallelism (per job or per operator) and set UUIDs for operators as described
in [3].
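The following is a simplified model of why the maximum parallelism has to be fixed before rescaling. Keyed state is split into a fixed number of key groups, equal to the maximum parallelism. Each parallel instance then owns a contiguous range of those groups. The model assumes a plain Math.floorMod of hashCode() for the key-to-group step, whereas Flink additionally applies a murmur hash. The group-to-instance formula is modeled on Flink's key-group range assignment but should be treated as illustrative. KeyGroups is a hypothetical class name.

```java
// Simplified model of how keyed state is partitioned for rescaling.
final class KeyGroups {
    // Key -> key group: depends only on maxParallelism, never on the current parallelism.
    // (Flink additionally murmur-hashes key.hashCode(); omitted here for simplicity.)
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> parallel instance: each instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int group = keyGroupFor("user-42", maxParallelism);
        // Rescaling from 4 to 8 instances moves whole key groups; the key's group is unchanged.
        System.out.println("key group " + group
            + ": instance " + operatorIndexFor(group, maxParallelism, 4) + " of 4, "
            + "instance " + operatorIndexFor(group, maxParallelism, 8) + " of 8");
    }
}
```

Because the number of key groups is fixed by the maximum parallelism, changing it later would reshuffle every key. It also caps how far the job can be scaled out.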


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/savepoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/production_ready.html
