Posted to user@flink.apache.org by Joey Echeverria <je...@splunk.com> on 2020/05/06 17:22:38 UTC

Re: Rich Function Thread Safety

I’ve seen a few mailing list posts (including this one) that say Flink guarantees there is no concurrent access to operator methods (e.g. flatMap, snapshotState, etc.) and thus synchronization isn’t needed when writing operators that support checkpointing. I was trying to find a place in the official docs where this was called out, but was coming up empty.

Is there a section of the docs that covers this topic?

Thanks!

-Joey

On Dec 18, 2019, at 9:38 PM, Zhu Zhu <re...@gmail.com> wrote:

Hi Aaron,

It is thread safe, since the state snapshot happens in the same thread as the user function.
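A simplified model of what "same thread" means here (a sketch, not Flink's actual classes): the task owns a single-threaded mailbox, and both record processing and snapshots are enqueued as actions on it. Even when a checkpoint is triggered from another thread, the snapshot itself runs on the task thread between two records, so it cannot observe a half-finished update. `MailboxTask`, `processElement`, and `triggerCheckpoint` are hypothetical names, and treating the task loop as a plain executor queue is a deliberate simplification.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified, hypothetical model of a task thread driven by a mailbox.
// Record processing and snapshots are both actions executed by one thread, in order.
class MailboxTask {
    // Hypothetical operator state: the invariant a + b == 0 holds after every record.
    private long a = 0;
    private long b = 0;

    // A single-threaded executor stands in for the task's mailbox loop.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // May be called from any thread; the update itself runs on the mailbox thread.
    Future<?> processElement(long value) {
        return mailbox.submit(() -> {
            a += value; // first half of the update
            b -= value; // second half; the invariant is restored here
        });
    }

    // May be called from another thread (e.g. a checkpoint coordinator).
    // It only enqueues the snapshot, which then runs between two records.
    Future<long[]> triggerCheckpoint() {
        return mailbox.submit(() -> new long[] {a, b});
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

No field is volatile or locked, because only the mailbox thread ever reads or writes `a` and `b`.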

Thanks,
Zhu Zhu

On Thu, Dec 19, 2019 at 11:25 AM, Aaron Langford <aa...@gmail.com> wrote:
Hello Flink Community,

I'm hoping to verify some understanding:

If I have a function with managed state, I'm wondering if a checkpoint will ever be taken while a function is mutating state. I'll try to illustrate the situation I'm hoping to be safe from:

Happy Path:
t0 -> processFunction invoked with el1
t1 -> set A to 5
t2 -> set B to 10
t3 -> function returns

Unhappy path:
t0 -> processFunction invoked with el1
t1 -> set A to 5
t2 -> function interrupted, checkpoint taken (A = 5, B = 1)
t3 -> set B to 10
t4 -> function returns
...
tn -> Flink application fails, restart from prev checkpoint (A=5, B=1)
tn+1 -> recovery begins somewhere, but state is torn anyway, so we're going to have a bad time

I don't think this could happen given that checkpoints effectively are messages in the pipeline, and the checkpoint is only taken when an operator sees the checkpoint barrier.
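Aaron's reasoning can be checked with a small single-input model in which barriers travel in the same ordered stream as records and one loop handles both. `Event` and `TwoFieldOperator` are hypothetical; the sketch assumes a single input channel and leaves out barrier alignment across multiple inputs, asynchronous snapshot upload, and everything else a real task does.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one input channel: each event is either a record or a checkpoint barrier.
final class Event {
    final boolean isBarrier;
    final long checkpointId; // meaningful only for barriers
    final int a;             // meaningful only for records
    final int b;

    private Event(boolean isBarrier, long checkpointId, int a, int b) {
        this.isBarrier = isBarrier;
        this.checkpointId = checkpointId;
        this.a = a;
        this.b = b;
    }

    static Event data(int a, int b) {
        return new Event(false, -1L, a, b);
    }

    static Event barrier(long checkpointId) {
        return new Event(true, checkpointId, 0, 0);
    }
}

// Hypothetical operator that sets two state fields per record, like A and B above.
class TwoFieldOperator {
    private int a = 0; // arbitrary initial values
    private int b = 1;

    // Snapshots taken so far, each stored as {checkpointId, a, b}.
    final List<long[]> snapshots = new ArrayList<>();

    // One loop on one thread consumes records and barriers in arrival order.
    void run(List<Event> input) {
        for (Event e : input) {
            if (e.isBarrier) {
                // Reached only after the previous record's update has fully finished,
                // so a snapshot can never contain the new A together with the old B.
                snapshots.add(new long[] {e.checkpointId, a, b});
            } else {
                a = e.a; // "set A"
                b = e.b; // "set B"
            }
        }
    }
}
```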

Hoping to make sure this is correct!

Aaron


Re: Rich Function Thread Safety

Posted by tao xiao <xi...@gmail.com>.
As the Javadoc suggests, it seems that operator methods and checkpoint snapshots
are accessed by two different threads:

https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java#L39-L62
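For context, the linked Javadoc is about `SourceFunction`: a legacy source's `run` method executes in its own thread as a long-running loop, which is why the Javadoc's example wraps `ctx.collect(...)` and the accompanying state update in `synchronized (ctx.getCheckpointLock())`, making them atomic with respect to `snapshotState`. The sketch below reproduces that pattern in plain Java; `CountingSource`, the plain `Object` lock standing in for `getCheckpointLock()`, and the list standing in for `ctx.collect` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the checkpoint-lock pattern from the SourceFunction Javadoc.
// The emitting loop and snapshot() run on different threads and share one lock.
class CountingSource {
    private final Object checkpointLock = new Object();   // stands in for ctx.getCheckpointLock()
    private final List<Long> emitted = new ArrayList<>(); // stands in for ctx.collect(...)
    private long count = 0L;                              // the position that gets checkpointed
    private volatile boolean running = true;

    // Runs on the source thread; count is only written by this thread.
    void run(long limit) {
        while (running && count < limit) {
            synchronized (checkpointLock) {
                // Emission and the state update happen atomically with respect to snapshot().
                emitted.add(count);
                count++;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Runs on another thread; returns {count, number of emitted elements} captured together.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {count, emitted.size()};
        }
    }
}
```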

On Thu, May 7, 2020 at 1:22 AM Joey Echeverria <je...@splunk.com>
wrote:


-- 
Regards,
Tao

Re: Rich Function Thread Safety

Posted by Igal Shilman <ig...@ververica.com>.
Hi Lian,

Good to hear that you are learning about StateFun, and I'd be happy to
answer any of your questions while doing so :-)
Perhaps in the future it would be best if you start a new email thread, so
that it would be easier to spot your question.

The following is completely thread safe:

final int seen = count.getOrDefault(0);
count.set(seen + 1);

The simple reason is that functions are invoked one at a time on a single
OS thread, and different OS threads do not share function instances.
In addition, each OS thread owns a subset of keys, and only that thread
invokes functions for those keys.
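The key-ownership argument can be sketched as follows, under the assumption that each key is routed to exactly one worker thread and each worker holds its own function instance. Because of that, the unsynchronized `getOrDefault`-then-update sequence never races. `CounterFunction`, `KeyPartitionedRuntime`, and the `HashMap` standing in for a `PersistedValue` are hypothetical and do not reflect StateFun's internal classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a function instance; each worker owns exactly one.
class CounterFunction {
    // Per-key state owned by this instance only (plays the role of a persisted count).
    private final Map<String, Integer> count = new HashMap<>();

    // Non-atomic read-modify-write, safe only because one thread ever touches this instance.
    void invoke(String key) {
        final int seen = count.getOrDefault(key, 0);
        count.put(key, seen + 1);
    }

    Integer get(String key) {
        return count.get(key);
    }
}

// Routes each key to a fixed worker; each worker is one thread with its own instance.
class KeyPartitionedRuntime {
    private final List<ExecutorService> workers = new ArrayList<>();
    private final List<CounterFunction> instances = new ArrayList<>();

    KeyPartitionedRuntime(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            workers.add(Executors.newSingleThreadExecutor());
            instances.add(new CounterFunction());
        }
    }

    // The same key always maps to the same worker.
    private int owner(String key) {
        return Math.floorMod(key.hashCode(), workers.size());
    }

    // May be called from any thread; the invocation runs on the owning worker.
    void send(String key) {
        int w = owner(key);
        CounterFunction fn = instances.get(w);
        workers.get(w).execute(() -> fn.invoke(key));
    }

    // Stops all workers and waits, after which state can be read from the caller.
    void awaitIdle() throws InterruptedException {
        for (ExecutorService w : workers) w.shutdown();
        for (ExecutorService w : workers) w.awaitTermination(1, TimeUnit.MINUTES);
    }

    Integer countFor(String key) {
        return instances.get(owner(key)).get(key);
    }
}
```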

> 2. Is there any scenario in which developers need to worry about
> process/thread safety when using state?

A few things here:

* Do not share mutable static variables without synchronization.

* Try to minimize or avoid long blocking calls. Use an asynchronous
API if applicable.
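Both tips can be illustrated in plain Java. The static `AtomicLong` shows one way to synchronize a field shared by all instances and threads, and the `CompletableFuture` chain shows one way to keep a slow call off the processing thread while still applying its result on that thread. `EnrichingFunction`, `slowLookup`, and the two executors are hypothetical; the sketch does not use StateFun's own asynchronous API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical function class illustrating both tips; not StateFun API.
class EnrichingFunction {
    // Tip 1: a static field is shared by all instances on all worker threads,
    // so it must be synchronized; AtomicLong makes the increment thread safe.
    static final AtomicLong TOTAL_INVOCATIONS = new AtomicLong();

    // Per-instance state, only ever modified on this instance's task thread.
    private final List<String> enriched = new ArrayList<>();

    private final ExecutorService taskThread; // single thread that owns this instance's state
    private final ExecutorService ioPool;     // separate pool for slow calls

    EnrichingFunction(ExecutorService taskThread, ExecutorService ioPool) {
        this.taskThread = taskThread;
        this.ioPool = ioPool;
    }

    // Handles one message; the returned future completes once the result is applied to state.
    CompletableFuture<Void> invoke(String input) {
        TOTAL_INVOCATIONS.incrementAndGet();
        // Tip 2: run the slow call on ioPool instead of blocking the task thread,
        // then apply its result back on the task thread, one result at a time.
        return CompletableFuture
                .supplyAsync(() -> slowLookup(input), ioPool)
                .thenAcceptAsync(enriched::add, taskThread);
    }

    // Read only after the futures returned by invoke() have completed.
    int enrichedCount() {
        return enriched.size();
    }

    // Hypothetical blocking call, e.g. to an external service; here it only sleeps.
    private static String slowLookup(String input) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input.toUpperCase();
    }
}
```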


> 3. Can I consider stateful functions as Flink operators, so that all
> operator-related theories can be applied to stateful functions?

Absolutely yes. StateFun is built on top of the DataStream API plus some
internal bits.


> 4. Similarly, can we apply all theories of DataStream state to StateFun's
> state?

I'm not sure what you mean by that, but by and large, yes. The main
difference is that we don't support state evolution with arbitrary state
types; we strictly require Protocol Buffers for that.


Good luck,
Igal.

On Sun, Oct 25, 2020 at 7:43 PM Lian Jiang <ji...@gmail.com> wrote:


Re: Rich Function Thread Safety

Posted by Lian Jiang <ji...@gmail.com>.
Hi,

I am learning
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/getting-started/java_walkthrough.html
and wondering if the invoke function is thread safe for:

final int seen = count.getOrDefault(0);
count.set(seen + 1);

From https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/logical.html

"When an application starts, each parallel worker of the framework
will create one physical object per function type."

It sounds like one function can be invoked by multiple workers at the
same time. The tutorial example indicates that the persistedValue is
process safe (across multiple workers) and thread safe (inside a worker,
e.g. a timer callback).


Could you please clarify the questions below?

1. What is the design (briefly) for persisted state process/thread safety?
2. Is there any scenario in which developers need to worry about
process/thread safety when using state?
3. Can I consider stateful functions as Flink operators, so that all
operator-related theories can be applied to stateful functions?
4. Similarly, can we apply all theories of DataStream state to StateFun's state?

I appreciate it very much!


Thanks

Lian


On Sun, May 10, 2020 at 9:33 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:


Re: Rich Function Thread Safety

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
As others have mentioned already, it is true that method calls on operators
(e.g. processing events and snapshotting state) will not concurrently
happen.
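In practice this means a checkpointed function can keep plain, unsynchronized fields. The sketch below is shaped loosely like a `flatMap` plus `snapshotState` pair; `BufferingFunction` and its method signatures are hypothetical, and in real Flink code the snapshot would typically be written to managed state (for example a `ListState`) from `CheckpointedFunction#snapshotState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffering function shaped like a flatMap + snapshotState pair.
// No field is volatile or synchronized, relying on the runtime never calling
// flatMap and snapshotState concurrently on the same instance.
class BufferingFunction {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();

    BufferingFunction(int threshold) {
        this.threshold = threshold;
    }

    // Buffers each element and emits the whole batch once the threshold is reached.
    void flatMap(String value, Consumer<List<String>> out) {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            out.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Returns a copy of the not-yet-emitted elements to be stored in the checkpoint.
    List<String> snapshotState() {
        return new ArrayList<>(buffer);
    }
}
```

The fields are safe without locks only because of the guarantee stated above; the same class would be unsafe if another thread called `flatMap` directly while a snapshot was in progress.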

As for your findings in reading through the documentation, that might be a
hint that we could add a bit more explanation mentioning this.
Could you suggest where you would expect to see this mentioned, based on
your read-through?

Cheers,
Gordon


