Posted to dev@beam.apache.org by Sam McVeety <sg...@google.com.INVALID> on 2017/01/19 18:22:02 UTC

Runner-provided ValueProviders

Hi folks, I'm looking for feedback on whether the following is a reasonable
approach to handling ValueProviders that are intended to be populated at
runtime by a given Runner (e.g. a Dataflow job ID, which is a GUID from the
service).  Two potential pieces of a solution:

1. Annotate such parameters with @RunnerProvided, which results in an
Exception if the user manually tries to set the parameter.

2. Allow for a DefaultValueFactory to be present for the set of Runners
that do not override the parameter.
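
For concreteness, here is a rough sketch of how the two pieces might hang
together. The @RunnerProvided annotation is the hypothetical piece;
DefaultValueFactory and @Default.InstanceFactory exist in the SDK today,
though the exact interplay with ValueProvider getters is glossed over here:

  import java.lang.annotation.ElementType;
  import java.lang.annotation.Retention;
  import java.lang.annotation.RetentionPolicy;
  import java.lang.annotation.Target;
  import java.util.UUID;
  import org.apache.beam.sdk.options.Default;
  import org.apache.beam.sdk.options.DefaultValueFactory;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.ValueProvider;

  public interface RunnerProvidedOptions extends PipelineOptions {

    // (1) Hypothetical annotation: marks a parameter the runner populates
    // at runtime; a user-supplied value would fail options validation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RunnerProvided {}

    // (2) Fallback for the set of runners that do not override the
    // parameter.
    @RunnerProvided
    @Default.InstanceFactory(JobIdFactory.class)
    ValueProvider<String> getJobId();
    void setJobId(ValueProvider<String> value);

    class JobIdFactory implements DefaultValueFactory<String> {
      @Override
      public String create(PipelineOptions options) {
        return UUID.randomUUID().toString();
      }
    }
  }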

Best,
Sam

Re: Runner-provided ValueProviders

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
Note that the Runner API makes the problem and solution cross-runner, but
the problem exists separately for each runner today. So a near-term
solution makes sense as long as it is forward-looking (broadcasting
runner-provided info to user fns) or has a TTL (putting extra stuff into
BoundedSource).

I also don't love the PipelineOptions-as-broadcast-parameters approach, but
I foresee that we will eventually benefit from a generic mechanism along
these lines. I wouldn't want to rush into anything we can't change. This should
probably be designed first in the Fn API (the runner sending metadata at
some level of granularity to the SDK harness) and then the SDK translating
that data idiomatically to the user Fn.

As for the alternative of adding a context to selected BoundedSource
methods, I can get behind it with the strong caveat that my support is only
because it is already "deprecated". We have explicitly moved away from this
approach in DoFn, and it is only because of time constraints that we have
not done the same for CombineFn. Parameter buckets are an OK way to allow
backwards-compatible (for pipeline authors, not runner authors) extension
of parameter lists in interfaces that users implement, but we have
developed a much better way. (At the generic level of the Fn API such
distinctions don't exist; this point is largely about the Java SDK itself.)
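
For reference, the parameter-bucket shape under discussion would look
roughly like this; SourceContext is purely illustrative and does not exist
in the SDK:

  import org.apache.beam.sdk.options.PipelineOptions;

  // Hypothetical context / parameter bucket: every new runner-provided
  // value means widening this one object that sources receive.
  public interface SourceContext {
    PipelineOptions getPipelineOptions();
    String getRunNonce(); // each new need adds another getter here
  }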

With SplittableDoFn, all sources and sinks will be DoFns, so the
extensibility of DoFn applies and is better than a context / kwargs-like
parameter bucket. This allows all proposed solutions to coexist easily
while being invisible in a pipeline when not required, but trivially
discoverable by runner authors pre Fn API (post Fn API, runner authors will
ship the parameters generically and the SDK will deliver them in a way that
the runner neither knows nor cares about).
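
As a sketch of the contrast, using a parameter the Java SDK already
supports today (BoundedWindow); a runner-provided parameter would be the
hypothetical next addition:

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

  class TagWithWindow extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
      // The SDK reflects over this signature and supplies only what is
      // asked for; a runner-provided parameter could later be added the
      // same way without touching existing DoFns.
      c.output(c.element() + " @ " + window.maxTimestamp());
    }
  }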

If this is not explicitly intended as a short-term thing to get us through
until we have SDF, then we should instead consider upgrading BoundedSource
to be extensible a la DoFn (which might also look a lot like just starting
to support SplittableDoFn). But I think that will take longer.

Kenn


Re: Runner-provided ValueProviders

Posted by Dan Halperin <dh...@google.com.INVALID>.
I think this was initially motivated by BEAM-758
<https://issues.apache.org/jira/browse/BEAM-758>. Copying from that issue:

    In the forthcoming runner API, a user will be able to save a pipeline
to JSON and then run it repeatedly.

    Many pieces of code (e.g., BigQueryIO.Read or Write) rely on a single
random value (nonce). These values are typically generated at pipeline
construction time (in PTransform#expand), so that they are deterministic
(don't change across retries of DoFns) and global (are the same across all
workers).
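
The construction-time pattern in question looks roughly like this
(illustrative, not the actual BigQueryIO code):

  import java.util.UUID;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.PCollection;

  class WriteWithNonce
      extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> expand(PCollection<String> input) {
      // Generated once at construction: stable across DoFn retries and the
      // same on every worker -- but baked into the serialized pipeline, so
      // re-running a saved pipeline reuses the same nonce.
      String nonce = UUID.randomUUID().toString();
      return input.apply(ParDo.of(new TagFn(nonce)));
    }

    static class TagFn extends DoFn<String, String> {
      private final String nonce;
      TagFn(String nonce) { this.nonce = nonce; }

      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(nonce + ":" + c.element());
      }
    }
  }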

    However, once the runner API lands, the existing code would result in
the same nonce being reused across jobs, which breaks BigQueryIO. Other
possible solutions:
       * Generate a nonce in Create(1) | ParDo, then use this as a side
input (sketched after this list). Should work, as long as side inputs are
actually checkpointed. But does not work for BoundedSource, which cannot
accept side inputs.
       * If a nonce is only needed for the lifetime of one bundle, it can be
generated in startBundle and used in processElement/finishBundle/tearDown.
       * Add some context somewhere that lets user code access a unique step
name, and somehow generate a nonce consistently, e.g., by hashing. Will
usually work, but this is similarly not available to sources.
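
A sketch of the first option, with the nonce generated at execution time
and broadcast as a singleton side input (illustrative only):

  import java.util.UUID;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.PCollectionView;

  // Assumes an existing Pipeline named `pipeline`. The DoFn runs when the
  // pipeline executes, so a saved pipeline gets a fresh nonce on each run.
  PCollectionView<String> nonceView =
      pipeline
          .apply(Create.of(1))
          .apply(ParDo.of(new DoFn<Integer, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(UUID.randomUUID().toString());
            }
          }))
          .apply(View.<String>asSingleton());

  // Consumers attach it with .withSideInputs(nonceView) and read it with
  // c.sideInput(nonceView); note that BoundedSource cannot do this.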

I believe your proposal is to add such a nonce to the root PipelineOptions
object -- perhaps `String getRunNonce()` or something like that. This
would let us have a different nonce for every Pipeline.run() call, but it
would add a requirement that every runner populate it.
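
That is, something along these lines (name and shape are guesses):

  import org.apache.beam.sdk.options.PipelineOptions;

  // Hypothetical: every runner would have to set this to a fresh value on
  // each Pipeline.run(). (Shown as a sub-interface for brevity; the
  // proposal would put it on the root PipelineOptions itself.)
  public interface RunNonceOptions extends PipelineOptions {
    String getRunNonce();
    void setRunNonce(String value);
  }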

My 2c: this would be an easy change for runners and would unblock the
issue, but it adds to the demands on runner authors. Longer-term, plumbing
a context into places like BoundedSource and providing the value there is a
better idea.

Dan


Re: Runner-provided ValueProviders

Posted by Davor Bonaci <da...@apache.org>.
Expecting runners to populate, or override, SDK-level pipeline options
isn't a great thing, particularly in a scenario where it would affect
correctness.

The main thing is discoverability of a subtle API like this -- there's
little chance somebody writing a new runner would stumble across this and
do the right thing. It would be much better to make expectations from a
runner clear, say, via a runner-provided "context" API. I'd stay away from
a pipeline option with a default value.

The other contentious topic here is the use of a job-level or
execution-level identifier. This easily becomes ambiguous in the presence
of Flink's savepoints, Dataflow's update, fast re-execution, canary vs.
production pipeline, cross-job optimizations, etc. I think we'd be better
off with a transform-level nonce than a job-level one.
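
For illustration, a runner-provided context of that shape might look like
this (hypothetical; nothing like it exists today):

  // The expectation on the runner is explicit in the type rather than
  // hidden in a pipeline option, and the nonce is scoped per transform
  // instead of per job, sidestepping the ambiguity above.
  public interface RunnerProvidedContext {
    /** Unique per transform instance per execution attempt. */
    String getTransformNonce(String transformName);
  }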

Finally, the real solution is to enhance the model and make such
functionality available to everyone, e.g., roughly "init" + "checkpoint" +
"side-input to source / splittabledofn / composable io".

--

Practically, to solve the problem at hand quickly, I'd be in favor of a
context-based approach.
