Posted to user@beam.apache.org by gaurav mishra <ga...@gmail.com> on 2022/02/18 23:07:03 UTC

ValueState read and write behavior[Dataflow]

Hi,
I have a stateful DoFn in which I am storing a POJO. In my processElement I am
doing something like this:

Pojo a = state.read();
.....
....
a.setField1(123);
....
// end of function

As you can see, there is no state.write(a) call.

When I read that state in my onTimer() method, I get the updated value.

Pojo a = state.read();
log.info(a.getField1()); // 123

I would have expected state.read() and state.write() to both be network calls,
so without the write() call the state should still hold stale data.
Is my understanding of read() and write() wrong here?
I was expecting log.info(a.getField1()) to print the old value.

Re: ValueState read and write behavior[Dataflow]

Posted by gaurav mishra <ga...@gmail.com>.
Thanks Luke for the explanation. That helps.

On Fri, Feb 18, 2022 at 3:58 PM Luke Cwik <lc...@google.com> wrote:

> Yes, the POJO instance is stored in memory, in a cache within the worker
> process, under a cache key. As long as bundles are processed successfully,
> the runner keeps giving the worker the same cache key, so you keep getting
> back the same POJO instance. If a bundle fails or the runner moves the work
> to another worker, the runner changes the cache key, which effectively
> invalidates the entry. The stale entry is eventually evicted because the
> cache is LRU based.
>
> This[1] goes into a lot of detail about how this is done with the
> portability APIs, but much of it works the same way pre-portability, using
> method calls between the runner-owned portion and the SDK-owned portion of
> the worker.
>
> 1:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m

Re: ValueState read and write behavior[Dataflow]

Posted by Luke Cwik <lc...@google.com>.
Yes, the POJO instance is stored in memory, in a cache within the worker
process, under a cache key. As long as bundles are processed successfully,
the runner keeps giving the worker the same cache key, so you keep getting
back the same POJO instance. If a bundle fails or the runner moves the work
to another worker, the runner changes the cache key, which effectively
invalidates the entry. The stale entry is eventually evicted because the
cache is LRU based.

This[1] goes into a lot of detail about how this is done with the
portability APIs, but much of it works the same way pre-portability, using
method calls between the runner-owned portion and the SDK-owned portion of
the worker.

1:
https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
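
To make the caching behavior described above concrete, here is a toy model in
plain Java. It is only a sketch and not Beam or Dataflow code: ToyWorker, the
"token-1"/"token-2" cache tokens, and the key "user-a" are all invented for
illustration. It shows why an unwritten mutation is visible while the cache
token stays the same and disappears once the token changes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class CacheTokenDemo {
    /** Mutable value standing in for the Pojo from the thread. */
    static final class Pojo {
        int field1;
        Pojo(int field1) { this.field1 = field1; }
    }

    /** Hypothetical worker: a durable store plus an in-process LRU cache. */
    static final class ToyWorker {
        // What write() actually persisted (stands in for the runner's state backend).
        private final Map<String, Integer> durable = new HashMap<>();
        private final LinkedHashMap<String, Pojo> cache;

        ToyWorker(final int maxEntries) {
            // accessOrder=true orders entries from least to most recently used.
            this.cache = new LinkedHashMap<String, Pojo>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Pojo> eldest) {
                    return size() > maxEntries;
                }
            };
        }

        Pojo read(String cacheToken, String key) {
            String cacheKey = cacheToken + "/" + key;
            Pojo cached = cache.get(cacheKey);
            if (cached != null) {
                return cached; // same instance as before, unwritten mutations included
            }
            Integer stored = durable.get(key);
            if (stored == null) {
                return null; // never written
            }
            Pojo fresh = new Pojo(stored); // rebuilt from what was persisted
            cache.put(cacheKey, fresh);
            return fresh;
        }

        void write(String cacheToken, String key, Pojo value) {
            durable.put(key, value.field1);
            cache.put(cacheToken + "/" + key, value);
        }
    }

    /** Returns {value seen with the same token, value seen after the token changed}. */
    static int[] demo() {
        ToyWorker worker = new ToyWorker(100);
        worker.write("token-1", "user-a", new Pojo(1)); // persisted value is 1

        Pojo a = worker.read("token-1", "user-a");
        a.field1 = 123; // mutated, but never written back

        int sameToken = worker.read("token-1", "user-a").field1; // cache hit
        int newToken = worker.read("token-2", "user-a").field1;  // cache miss, reloads
        return new int[] {sameToken, newToken};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("same token: " + result[0]); // prints "same token: 123"
        System.out.println("new token: " + result[1]);  // prints "new token: 1"
    }
}
```

In this model the "luck" is simply that the token has not changed and the
entry has not been evicted yet.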

On Fri, Feb 18, 2022 at 3:38 PM gaurav mishra <ga...@gmail.com>
wrote:

> "You are most likely mutating an object stored in a cache and by luck the
> object is still cached when the next bundle starts processing."
> Can you please elaborate a bit more here? Do you mean that the Pojo instance
> is kept alive across calls to processElement?

Re: ValueState read and write behavior[Dataflow]

Posted by gaurav mishra <ga...@gmail.com>.
On Fri, Feb 18, 2022 at 3:27 PM Luke Cwik <lc...@google.com> wrote:

> You are most likely mutating an object stored in a cache and by luck the
> object is still cached when the next bundle starts processing.
>
> Anything you read from state should not be mutated without the
> corresponding write call.
> Anything you write to state should not be mutated after the call to write.
>

That was my understanding too. I am currently debugging an issue in
production where weird things are happening, and that kind of code is
present in that pipeline. The code sample above is a simplified version of
the code I was debugging. When I ran it locally I expected the bug to
reproduce every time, but that "luck" element is giving me a hard time.

So the takeaway, I guess, is that I should add those .write() calls at the
end of processElement once all mutation is done.

"You are most likely mutating an object stored in a cache and by luck the
object is still cached when the next bundle starts processing."
Can you please elaborate a bit more here? Do you mean that the Pojo instance
is kept alive across calls to processElement?



Re: ValueState read and write behavior[Dataflow]

Posted by Luke Cwik <lc...@google.com>.
You are most likely mutating an object stored in a cache and by luck the
object is still cached when the next bundle starts processing.

Anything you read from state should not be mutated without the
corresponding write call.
Anything you write to state should not be mutated after the call to write.

Apache Beam could guard against both of these cases, but it would require
handing out a copy during read() (reduces performance) and/or making a copy
during write() (increases memory usage until the writes are materialized).
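
As a rough illustration of the copy-on-read / copy-on-write guard mentioned
above, here is a minimal plain-Java sketch. CopyingCell and Pojo.copy() are
hypothetical names made up for this example; Beam's ValueState does not
behave this way, which is exactly why the two rules above matter.

```java
import java.util.function.UnaryOperator;

class CopyingStateDemo {
    /** Mutable value standing in for the Pojo from the thread. */
    static final class Pojo {
        int field1;
        Pojo(int field1) { this.field1 = field1; }
        Pojo copy() { return new Pojo(field1); }
    }

    /** Hypothetical single-value state cell that copies on both read and write. */
    static final class CopyingCell<T> {
        private final UnaryOperator<T> copier;
        private T stored;

        CopyingCell(UnaryOperator<T> copier) { this.copier = copier; }

        // Copy on read: the caller can never mutate the stored instance.
        T read() { return stored == null ? null : copier.apply(stored); }

        // Copy on write: later mutations of the caller's object are not seen.
        void write(T value) { stored = value == null ? null : copier.apply(value); }
    }

    /** Returns the stored field after each of the three scenarios. */
    static int[] demo() {
        CopyingCell<Pojo> cell = new CopyingCell<>(Pojo::copy);

        Pojo p = new Pojo(1);
        cell.write(p);
        p.field1 = 2; // mutation after write()
        int afterWriteMutation = cell.read().field1; // still 1

        Pojo r = cell.read();
        r.field1 = 123; // mutation after read(), no write()
        int afterReadMutation = cell.read().field1; // still 1

        cell.write(r); // explicit write makes the change visible
        int afterExplicitWrite = cell.read().field1; // now 123

        return new int[] {afterWriteMutation, afterReadMutation, afterExplicitWrite};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println(result[0] + " " + result[1] + " " + result[2]); // prints "1 1 123"
    }
}
```

The cost is visible in the sketch: every read() and every write() allocates a
new object, which is the performance and memory trade-off described above.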



Re: ValueState read and write behavior[Dataflow]

Posted by gaurav mishra <ga...@gmail.com>.
There is some code that runs the first time processElement is called for any
given key:


if(state.read() == null)
{
    state.write(new Pojo());
}

rest of the code
...
Pojo a = state.read();
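
For comparison, here is a sketch of the same logic written as a single
read-modify-write: read once, fall back to a new Pojo when nothing has been
written yet, mutate, then write the result back. This is plain Java so it can
run on its own. SimpleValueState and InMemoryState are hypothetical stand-ins
that expose only the read() and write() calls used in this thread; they are
not Beam classes, and process() only mimics the body of a processElement.

```java
class ReadModifyWriteDemo {
    /** Hypothetical stand-in exposing only read() and write(). */
    interface SimpleValueState<T> {
        T read();
        void write(T value);
    }

    /** Trivial in-memory implementation that counts writes, for the demo only. */
    static final class InMemoryState<T> implements SimpleValueState<T> {
        private T value;
        int writes = 0;

        @Override public T read() { return value; }

        @Override public void write(T newValue) {
            value = newValue;
            writes++;
        }
    }

    /** Mutable value standing in for the Pojo from the thread. */
    static final class Pojo {
        int field1;
    }

    /** Mimics the body of a processElement-style method. */
    static void process(SimpleValueState<Pojo> state, int newValue) {
        Pojo current = state.read();   // single read
        if (current == null) {
            current = new Pojo();      // first element for this key
        }
        current.field1 = newValue;     // all mutation happens here
        state.write(current);          // explicit write once mutation is done
        // current must not be mutated after this point.
    }

    public static void main(String[] args) {
        InMemoryState<Pojo> state = new InMemoryState<>();
        process(state, 123);
        System.out.println(state.read().field1 + " after " + state.writes + " write(s)");
        // prints "123 after 1 write(s)"
    }
}
```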


On Fri, Feb 18, 2022 at 3:24 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Fri, Feb 18, 2022 at 3:07 PM gaurav mishra <
> gauravmishra.itbhu@gmail.com> wrote:
>
>> Hi,
>> I have a stateful DoFn in which I am storing a POJO. In my processElement I
>> am doing something like this:
>>
>> Pojo a = state.read();
>>
> This will return null if the state has never been written. When do you
> write the state?

Re: ValueState read and write behavior[Dataflow]

Posted by Reuven Lax <re...@google.com>.
On Fri, Feb 18, 2022 at 3:07 PM gaurav mishra <ga...@gmail.com>
wrote:

> Hi,
> I have a stateful DoFn in which I am storing a POJO. In my processElement I
> am doing something like this:
>
> Pojo a = state.read();
>
This will return null if the state has never been written. When do you
write the state?
