Posted to dev@beam.apache.org by Romain Manni-Bucau <rm...@gmail.com> on 2018/04/26 12:50:09 UTC

Re: DoFn thread constraint.

Hi John,

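The fact that a runner caches a fn per thread is an internal implementation
detail, but a fn will only ever be activated by at most one thread at a time
(like stateless beans or any other object pool).
This means your fn can fail: a runner is free to hand the same instance to a
different thread between calls.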
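Here is a minimal plain-Java sketch of that distinction. It is not Beam code and assumes a runner that pools fn instances. FnPool, AffinityCheckingFn and PooledFnDemo are hypothetical names used only for illustration, not Beam APIs. The pool hands each instance to at most one thread at a time but does not pin it to a thread, which is all the guarantee above promises. With one pooled instance and two worker threads, a fn that assumes thread affinity still trips its check, even though it is never used concurrently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class PooledFnDemo {

    // Hypothetical user function that assumes thread affinity, mirroring
    // the "Moved!" check from the question.
    static final class AffinityCheckingFn {
        private Long ownerThreadId; // first thread that invoked this instance

        void process(String element) {
            long current = Thread.currentThread().getId();
            if (ownerThreadId == null) {
                ownerThreadId = current;
            } else if (ownerThreadId != current) {
                throw new IllegalStateException("Moved!");
            }
        }
    }

    // Hypothetical runner-side pool: an instance is checked out by at most one
    // thread at a time, but nothing ties it to the thread that used it before.
    static final class FnPool {
        private final BlockingQueue<AffinityCheckingFn> idle;

        FnPool(int size) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                idle.add(new AffinityCheckingFn());
            }
        }

        void process(String element) throws InterruptedException {
            AffinityCheckingFn fn = idle.take(); // exclusive checkout
            try {
                fn.process(element);
            } finally {
                idle.put(fn); // back to the pool; any thread may take it next
            }
        }
    }

    /** Returns true if the single pooled instance tripped its affinity check. */
    static boolean run() {
        FnPool pool = new FnPool(1); // one instance, so both workers share it
        AtomicBoolean moved = new AtomicBoolean(false);
        // Both executors stay alive until the end, so their threads have distinct ids.
        ExecutorService first = Executors.newSingleThreadExecutor();
        ExecutorService second = Executors.newSingleThreadExecutor();
        try {
            for (ExecutorService worker : new ExecutorService[] {first, second}) {
                worker.submit(() -> {
                    try {
                        pool.process("element");
                    } catch (IllegalStateException e) {
                        moved.set(true); // same instance, different thread
                    }
                    return null;
                }).get(); // wait, so the two calls never overlap
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return moved.get();
    }

    public static void main(String[] args) {
        System.out.println("affinity check fired: " + run());
    }
}
```

Running main should print "affinity check fired: true". The pool never lets two threads touch the instance at once, and the put/take hand-off makes the previous thread's writes visible to the next one. It still makes no promise about which thread comes next.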


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-04-26 14:37 GMT+02:00 John MacMillan <jo...@ca.ibm.com>:

> I'm trying to understand how restrictive this sentence from
> 4.3.2 of the Programming Guide
> <https://beam.apache.org/documentation/programming-guide/#user-code-thread-compatibility>
> is for our runner:
>
> Each instance of your function object is accessed by a single thread on a
> worker instance, unless you explicitly create your own threads.
>
> Does this imply that a runner may never use a DoFn instance on more
> than one thread (i.e. it may not move an instance from one thread to
> another but must create a separate instance for each thread), or only
> that an instance is used by at most one thread at a time?
>
> Or put another way, would the following DoFn (or something like it, if
> I've messed up) ever be allowed to throw on a properly implemented runner?
>
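> import org.apache.beam.sdk.transforms.DoFn;
>
> public class MotionDetector<T> extends DoFn<T, T> {
>     // Id of the first thread that called this instance. A per-instance
>     // field is needed: a static ThreadLocal is shared by all instances and
>     // always returns the calling thread's own id, so it can never detect a move.
>     private transient Long ownerThreadId;
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         long current = Thread.currentThread().getId();
>         if (ownerThreadId == null) {
>             ownerThreadId = current;  // first call: remember the thread
>         } else if (ownerThreadId != current) {
>             throw new RuntimeException("Moved!");
>         }
>         c.output(c.element());
>     }
> }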
>
> Thanks,
> John
>
>

Re: DoFn thread constraint.

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Would you like to give it a try by opening a PR on
https://github.com/apache/beam-site? In general, it is better for an issue in
that area to be fixed by the person who ran into it ;)



2018-04-26 15:01 GMT+02:00 John MacMillan <jo...@ca.ibm.com>:

> Thank you for your clarification!
>
> May I suggest that this clarification also make its way into the doc? We
> had some internal disagreement over how to read the current text. :-)
>
> John
>
>
> ----- Original message -----
> From: Romain Manni-Bucau <rm...@gmail.com>
> To: dev@beam.apache.org
> Cc:
> Subject: Re: DoFn thread constraint.
> Date: Thu, Apr 26, 2018 8:53 AM