Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2018/09/07 18:58:19 UTC

PR/6343: Adding support for MustFollow

A contributor opened a PR[1] to add support for a PTransform that forces
one PTransform to be executed before another by using side input readiness
as a way to defer execution.

They have provided this example usage:
# Ensure that output dir is created before attempting to write output files.
output_dir_ready = pipeline | beam.Create([output_path]) | CreateOutputDir()
output_pcoll | MustFollow(output_dir_ready) | WriteOutput()

The PR author's example works because it runs in the global window, uses
no trigger with early firings, and processes only a single element, so
`CreateOutputDir()` is invoked only once. Accessing the otherwise unused
side input forces every runner to wait until `CreateOutputDir()` has
finished before running `WriteOutput()`.
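To make the barrier idea concrete, here is a toy, plain-Python model (not
Beam code): a `threading.Event` stands in for "the side input has fired",
the upstream thread plays `CreateOutputDir()`, and the downstream thread
plays `MustFollow(...) | WriteOutput()`. The function name
`run_with_barrier` and the event-log format are illustrative assumptions; a
real runner enforces this ordering through side input readiness, not
through an event object.

```python
import threading


def run_with_barrier(dirs, files):
    """Toy model of MustFollow in the global window with no early firings.

    Returns the order in which "mkdir" and "write" events happened.
    """
    log = []
    lock = threading.Lock()
    side_input_ready = threading.Event()  # stands in for "side input has fired"

    def upstream():
        # Plays the role of CreateOutputDir(): one call per directory.
        for d in dirs:
            with lock:
                log.append(("mkdir", d))
        # Bounded input, no early firings: the single firing happens only
        # after all upstream work is done.
        side_input_ready.set()

    def downstream():
        # Plays the role of MustFollow(...) | WriteOutput(): it blocks until
        # the (otherwise unused) side input is readable.
        side_input_ready.wait()
        for f in files:
            with lock:
                log.append(("write", f))

    # Start the consumer first to show that it still waits for the producer.
    threads = [threading.Thread(target=downstream), threading.Thread(target=upstream)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


print(run_with_barrier(["out/"], ["part-0", "part-1"]))
# [('mkdir', 'out/'), ('write', 'part-0'), ('write', 'part-1')]
```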

Now imagine a user who wants to use `MustFollow` to create multiple
directories, but who incorrectly sets up the side input's trigger so that
it has an early firing (such as an `AfterCount(1)` trigger). The side input
would become ready before **all** `CreateOutputDir()` invocations had
finished. `MustFollow` would then be able to access the side input and let
`WriteOutput()` proceed. The example above is still a bounded pipeline, and
a runner could choose to execute it exactly as I described.
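A sequential sketch of this failure mode, under simplifying assumptions:
upstream elements are processed one at a time, an early-firing trigger
(for example Beam Python's `AfterWatermark(early=AfterCount(1))`) fires as
soon as the pane holds `early_fire_after` elements, and that first firing
is what unblocks `WriteOutput()`. The function name is hypothetical; it
only counts how many directories exist when the side input first becomes
readable.

```python
def dirs_created_before_first_firing(num_dirs, early_fire_after=None):
    """Toy model: how many CreateOutputDir() calls have finished when the
    side input first becomes readable.

    early_fire_after=None models a trigger with no early firings; 1 models
    an early AfterCount(1)-style firing.
    """
    created = 0
    for _ in range(num_dirs):
        created += 1  # one CreateOutputDir() call finishes
        if early_fire_after is not None and created >= early_fire_after:
            return created  # early pane fires; MustFollow can now proceed
    return created  # on-time firing after all (bounded) upstream input


for early in (None, 1):
    n = dirs_created_before_first_firing(3, early_fire_after=early)
    print(f"early_fire_after={early}: {n} of 3 directories exist")
# early_fire_after=None: 3 of 3 directories exist
# early_fire_after=1: 1 of 3 directories exist
```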

The contract for side inputs is that, before a consumer can access a given
"requested" window, either the side input PCollection must have fired at
least once for that window according to its upstream windowing strategy,
or the runner must be sure that no such window could ever be produced.
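Read as a predicate, the contract looks roughly like the following. This
is a simplified model, not how any particular runner implements it: "the
runner is sure no such window could ever be produced" is approximated as
"the side input's watermark has passed the end of the window", and allowed
lateness is ignored. `Window` and `side_input_window_ready` are
illustrative names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    start: int  # inclusive, seconds
    end: int    # exclusive, seconds


def side_input_window_ready(window, fired_windows, side_input_watermark):
    """Simplified model of when a consumer may read a side input window.

    Readable if the side input has produced at least one pane (firing) for
    the window, or if its watermark has passed the window's end, so no data
    for that window can still arrive (allowed lateness is ignored).
    """
    return window in fired_windows or side_input_watermark >= window.end


w = Window(0, 60)
print(side_input_window_ready(w, fired_windows=set(), side_input_watermark=30))  # False
print(side_input_window_ready(w, fired_windows={w}, side_input_watermark=30))    # True
print(side_input_window_ready(w, fired_windows=set(), side_input_watermark=60))  # True
```

Note that an early pane counts as a firing here, which is exactly why the
early-firing scenario described earlier lets `MustFollow` proceed too soon.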

My concern with the PR is twofold:
1) I'm not sure whether this works for all runners in both bounded and
unbounded pipelines, and could use insights from others:

I believe that if the user uses a trigger that fires only once, and
guarantees that the mapping from main input windows to side input windows
is 1:1, they are likely to get the expected behavior: WriteOutput() will
always happen after CreateOutputDir() has finished for a given window.
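One way to check the 1:1 condition for fixed windows is sketched below. It
assumes Beam's default side input window mapping, which (as we understand
it) maps a main input window to the side input window containing the main
window's last timestamp. Windows are `(start, end)` pairs in seconds, and
both helper names are hypothetical. The check requires each main window to
map to an identical side window, which is stricter than "no two main
windows share a side window".

```python
def side_window_for(main_window, side_size):
    """Fixed side-input window [s, s + side_size) containing the last
    timestamp (end - 1) of main_window = (start, end)."""
    last_ts = main_window[1] - 1
    s = last_ts - last_ts % side_size
    return (s, s + side_size)


def mapping_is_one_to_one(main_windows, side_size):
    """True if every main window maps to an identical side window."""
    return all(side_window_for(w, side_size) == w for w in main_windows)


print(mapping_is_one_to_one([(0, 60), (60, 120)], side_size=60))   # True
print(mapping_is_one_to_one([(0, 60), (60, 120)], side_size=120))  # False: both map to (0, 120)
print(mapping_is_one_to_one([(0, 120)], side_size=60))             # False: maps to (60, 120)
```

The last case shows why injectivity alone is not enough: the main window
`[0, 120)` consults only the side window `[60, 120)`, so writes in that
main window would not wait for directories produced in `[0, 60)`.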

2) If the solution works, the documentation of the PTransform's
limitations needs a lot more detail, and the transform should potentially
include "error" checking of the windowing strategy.
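As one possible shape for that "error" checking, here is a sketch that
validates a hypothetical, simplified description of the two windowing
strategies. `TriggerSpec`, `WindowingSpec`, and `must_follow_problems` are
invented for illustration and are not Beam classes; a real check would
have to inspect the actual trigger and window function objects.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(frozen=True)
class TriggerSpec:
    """Hypothetical summary of a trigger (not a Beam class)."""
    early_firings: bool = False
    late_firings: bool = False


@dataclass(frozen=True)
class WindowingSpec:
    """Hypothetical summary of a windowing strategy (not a Beam class)."""
    window_size: Optional[int] = None  # None = global window, else fixed size in seconds
    trigger: TriggerSpec = field(default_factory=TriggerSpec)


def must_follow_problems(side: WindowingSpec, main: WindowingSpec) -> List[str]:
    """Return reasons why MustFollow(side) may not order main-input work."""
    problems = []
    if side.trigger.early_firings:
        problems.append("side input may fire before all upstream work has finished")
    if side.trigger.late_firings:
        problems.append("side input may fire more than once for a window")
    if side.window_size != main.window_size:
        problems.append("main and side input windows differ; mapping may not be 1:1")
    return problems


print(must_follow_problems(WindowingSpec(), WindowingSpec()))  # []
print(must_follow_problems(WindowingSpec(trigger=TriggerSpec(early_firings=True)), WindowingSpec()))
# ['side input may fire before all upstream work has finished']
```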

1: https://github.com/apache/beam/pull/6343

Re: PR/6343: Adding support for MustFollow

Posted by Maximilian Michels <mx...@apache.org>.
This is a great idea but I share Lukasz' doubts about this being a 
universal solution for awaiting some action in a pipeline.

I wonder, wouldn't it work to wrap a DoFn which internally ensures the
correct triggering behavior, instead of passing in a PCollection? All
runners which correctly materialize the side input on the first firing
for a window should then support it correctly.

Apart from that, couldn't you simply use the @Setup method of a DoFn in 
your example?
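A plain-Python sketch of the `@Setup` alternative, assuming the directory
only needs to exist before the first write. The class mirrors the
`setup()`/`process()` methods of an Apache Beam Python `DoFn`; in a real
pipeline it would subclass `apache_beam.DoFn`. `WriteOutputFn` and the
`(name, contents)` element shape are hypothetical. Because a runner may
create many instances of a DoFn across workers and call `setup()` on each,
the directory creation has to be idempotent, hence `exist_ok=True`.

```python
import os


class WriteOutputFn:  # would subclass apache_beam.DoFn in a real pipeline
    """Writes (name, contents) elements as files under output_dir."""

    def __init__(self, output_dir):
        self.output_dir = output_dir

    def setup(self):
        # Beam calls this once per DoFn instance before processing elements.
        # Several instances may run it, so it must be idempotent.
        os.makedirs(self.output_dir, exist_ok=True)

    def process(self, element):
        name, contents = element
        path = os.path.join(self.output_dir, name)
        with open(path, "w") as f:
            f.write(contents)
        yield path
```

This sidesteps cross-transform ordering entirely, at the cost of running
the setup work once per DoFn instance rather than once per pipeline.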

-Max

On 07.09.18 23:12, Peter Li wrote:

Re: PR/6343: Adding support for MustFollow

Posted by Peter Li <ph...@google.com>.
Thanks!  I (PR author) agree with all that.

On the unbounded triggering issue, I can see 2 reasonable desired behaviors:
  1) The collection to follow is bounded, and the intent is to wait for the
entire collection to be processed.
  2) The collection to follow has windows that, in some flexible sense,
align with the windows of the collection that is supposed to be waiting,
and the intent is for the waiting to apply within each window.
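The difference between the two behaviors can be sketched as "which
upstream elements must a given main-input element wait for". This toy
model assumes integer timestamps in seconds and, for behavior 2, fixed
windows of the same size on both collections; the function name and mode
strings are hypothetical.

```python
def upstream_to_wait_for(main_ts, upstream_ts, mode, window_size=None):
    """Upstream timestamps a main-input element at main_ts must wait for.

    mode="whole_collection": behavior 1, wait for the entire bounded collection.
    mode="per_window": behavior 2, wait only for upstream elements in the
    same fixed window of length window_size.
    """
    if mode == "whole_collection":
        return sorted(upstream_ts)
    if mode == "per_window":
        if window_size is None:
            raise ValueError("per_window mode needs window_size")
        start = main_ts - main_ts % window_size
        return sorted(t for t in upstream_ts if start <= t < start + window_size)
    raise ValueError(f"unknown mode: {mode!r}")


print(upstream_to_wait_for(70, [5, 65, 130], "whole_collection"))          # [5, 65, 130]
print(upstream_to_wait_for(70, [5, 65, 130], "per_window", window_size=60))  # [65]
```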

If either or both of these behaviors can be supported by something like the
proposed mechanism, then I think that's a reasonable thing to have provided
it's well documented.

The other high-level issue I see is whether this should be A) a free
PTransform on collections, or B) something like a call on ParDo.  In the PR
it's (A), but I can imagine that for a future, more native implementation
on some runners, (B) could be more natural.

On Fri, Sep 7, 2018 at 11:58 AM Lukasz Cwik <lc...@google.com> wrote:

> A contributor opened a PR[1] to add support for a PTransform that forces
> one PTransform to be executed before another by using side input readiness
> as a way to defer execution.
>
> They have provided this example usage:
> # Ensure that output dir is created before attempting to write output
> files.
> output_dir_ready = pipeline | beam.Create([output_path]) |
> CreateOutputDir()
> output_pcoll | MustFollow(output_dir_ready) | WriteOutput()
>
> The example the PR author provided works since they are working in the
> global window with no triggers that have early firings and they are only
> processing a single element so `CreateOutputDir()` is invoked only once.
> The unused side input access forces all runners to wait till
> `CreateOutputDir()` is invoked before `WriteOutput()`.
>
> Now imagine a user wanted to use `MustFollow` but incorrectly setup the
> trigger for the side input and it has an early firing (such as an after
> count 1 trigger) and has multiple directories they want to create. The side
> input would become ready before **all** of `CreateOutputDir()` was invoked.
> This would mean that `MustFollow` would be able to access the side input
> and hence allow `WriteOutput()` to happen. The example above is still a
> bounded pipeline and a runner could choose to execute it as I described.
>
> The contract for side inputs is that the side input PCollection must have
> at least one firing based upon the upstream windowing strategy for the
> "requested" window or the runner must be sure that the no such window could
> ever be produced before it can be accessed by a consumer.
>
> My concern with the PR is two fold:
> 1) I'm not sure if this works for all runners in both bounded and
> unbounded pipelines and could use insights from others:
>
> I believe if the user is using a trigger which only fires once and that
> they guarantee that the main input window mapping to side input window
> mapping is 1:1 then they are likely to get the expected behavior,
> WriteOutput() always will happen once CreateOutputDir() for a given window
> has finished.
>
> 2) If the solution works, the documentation around the limitations of how
> the PTransform works needs a lot more detail and potentially "error"
> checking of the windowing strategy.
>
> 1: https://github.com/apache/beam/pull/6343
>