Posted to dev@beam.apache.org by Ismaël Mejía <ie...@gmail.com> on 2020/06/30 20:58:59 UTC

Re: Exploding windows and FnApiDoFnRunner

Just to close this thread: this is now addressed via
https://issues.apache.org/jira/browse/BEAM-10303
Thanks, Luke, for taking care of it.


On Mon, May 4, 2020 at 9:31 PM Luke Cwik <lc...@google.com> wrote:
>
> Kenn, the optimization is not complex, just never done.
>
> The FnApiDoFnRunner was written with portability first, moving away from the assumptions that were baked into the existing DoFn "runner" implementations and the constructs used in the non-portable implementation. Java has many DoFn "runner" implementations layered on top of each other to handle several special cases, and "system" DoFns use them as well.
>
> On Mon, May 4, 2020 at 10:38 AM Robert Burke <ro...@frantil.com> wrote:
>>
>> Ack ok. Thank you for clarifying!
>> Confirming that Kenn is right, the optimization is pretty much that simple. [1] is where it's done in the Go SDK.
>>
>> [1] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L136
>>
>> On Mon, May 4, 2020, 10:18 AM Reuven Lax <re...@google.com> wrote:
>>>
>>> I wonder how often we even implement this optimization today. If the processElement has an OutputReceiver parameter then we mark it as observesWindow, and that's a pretty common parameter.
>>>
>>> Arguably this is a bug in our implementation of OutputReceiver though - it should be able to copy all the windows into the output element.
>>>
>>> Reuven
>>>
>>> On Mon, May 4, 2020 at 9:37 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>>
>>>> Is the optimization complex in the Fn API context? In non-Fn API it is basically "if (observesWindow) { explode } else { don't }" [1]. The DoFn signature tells you everything you need. This might be a good first commit for someone looking to contribute to the Java SDK harness?
>>>>
>>>> Kenn
>>>>
>>>> [1] https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L133
>>>>
>>>> On Mon, May 4, 2020 at 9:33 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>
>>>>> In Python we only explode windows if the Window is being inspected.
>>>>> (There is no separate "DoFnRunner" for FnApi vs. Legacy execution.)
>>>>>
>>>>> On Mon, May 4, 2020 at 9:21 AM Luke Cwik <lc...@google.com> wrote:
>>>>> >
>>>>> > Reuven, you are correct that the optimization has yet to be implemented.
>>>>> > Robert, FnApiDoFnRunner is the name of a Java class that executes Java DoFns in the Java SDK harness. The poor name choice is my fault.
>>>>> >
>>>>> > On Fri, May 1, 2020 at 9:14 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>
>>>>> >> FnApiDoFnRunner does run Java DoFns.
>>>>> >>
>>>>> >> On Fri, May 1, 2020 at 9:10 PM Robert Burke <ro...@frantil.com> wrote:
>>>>> >>>
>>>>> >>> In the Go SDK this optimization is handled on the SDK side, in the pardo execution node, not on the runner side of the FnAPI.
>>>>> >>>
>>>>> >>> But I think I'm about to learn that FnApiDoFnRunner is something that runs on the Java SDK side rather than on the runner side, despite the name.
>>>>> >>>
>>>>> >>> On Fri, May 1, 2020, 9:02 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>>>
>>>>> >>>> Ah - so we don't implement the optimization of not expanding the windows if not necessary?
>>>>> >>>>
>>>>> >>>> On Fri, May 1, 2020 at 8:56 PM Luke Cwik <lc...@google.com> wrote:
>>>>> >>>>>
>>>>> >>>>> In all the processElementYYY methods, currentWindow is assigned as we loop over the set of windows, as can be seen here:
>>>>> >>>>> https://github.com/apache/beam/blob/9bb2990c0f6c08dd33d9c6fa1fd91842c644a8e3/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L738
>>>>> >>>>>
>>>>> >>>>> On Fri, May 1, 2020 at 8:51 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> In Beam a WindowedValue can contain multiple windows, because an element can be in multiple windows at once (for example, with sliding windows). Usually we keep these elements unexpanded, but if the user's DoFn observes the window then we have to "explode" the element out and run the process function once per window. For example, if the process function looks like this:
>>>>> >>>>>>
>>>>> >>>>>> @ProcessElement
>>>>> >>>>>> public void process(@Element T e, IntervalWindow w)
>>>>> >>>>>>
>>>>> >>>>>> In SimpleDoFnRunner we do this inside processElement. However, I can't find the equivalent code in FnApiDoFnRunner. How does window explosion work in the portable runner?
>>>>> >>>>>>
>>>>> >>>>>> Reuven
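
For readers following the thread, here is a minimal sketch of the idea discussed above: an element that falls into several sliding windows is either processed once (when the DoFn does not observe the window) or "exploded" into one call per window (when it does). This is not Beam code. The class `WindowExplosionSketch`, the `Windowed` type, and the methods `slidingWindowStarts` and `process` are all hypothetical stand-ins, and windows are reduced to their start times for brevity.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowExplosionSketch {

    /** Hypothetical stand-in for a windowed value: one value, many windows. */
    static final class Windowed<T> {
        final T value;
        final List<Long> windowStarts; // each window is identified by its start time

        Windowed(T value, List<Long> windowStarts) {
            this.value = value;
            this.windowStarts = windowStarts;
        }
    }

    /**
     * Start times of every sliding window [start, start + size) that contains
     * the timestamp, where a new window begins every `period` time units.
     */
    static List<Long> slidingWindowStarts(long timestamp, long size, long period) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, period);
        for (long start = lastStart; start > timestamp - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    /**
     * Mirrors "if (observesWindow) { explode } else { don't }".
     * Records each simulated process call in `log` and returns the call count.
     */
    static <T> int process(Windowed<T> element, boolean observesWindow, List<String> log) {
        if (!observesWindow) {
            // The function never looks at the window, so one call covers all windows.
            log.add(element.value + " in " + element.windowStarts.size() + " window(s)");
            return 1;
        }
        int calls = 0;
        for (long start : element.windowStarts) {
            // Explode: one call per window, with that window as the current one.
            log.add(element.value + " @ window starting at " + start);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        // Timestamp 25 with 30-unit windows every 10 units lands in three windows.
        Windowed<String> element =
                new Windowed<>("e", slidingWindowStarts(25, 30, 10));
        List<String> log = new ArrayList<>();
        System.out.println("calls without window: " + process(element, false, log));
        System.out.println("calls with window:    " + process(element, true, log));
        log.forEach(System.out::println);
    }
}
```

With these assumed parameters the element belongs to the windows starting at 20, 10 and 0, so the window-observing path makes three calls where the non-observing path makes one. That difference is the cost the optimization avoids.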