Posted to user@flink.apache.org by Jérémy Albrecht <ja...@skapane.ai> on 2021/09/20 15:22:37 UTC

Python statefun - Context update

Hi all!

I am currently developing an environment that uses Flink Stateful Functions in Python. The architecture is complex, but here are the main points needed to frame the problem I am facing.
* functions.py contains several methods. One of them handles protobuf messages and stores a Python object in context.storage under a ValueSpec. This object contains native types, but also lists of named tuples and other objects.
Since pickle cannot serialize all of these objects and functions, I use dill to serialize the object and build a Flink-compatible type.
* The object stored in the context can be summarized as follows:
@dataclass
class RunState:
    duration: int
    runner: Runner
where runner is an instance of the class Runner, defined as:
class Runner:
    counter: int = 0

    def prepare_smth(self):
        self.counter = 10

        # Closure that mutates the Runner instance it was created from.
        def add_to_counter():
            self.counter += 1

        return add_to_counter

From the object stored in the function's state, I therefore have access to the function *add_to_counter* and can do something like:
method_caller = ctx.storage.run_state.runner.prepare_smth()
method_caller()

-> After this, I expect the counter attribute of the Runner instance stored as runner in the RunState to have the value 11. What happens instead is that the value is modified inside add_to_counter and prepare_smth, but the change is never reflected in the object stored in the context: ctx.storage.run_state.runner.counter still equals 0.

* After some investigation with the debugger, it seems to me that the program does try to update the value, but ctx.storage is only updated when I do a reassignment:
e.g. ctx.storage.run_state.runner.counter += 1 has NO effect, but
rs = ctx.storage.run_state
rs.runner.counter += 1
ctx.storage.run_state = rs
has the expected result.

If you have any clue about where to start or what to do about my problem, I would greatly appreciate the help!
Don't hesitate to ask me if anything is unclear :)

Thanks,
Jérémy
    

Re: Python statefun - Context update

Posted by Nicolaus Weidner <ni...@ververica.com>.
Hi Jérémy,

Objects are serialized when you store them in state. So when you retrieve
run_state from state, it is deserialized and you get a fresh instance.
Calling method_caller() then modifies this instance, but *not* the
serialized version stored in state.
In the second attempt you described, you modified the retrieved instance,
then stored the modified version in state (or rather a serialized form of
it), which works as expected.
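
To make the two cases concrete, here is a minimal, self-contained sketch. ToyStorage is a hypothetical stand-in for ctx.storage, not the actual StateFun SDK class, and JSON is used in place of dill only to keep the example free of dependencies. The one assumption it encodes is that each read deserializes a fresh copy and each assignment serializes the value.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class Runner:
    counter: int = 0


@dataclass
class RunState:
    duration: int
    runner: Runner = field(default_factory=Runner)


def serialize(rs: RunState) -> bytes:
    # Stand-in for the dill-based serializer from the original post.
    return json.dumps(asdict(rs)).encode("utf-8")


def deserialize(data: bytes) -> RunState:
    raw = json.loads(data.decode("utf-8"))
    return RunState(duration=raw["duration"], runner=Runner(**raw["runner"]))


class ToyStorage:
    """Hypothetical stand-in for ctx.storage: it only keeps serialized bytes."""

    def __init__(self, initial: RunState) -> None:
        self._bytes = serialize(initial)

    @property
    def run_state(self) -> RunState:
        # Every read builds a fresh instance from the stored bytes.
        return deserialize(self._bytes)

    @run_state.setter
    def run_state(self, value: RunState) -> None:
        # Every assignment replaces the stored bytes.
        self._bytes = serialize(value)


storage = ToyStorage(RunState(duration=5))

# Case 1: in-place mutation changes a temporary copy that is then discarded.
storage.run_state.runner.counter += 1
lost = storage.run_state.runner.counter

# Case 2: read, modify, then write back explicitly.
rs = storage.run_state
rs.runner.counter += 1
storage.run_state = rs
kept = storage.run_state.runner.counter

print(lost, kept)
```

In this toy model the first update is lost because nothing ever assigns to storage.run_state, while the second one survives because the modified copy is serialized again.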

I don't think there is any way around explicitly storing any local changes
you made in state.
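
If the explicit write-back is easy to forget, one option is to wrap it in a small helper. The sketch below is only an illustration: updating and ToyStorage are hypothetical names, not part of the StateFun SDK, and copy.deepcopy stands in for the serialization round trip.

```python
import copy
from contextlib import contextmanager


class Runner:
    def __init__(self):
        self.counter = 0

    def prepare_smth(self):
        self.counter = 10

        def add_to_counter():
            self.counter += 1

        return add_to_counter


class ToyStorage:
    """Hypothetical stand-in for ctx.storage; deepcopy mimics (de)serialization."""

    def __init__(self, runner):
        self._stored = copy.deepcopy(runner)

    @property
    def runner(self):
        # Each read hands out a fresh copy, never the stored object itself.
        return copy.deepcopy(self._stored)

    @runner.setter
    def runner(self, value):
        self._stored = copy.deepcopy(value)


@contextmanager
def updating(storage, name):
    """Read a value once, let the caller mutate it, then store it again."""
    value = getattr(storage, name)
    yield value
    # Reached only if the with-body did not raise, so a failed update
    # is not written back.
    setattr(storage, name, value)


storage = ToyStorage(Runner())

with updating(storage, "runner") as runner:
    add_to_counter = runner.prepare_smth()
    add_to_counter()

result = storage.runner.counter
print(result)
```

The closure returned by prepare_smth() mutates the local copy, and the helper stores that copy again when the with block exits normally. Skipping the write-back on an exception is a design choice here; a try/finally would persist partial changes instead.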

Best,
Nico
