Posted to dev@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2020/02/10 23:55:11 UTC

FnAPI proto backwards compatibility

With an eye towards cross-language (which includes cross-version)
pipelines and services (specifically looking at Dataflow) supporting
portable pipelines, there's been a desire to stabilize the portability
protos. There are currently many cleanups we'd like to do [1] (some
essential, others nice to have); are there others that people would
like to see?

Of course we would like it to be possible for the FnAPI and Beam
itself to continue to evolve. Most of this can be handled by runners
understanding various transform URNs, but not all. (An example that
comes to mind is support for large iterables [2], or the requirement
to observe and respect new fields on a PTransform or its payloads
[3]). One proposal for this is to add capabilities and/or
requirements. An environment (corresponding generally to an SDK) could
advertise various capabilities (as a list or map of URNs) which a
runner can take advantage of without requiring all SDKs to support all
features at the same time. For the other way around, we need a way of
marking something that a runner must reject if it does not understand
it. This could be a set of requirements (again, a list or map of URNs)
that designate capabilities required to at least be understood by the
runner to faithfully execute this pipeline. (These could be attached
to a transform or the pipeline itself.) Do these sound like reasonable
additions? Also, would they ever need to be parameterized (map), or
would a list suffice?
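
For concreteness, here is a minimal sketch (in Java) of the kind of check
this would enable on the runner side. The URNs and the accessor for a
pipeline-level requirements list are hypothetical, not existing proto fields:

import java.util.List;
import java.util.Set;

public class RequirementCheck {
  // URNs this runner knows how to honor; the names are illustrative only.
  private static final Set<String> UNDERSTOOD_REQUIREMENTS =
      Set.of(
          "beam:requirement:pardo:stable_input:v1",
          "beam:requirement:pardo:time_sorted_input:v1");

  // Reject any pipeline carrying a requirement URN this runner does not understand.
  public static void validate(List<String> pipelineRequirements) {
    for (String urn : pipelineRequirements) {
      if (!UNDERSTOOD_REQUIREMENTS.contains(urn)) {
        throw new UnsupportedOperationException(
            "Pipeline requires " + urn + ", which this runner does not understand.");
      }
    }
  }
}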

[1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223, BEAM-3227,
BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374,
BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373,
BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
[2] https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
[3] https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E

Re: FnAPI proto backwards compatibility

Posted by Jan Lukavský <je...@seznam.cz>.
Cool, thanks. :-)

Jan

On 2/14/20 11:35 PM, Robert Bradshaw wrote:
> Oh, sorry. Try it again
> https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit?usp=sharing
>
> On Fri, Feb 14, 2020 at 2:04 PM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi Robert,
>>
>> the doc seems to be locked.
>>
>> Jan
>>
>> On 2/14/20 10:56 PM, Robert Bradshaw wrote:
>>> I created https://github.com/apache/beam/pull/10873 to add the minimal
>>> set of fields to the existing protos, and also created
>>> https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit
>>> to start enumerating some of the URNs we may want to have. It would be
>>> a good milestone to get this in by the release next week.
>>>
>>> On Thu, Feb 13, 2020 at 5:14 PM Kyle Weaver <kc...@google.com> wrote:
>>>>> we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities
>>>> +1, I think the potential to formally integrate our idea of compatibility and unit testing is a big advantage of this proposal. Also, when deciding where to draw lines between different URNs, it may help to look at the existing validates runner test categories, which are currently the most accurate signal we have regarding a runner's capabilities.
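
A rough sketch of what such capability-based test matching could look like;
the annotation, URNs, and harness hook below are all hypothetical:

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Set;

public class CapabilityMatchedTests {
  // Hypothetical per-test declaration of the model features a test exercises.
  @Retention(RetentionPolicy.RUNTIME)
  @interface RequiresFeatures {
    String[] value();
  }

  @RequiresFeatures({"beam:requirement:pardo:time_sorted_input:v1"})
  static class TimeSortedInputTest {}

  // Run a test only if the runner claims every URN the test needs.
  static boolean shouldRun(Class<?> testClass, Set<String> runnerCapabilities) {
    RequiresFeatures required = testClass.getAnnotation(RequiresFeatures.class);
    if (required == null) {
      return true; // No declared requirements, so any runner should pass it.
    }
    for (String urn : required.value()) {
      if (!runnerCapabilities.contains(urn)) {
        return false;
      }
    }
    return true;
  }
}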
>>>>
>>>> On Thu, Feb 13, 2020 at 4:04 PM Robert Burke <ro...@frantil.com> wrote:
>>>>> Wrt the per DoFn/ParDo level, there's the similar case of whether the DoFn has a URN for requiring something or an annotation saying the DoFn provides something (e.g. provides k-anonymization with k defined)
>>>>>
>>>>> The general theme of this thread seems to be trying to ensure a runner can reject a pipeline if it's not able to provide the right guarantees, so that latter case isn't handled.
>>>>>
>>>>> Eg. The latter provisions could be used to analyze a pipeline to ensure the outputs are all properly anonymized to a certain degree at construction time.
>>>>>
>>>>> On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>
>>>>>> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs, while being able to recognize when a pipeline constructed with an incompatible version of the SDK is run.
>>>>>>>
>>>>>>> I'll add some observations I made when implementing the latest "requires time sorted input" addition with regards to this discussion:
>>>>>>>
>>>>>>>    a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline, but also depend on the (type of) inputs. For instance a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but no such expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.
>>>>>>>
>>>>>>>    b) it would be good if this feature would work independently of portability (for Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.
>>>>>> I think even if these runners keep their execution not using portability, they should migrate to use the portable pipeline definition. Then they can share the same model w/ runners that execute using portability. The Fn API is not required to be used as long as the runner implements the semantics of the pipeline.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>>    c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with given test if and only if it would not reject it)
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>>>>>>
>>>>>>> +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>>>>>>>
>>>>>>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
>>>>>>>>> One thing that doesn't appear to have been suggested yet is we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This avoids dozens of urns needing to be individually sent over.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The super urns would need to be static after definition to avoid mismatched definitions down the road.
>>>>>>>>>
>>>>>>>>> We collect together the urns for what is reasonably considered "vX" support, and can then increment that later.
>>>>>>>>>
>>>>>>>>> This would simplify new SDKs, as they can have a goal of initial v1 support as we define what level of feature support it has, and doesn't prevent new capabilities from being added incrementally.
>>>>>>>> Yes, this is a very good idea. I've also been thinking of certain sets
>>>>>>>> of common operations/well known DoFns that often occur on opposite
>>>>>>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>>>>>>>> are commonly supported that could be grouped under these meta-urns.
>>>>>>>>
>>>>>>>> Note that these need not be monotonic, for example a current v1 might
>>>>>>>> be requiring LengthPrefixCoderV1, but if a more efficient
>>>>>>>> LengthPrefixCoderV2 comes along eventually v2 could require that and
>>>>>>>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>>>>>>>
>>>>>>>> Probably makes sense to defer adding such super-urns until we notice a
>>>>>>>> set that is commonly used together in practice.
>>>>>>>>
>>>>>>>> Of course there's still value in SDKs being able to support features
>>>>>>>> piecemeal as well, which is the big reason we're avoiding a simple
>>>>>>>> monotonically-increasing version number.
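
A sketch of how such super urn expansion might look on the receiving side;
the URNs and the grouping below are made up purely for illustration:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SuperUrns {
  // Each super URN stands for a fixed, never-modified batch of individual URNs.
  private static final Map<String, Set<String>> SUPER_URNS =
      Map.of(
          "beam:capability_set:v1",
          Set.of(
              "beam:coder:length_prefix:v1",
              "beam:transform:pardo:v1",
              "beam:transform:group_by_key:v1"));

  // Expand any super URNs in an advertised list into their member URNs.
  static Set<String> expand(Set<String> advertised) {
    Set<String> expanded = new HashSet<>(advertised);
    for (String urn : advertised) {
      expanded.addAll(SUPER_URNS.getOrDefault(urn, Set.of()));
    }
    return expanded;
  }
}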
>>>>>>>>
>>>>>>>>> Similarly, certain feature sets could stand alone, e.g. around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support, for example, which a runner could take advantage of by avoiding extra cross language hops. These could then also be grouped under a SQL super urn.
>>>>>>>>>
>>>>>>>>> This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>>>>>>>>>
>>>>>>>>> -------
>>>>>>>>> Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a dual between "what an SDK can do" and "what the runner will do to a pipeline that the SDK can understand" (e.g. combiner lifting, and state-backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (e.g. requires sorted input)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>>>>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>> We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
>>>>>>>>>>>>> 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>>>>>>>>>>>>> 2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>>>>>>>>>>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>>>>>>>>>>> this well either.
>>>>>>>>>>>>
>>>>>>>>>>>>> If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.
>>>>>>>>>>>> As you point out, stateful+splittable may not be a particularly useful
>>>>>>>>>>>> combination, but as another example, we have
>>>>>>>>>>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>>>>>>>>>>>> it requires finalization, stable inputs, and now time sorting. I don't
>>>>>>>>>>>> think we should have a new URN for each combination.
>>>>>>>>>>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDFs should have to reject it or should be allowed to run it poorly.
>>>>>>>>>> Being splittable means that the SDK could choose to return a continuation saying please process the rest of my element in X amount of time which would require the runner to inspect certain fields on responses. One example would be I don't have many more messages to read from this message stream at the moment and another example could be that I detected that this filesystem is throttling me or is down and I would like to resume processing later.
>>>>>>>>>>
>>>>>>>>>>> It isn't a huge deal. Three different top-level URNS versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>>>>>> I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>>>>>>>>>>>>> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>>>>>>>>>>>>> I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>>>>>>>>>>>>>>>> Yes.
>>>>>>>>>>>>> I believe capabilities do exist on a Pipeline, where they inform runners about new types of fields to be aware of, either within Components or on the Pipeline object itself, but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> [snip]
>>>>>>>>>>>>> As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later, which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).
>>>>>>>>>>>> What we need is the ability for (1) runners to reject future pipelines
>>>>>>>>>>>> they cannot faithfully execute and (2) runners to be able to take
>>>>>>>>>>>> advantage of advanced features/protocols when interacting with those
>>>>>>>>>>>> SDKs that understand them while avoiding them for older (or newer)
>>>>>>>>>>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>>>>>>>>>>>> capabilities.
>>>>>>>>>>>>
>>>>>>>>>>>> Where possible, I think this is best expressed inherently in the set
>>>>>>>>>>>> of transform (and possibly other component) URNs. For example, when an
>>>>>>>>>>>> SDK uses a combine_per_key composite, that's a signal that it
>>>>>>>>>>>> understands the various related combine_* transforms. Similarly, a
>>>>>>>>>>>> pipeline with a test_stream URN would be rejected by runners not
>>>>>>>>>>>> recognizing/supporting this primitive. However, this is not always
>>>>>>>>>>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>>>>>>>>>>> ParDo and for (2) we have features like large iterable and progress
>>>>>>>>>>>> support.
>>>>>>>>>>>>
>>>>>>>>>>>> For (1) we have to enumerate now everywhere a runner must look, as far
>>>>>>>>>>>> into the future as we want to remain backwards compatible. This is why
>>>>>>>>>>>> I suggested putting something on the pipeline itself, but we could
>>>>>>>>>>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>>>>>>>>>>> think that'd be useful now. (Note that a future pipeline-level
>>>>>>>>>>>> requirement could be "inspect (previously non-existent) requirements
>>>>>>>>>>>> field attached to objects of type X.")
>>>>>>>>>>>>
>>>>>>>>>>>> For (2) I think adding a capabilities field to the environment for now
>>>>>>>>>>>> makes the most sense, and as it's optional to inspect them adding it
>>>>>>>>>>>> elsewhere if needed is backwards compatible. (The motivation to do it
>>>>>>>>>>>> now is that there are some capabilities that we'd like to enumerate
>>>>>>>>>>>> now rather than make part of the minimal set of things an SDK must
>>>>>>>>>>>> support.)
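
As a rough illustration of (2), a runner consulting a (hypothetical)
environment capability list before enabling an optional protocol might look
like the following; the URNs are illustrative, and the point is that a
missing capability simply disables an optimization rather than failing the
pipeline:

import java.util.Set;

public class OptionalProtocols {
  static boolean useStateBackedIterables(Set<String> environmentCapabilities) {
    // Only hand the SDK a state-backed iterable if it claims it can consume one;
    // otherwise fall back to fully materializing the iterable.
    return environmentCapabilities.contains("beam:protocol:state_backed_iterables:v1");
  }

  static boolean requestProgressReports(Set<String> environmentCapabilities) {
    return environmentCapabilities.contains("beam:protocol:progress_reporting:v1");
  }
}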
>>>>>>>>>>>>
>>>>>>>>>> Agree on the separation of requirements from capabilities where requirements is a set of MUST understand while capabilities are a set of MAY understand.
>>>>>>>>>>
>>>>>>>>>>>>> All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.
>>>>>>>>>>>> Good point. That lets us keep it as a list for now. (The risk is that
>>>>>>>>>>>> it makes possible the bug of populating parameters without adding the
>>>>>>>>>>>> required notification to the list.)
>>>>>>>>>>>>
>>>>>>>>>>>>> I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.
>>>>>>>>>>>> Agreed.

Re: FnAPI proto backwards compatibility

Posted by Robert Bradshaw <ro...@google.com>.
Oh, sorry. Try it again
https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit?usp=sharing

On Fri, Feb 14, 2020 at 2:04 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Robert,
>
> the doc seems to be locked.
>
> Jan

Re: FnAPI proto backwards compatibility

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Robert,

the doc seems to be locked.

Jan

On 2/14/20 10:56 PM, Robert Bradshaw wrote:
> I created https://github.com/apache/beam/pull/10873 to add the minimal
> set of fields to the existing protos, and also created
> https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit
> to start enumerating some of the URNs we may want to have. It would be
> a good milestone to get this in by the release next week.

Re: FnAPI proto backwards compatibility

Posted by Jan Lukavský <je...@seznam.cz>.
This is not exactly a problem. Having the runner explicitly declare its
capabilities is of course a possibility. But do we want to modify each
runner, when all the functionality is actually provided by a common
library? This is about usability. I can imagine (more easily if we can
provide these requirements/capabilities) a runner that is completely
decoupled from development of core. Then runner maintainers might not
even follow the development of core closely. They might not know about a
new actual requirement of a pipeline, and it would seem weird to force
adding a new capability without actually changing a line of code.

I'm not saying we have to implement this "automatic capabilities 
propagation" in the first iteration. It would just be nice not to close 
doors somewhere. Adding capabilities actually implemented by a common 
library in an automated way would be more convenient, although it might 
add unnecessary complexity, so this has to be carefully designed and it 
has to be discussed whether this feature would be worth it. Off the top 
of my head, it might be possible to export an enum representing the 
common capabilities and let the runner declare a switch statement 
returning a boolean that marks each feature as supported/unsupported. 
That way, the compiler would at least notify runner maintainers that 
something was added to the library that they might want to export. But 
there will be better solutions, for sure.
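
A rough sketch of that enum/switch shape; all names here are hypothetical,
not existing runners-core API:

enum CommonCapability {
  TIME_SORTED_INPUT_STREAMING,
  STATE_BACKED_ITERABLES,
  PROGRESS_REPORTING
}

interface RunnerCapabilities {
  boolean supports(CommonCapability capability);
}

class MyRunnerCapabilities implements RunnerCapabilities {
  @Override
  public boolean supports(CommonCapability capability) {
    // Leaving out a default arm means that, with switch expressions
    // (Java 14+), adding a new CommonCapability value becomes a compile
    // error here until the runner author decides whether to support it.
    return switch (capability) {
      case TIME_SORTED_INPUT_STREAMING, PROGRESS_REPORTING -> true;
      case STATE_BACKED_ITERABLES -> false;
    };
  }
}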

  Jan

On 2/21/20 7:34 AM, Kenneth Knowles wrote:
> Good question. My last sentence was not clear. We do not need to 
> automatically propagate the capabilities offered by runners-core to a 
> particular runner. The runner can (and should) own the claim of what 
> its capabilities are.
>
> Kenn
>
> On Thu, Feb 20, 2020 at 10:05 PM Luke Cwik <lcwik@google.com> wrote:
>
>     Which part of the proposal do you think is solving a problem we
>     may not have?
>
>     On Thu, Feb 20, 2020 at 8:19 PM Kenneth Knowles <kenn@apache.org> wrote:
>
>         I would rather say that "runners-core" is a utility library
>         with some helpful things. Like other libraries. The runner
>         still decides how to use the library. That was the idea,
>         anyhow. A runner could have a bunch of "if" statements around
>         how it uses some generic runners-core utility, etc. I think at
>         this point the proposal is trying to solve a problem we may
>         not have.
>
>         Kenn
>
>         On Thu, Feb 20, 2020 at 1:25 PM Jan Lukavský <je.ik@seznam.cz> wrote:
>
>
>             On 2/20/20 8:24 PM, Robert Bradshaw wrote:
>>             On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>             Hi,
>>>
>>>             +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs, while being able to recognize when a pipeline constructed with an incompatible version of the SDK is run.
>>>
>>>             I'll add some observations I made when implementing the latest "requires time sorted input" addition with regards to this discussion:
>>>
>>>               a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline, but also depend on the (type of) inputs. For instance a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but no such expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.
>>             It sounds like what you're getting at here is a Stateful ParDo that
>>             requires "mostly" time sorted input (to keep the amount of state held
>>             bounded) which is somewhat provided (with no bounds given) for
>>             unbounded PCollections but not at all (in general) for batch. Rather
>>             than phrase this as a conditional requirement, I would make a new
>>             requirement "requires mostly time sorted input" (precise definition
>>             TBD, it's hard to specify or guarantee upper bounds) which a runner
>>             could then implement via exact time sorted input in batch but more
>>             cheaply as a no-op in streaming.
>             +1, that makes sense. My example was a little incomplete,
>             in the sense that @RequiresTimeSortedInput does not have
>             any requirements on the runner in the streaming case, with
>             one exception - the runner must be compiled with the
>             newest runners-core. That brings us to the fact that a
>             runner's capabilities are actually not just a function of
>             the runner's code, but also of code that is imported from
>             runners-core. There probably should be a way for the core
>             to export its capabilities (e.g. provides:
>             beam:requirement:pardo:time_sorted_input:streaming:v1),
>             which should then be united with the capabilities of the
>             runner itself. That way a runner which uses runners-core
>             (and StatefulDoFnRunner, that is a complication, not sure
>             how to deal with that) could be made able to satisfy
>             'beam:requirement:pardo:time_sorted_input:streaming:v1'
>             simply by recompiling the runner with the newest core.
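
A sketch of that union, assuming a hypothetical hook that exposes the URNs
the core library's utilities implement (nothing like this exists today):

import java.util.HashSet;
import java.util.Set;

public class EffectiveCapabilities {
  // Hypothetical: URNs whose implementation ships with the core library
  // (e.g. via StatefulDoFnRunner), so recompiling against a newer core
  // would pick up new entries automatically.
  static Set<String> libraryProvidedCapabilities() {
    return Set.of("beam:requirement:pardo:time_sorted_input:streaming:v1");
  }

  // The runner's effective capability set is the union of what it declares
  // itself and what the library it links against provides.
  static Set<String> effective(Set<String> runnerDeclared) {
    Set<String> all = new HashSet<>(runnerDeclared);
    all.addAll(libraryProvidedCapabilities());
    return all;
  }
}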
>>>               b) it would be good if this feature would work independently of portability (for Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.
>>             Yes. As mentioned, we can still convert to portability to do such
>>             analysis even if we don't use it for execution.
>>
>>>               c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have simply @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with a given test if and only if it would not reject it)
>>             +1
>>
>>>             Jan
>>>
>>>             On 2/13/20 8:42 PM, Robert Burke wrote:
>>>
>>>             +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>>>
>>>             On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>             On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
>>>>>             One thing that doesn't appear to have been suggested yet is that we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This avoids needing to send dozens of urns individually.
>>>>>
>>>>>
>>>>>             The super urns would need to be static after definition to avoid mismatched definitions down the road.
>>>>>
>>>>>             We collect together the urns for what is reasonably considered "vX" support, and can then increment that later.
>>>>>
>>>>>             This would simplify new SDKs, as they can have a goal of initial v1 support once we define what level of feature support that includes, and it doesn't prevent new capabilities from being added incrementally (see the super-urn sketch after this message).
>>>>             Yes, this is a very good idea. I've also been thinking of certain sets
>>>>             of common operations/well known DoFns that often occur on opposite
>>>>             sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>>>>             are commonly supported that could be grouped under these meta-urns.
>>>>
>>>>             Note that these need not be monotonic, for example a current v1 might
>>>>             be requiring LengthPrefixCoderV1, but if a more efficient
>>>>             LengthPrefixCoderV2 comes along eventually v2 could require that and
>>>>             *not* require the old, now rarely used LengthPrefixCoderV1.
>>>>
>>>>             Probably makes sense to defer adding such super-urns until we notice a
>>>>             set that is commonly used together in practice.
>>>>
>>>>             Of course there's still value in SDKs being able to support features
>>>>             piecemeal as well, which is the big reason we're avoiding a simple
>>>>             monotonically-increasing version number.
>>>>
>>>>>             Similarly, certain feature sets could stand alone, e.g. around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support, for example, which a runner could take advantage of by avoiding extra cross-language hops. These could then also be grouped under a SQL super urn.
>>>>>
>>>>>             This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>>>>>
>>>>>             -------
>>>>>             Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a duality between "what an SDK can do" and "what the runner will do to a pipeline that the SDK can understand" (e.g. combiner lifting, and state-backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (e.g. requires sorted input).
>>>>>
>>>>>
>>>>>             On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>             On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>>             On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>>             On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>             We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
>>>>>>>>>             1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>>>>>>>>>             2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>>>>>>>>             Yeah, I don't think proto reflection is a flexible enough tool to do
>>>>>>>>             this well either.
>>>>>>>>
>>>>>>>>>             If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.
>>>>>>>>             As you point out, stateful+splittable may not be a particularly useful
>>>>>>>>             combination, but as another example, we have
>>>>>>>>             (backwards-incompatible-when-introduced) markers on DoFn as to whether
>>>>>>>>             it requires finalization, stable inputs, and now time sorting. I don't
>>>>>>>>             think we should have a new URN for each combination.
>>>>>>>             Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDFs should have to reject it or should be allowed to run poorly.
>>>>>>             Being splittable means that the SDK could choose to return a continuation saying please process the rest of my element in X amount of time which would require the runner to inspect certain fields on responses. One example would be I don't have many more messages to read from this message stream at the moment and another example could be that I detected that this filesystem is throttling me or is down and I would like to resume processing later.
>>>>>>
>>>>>>>             It isn't a huge deal. Three different top-level URNS versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>>>>>>>
>>>>>>>             Kenn
>>>>>>>
>>>>>>>>>>>             I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>>>>>>>>>             Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>>>>>>>>>             I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>>>>>>>>>
>>>>>>>>>>>>>             On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>>>>>>>>>>>>             Yes.
>>>>>>>>>             I believe capabilities do exist on a Pipeline and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>>>>>>>>>
>>>>>>>>>>             [snip]
>>>>>>>>>             As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later, which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).
>>>>>>>>             What we need is the ability for (1) runners to reject future pipelines
>>>>>>>>             they cannot faithfully execute and (2) runners to be able to take
>>>>>>>>             advantage of advanced features/protocols when interacting with those
>>>>>>>>             SDKs that understand them while avoiding them for older (or newer)
>>>>>>>>             SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>>>>>>>>             capabilities.
>>>>>>>>
>>>>>>>>             Where possible, I think this is best expressed inherently in the set
>>>>>>>>             of transform (and possibly other component) URNs. For example, when an
>>>>>>>>             SDK uses a combine_per_key composite, that's a signal that it
>>>>>>>>             understands the various related combine_* transforms. Similarly, a
>>>>>>>>             pipeline with a test_stream URN would be rejected by runners not
>>>>>>>>             recognizing/supporting this primitive. However, this is not always
>>>>>>>>             possible, e.g. for (1) we have the aforementioned boolean flags on
>>>>>>>>             ParDo and for (2) we have features like large iterable and progress
>>>>>>>>             support.
>>>>>>>>
>>>>>>>>             For (1) we have to enumerate now everywhere a runner must look as far
>>>>>>>>             into the future as we want to remain backwards compatible. This is why
>>>>>>>>             I suggested putting something on the pipeline itself, but we could
>>>>>>>>             (likely in addition) add it to Transform and/or ParDoPayload if we
>>>>>>>>             think that'd be useful now. (Note that a future pipeline-level
>>>>>>>>             requirement could be "inspect (previously non-existent) requirements
>>>>>>>>             field attached to objects of type X.")
>>>>>>>>
>>>>>>>>             For (2) I think adding a capabilities field to the environment for now
>>>>>>>>             makes the most sense, and as it's optional to inspect them adding it
>>>>>>>>             elsewhere if needed is backwards compatible. (The motivation to do it
>>>>>>>>             now is that there are some capabilities that we'd like to enumerate
>>>>>>>>             now rather than make part of the minimal set of things an SDK must
>>>>>>>>             support.)
>>>>>>>>
>>>>>>             Agree on the separation of requirements from capabilities where requirements is a set of MUST understand while capabilities are a set of MAY understand.
>>>>>>
>>>>>>>>>             All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.
>>>>>>>>             Good point. That lets us keep it as a list for now. (The risk is that
>>>>>>>>             it makes possible the bug of populating parameters without adding the
>>>>>>>>             required notification to the list.)
>>>>>>>>
>>>>>>>>>             I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>>>>>>>>
>>>>>>>>>             Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.
>>>>>>>>             Agreed.
>
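
A minimal sketch of Jan's capability-union idea above. The class and method names are purely illustrative (this is not actual Beam or runners-core API); it only shows how capabilities provided by a library such as runners-core could be united with the capabilities a runner implements itself before being advertised.

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch only: runners-core exports the capabilities its
    // utilities provide, and a runner unites them with its own before
    // advertising them.
    public class AdvertisedCapabilities {

      // What a hypothetical runners-core version could claim to provide.
      static Set<String> runnersCoreCapabilities() {
        return Set.of("beam:requirement:pardo:time_sorted_input:streaming:v1");
      }

      // The advertised set is the union of the runner's own capabilities and
      // those of the runners-core version it was compiled against.
      static Set<String> advertised(Set<String> runnerOwnCapabilities) {
        Set<String> all = new HashSet<>(runnerOwnCapabilities);
        all.addAll(runnersCoreCapabilities());
        return all;
      }
    }

Under this scheme, recompiling against a newer runners-core would enlarge the advertised set automatically, which is exactly the automatic propagation that gets questioned later in the thread.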
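
A minimal sketch of the super-urn batching suggestion above, again with made-up urn names. The only point is that one urn can stand for a fixed, never-modified set of feature urns, so a new SDK can advertise a whole baseline at once.

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch only: expanding a "super urn" into the individual
    // feature urns it represents. Each bundle would be frozen once defined.
    public class SuperUrns {

      private static final Map<String, Set<String>> SUPER_URNS = Map.of(
          "beam:super_urn:sdk_base:v1", Set.of(
              "beam:coder:length_prefix:v1",
              "beam:coder:kv:v1",
              "beam:transform:pardo:v1"));

      // Replaces any super urn in an advertised list by its constituent urns;
      // plain urns pass through unchanged.
      static Set<String> expand(Set<String> advertised) {
        Set<String> expanded = new HashSet<>();
        for (String urn : advertised) {
          expanded.addAll(SUPER_URNS.getOrDefault(urn, Set.of(urn)));
        }
        return expanded;
      }
    }

An SDK advertising beam:super_urn:sdk_base:v1 would then be treated exactly as if it had listed the three constituent urns individually.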
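
A minimal sketch of the requirements/capabilities split discussed above, assuming a list of requirement urns attached to the pipeline and a list of capability urns attached to the environment as proposed (urns and class names are illustrative, not settled): requirements are MUST understand and cause rejection when unknown, while capabilities are MAY use and can simply be ignored.

    import java.util.Set;

    // Illustrative sketch only: how a runner would treat the two lists.
    public class PipelineValidation {

      // Requirement urns this hypothetical runner knows how to honor.
      private static final Set<String> SUPPORTED_REQUIREMENTS = Set.of(
          "beam:requirement:pardo:stateful:v1",
          "beam:requirement:pardo:requires_stable_input:v1");

      // (1) Hard requirements: reject the pipeline on anything unknown.
      static void validateRequirements(Set<String> pipelineRequirements) {
        for (String urn : pipelineRequirements) {
          if (!SUPPORTED_REQUIREMENTS.contains(urn)) {
            throw new UnsupportedOperationException(
                "Pipeline declares requirement " + urn
                    + ", which this runner does not understand.");
          }
        }
      }

      // (2) Optional capabilities: use an advanced protocol only if the SDK's
      // environment advertises it; unknown capabilities are simply ignored.
      static boolean useStateBackedIterables(Set<String> environmentCapabilities) {
        return environmentCapabilities.contains(
            "beam:protocol:state_backed_iterables:v1");
      }
    }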

Re: FnAPI proto backwards compatibility

Posted by Kenneth Knowles <ke...@apache.org>.
Good question. My last sentence was not clear. We do not need to
automatically propagate the capabilities offered by runners-core to a
particular runner. The runner can (and should) own the claim of what its
capabilities are.

Kenn

On Thu, Feb 20, 2020 at 10:05 PM Luke Cwik <lc...@google.com> wrote:

> Which part of the proposal do you think is solving a problem we may not
> have?

Re: FnAPI proto backwards compatibility

Posted by Luke Cwik <lc...@google.com>.
Which part of the proposal do you think is solving a problem we may not
have?

On Thu, Feb 20, 2020 at 8:19 PM Kenneth Knowles <ke...@apache.org> wrote:

> I would rather say that "runners-core" is a utility library with some
> helpful things. Like other libraries. The runner still decides how to use
> the library. That was the idea, anyhow. A runner could have a bunch of "if"
> statements around how it uses some generic runners-core utility, etc. I
> think at this point the proposal is trying to solve a problem we may not
> have.
>
> Kenn

Re: FnAPI proto backwards compatibility

Posted by Kenneth Knowles <ke...@apache.org>.
I would rather say that "runners-core" is a utility library with some
helpful things. Like other libraries. The runner still decides how to use
the library. That was the idea, anyhow. A runner could have a bunch of "if"
statements around how it uses some generic runners-core utility, etc. I
think at this point the proposal is trying to solve a problem we may not
have.

Kenn
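
A small sketch of the distinction being drawn here, with hypothetical names: the library may ship a sorting utility, but the runner only claims the corresponding capability if it actually chose to wire that utility in, regardless of which runners-core version it links against.

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch only: the runner owns its capability claims.
    public class MyRunner {

      private final boolean sortsInputViaStatefulDoFnRunner;

      MyRunner(boolean sortsInputViaStatefulDoFnRunner) {
        this.sortsInputViaStatefulDoFnRunner = sortsInputViaStatefulDoFnRunner;
      }

      Set<String> capabilities() {
        Set<String> caps = new HashSet<>();
        caps.add("beam:transform:pardo:v1");
        if (sortsInputViaStatefulDoFnRunner) {
          // Claimed only because this runner chose to use the generic utility.
          caps.add("beam:requirement:pardo:time_sorted_input:v1");
        }
        return caps;
      }
    }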

On Thu, Feb 20, 2020 at 1:25 PM Jan Lukavský <je...@seznam.cz> wrote:

>
> On 2/20/20 8:24 PM, Robert Bradshaw wrote:
>
> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> <je...@seznam.cz> wrote:
>
> Hi,
>
> +1 for adding pipeline required features. I think being able to reject pipeline with unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs, while being able to recognize when a pipeline constructed with incomplatible version of SDK is run.
>
> I'll add some observations I made when implementing the latest "requires time sorted input" addition with regards to this discussion:
>
>  a) the features of pipeline are not simple function of set of PTransforms being present in the pipeline, but also depend on (type of) inputs. For instance a PTransform might have a simple expansion to primitive PTransforms in streaming case, but don't have such expansion in batch case. That is to say, runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions. But _must_ fail in other cases.
>
> It sounds like what you're getting at here is a Statful ParDo that
> requires "mostly" time sorted input (to keep the amount of state held
> bounded) which is somewhat provided (with no bounds given) for
> unbounded PCollections but not at all (in general) for batch. Rather
> than phrase this as a conditional requirement, I would make a new
> requirement "requires mostly time sorted input" (precise definition
> TBD, it's hard to specify or guarantee upper bounds) which a runner
> could then implement via exact time sorted input in batch and but more
> cheaply as a no-op in streaming.
>
> +1, that makes sense. My example was a little incomplete, in the sense
> that, for @RequiresTimeSortedInput does not have any requirements on runner
> in streaming case, with one exception - the runner must be compiled with
> the newest runners-core. That brings us to the fact, that runners
> capabilities are actually not just function of the runner's code, but also
> code that is imported from runners-core. There probably should be a way for
> the core to export its capabilities (e.g. provides:
> beam:requirement:pardo:time_sorted_input:streaming:v1), which should then
> be united with capabilities of the runner itself. That way a runner which
> uses runners-core (and StatefulDoFnRunner, that is a complication, not sure
> how to deal with that), could be made able to satify 'beam:requirement:pardo:time_sorted_input:streaming:v1'
> simply by recompiling the runner with newest core.
>
>  b) it would be good if this feature would work independently of portability (for Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.
>
> Yes. As mentioned, we can still convert to portability to do such
> analysis even if we don't use it for execution.
>
>
>  c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with given test if and only if it would not reject it)
>
> +1
>
>
> Jan
>
> On 2/13/20 8:42 PM, Robert Burke wrote:
>
> +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>
> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com> <ro...@google.com> wrote:
>
> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> <ro...@frantil.com> wrote:
>
> One thing that doesn't appear to have been suggested yet is we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This prevents needing to send dozens of urns to be individually sent over.
>
>
> The super urns would need to be static after definition to avoid mismatched definitions down the road.
>
> We collect together urns what is reasonably consider "vX" support, and can then increment that later.
>
> This would simplify new SDKs, as they can have a goal of initial v1 support as we define what level of feature support it has, and doesn't prevent new capabilities from being added incrementally.
>
> Yes, this is a very good idea. I've also been thinking of certain sets
> of common operations/well known DoFns that often occur on opposite
> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
> are commonly supported that could be grouped under these meta-urns.
>
> Note that these need not be monotonic, for example a current v1 might
> be requiring LengthPrefixCoderV1, but if a more efficient
> LengthPrefixCoderV2 comes along eventually v2 could require that and
> *not* require the old, now rarely used LengthPrefixCoderV1.
>
> Probably makes sense to defer adding such super-urns until we notice a
> set that is commonly used together in practice.
>
> Of course there's still value in SDKs being able to support features
> piecemeal as well, which is the big reason we're avoiding a simple
> monotonically-increasing version number.
>
>
> Similarly, certain features sets could stand alone, eg around SQL. It's benefitial for optimization reasons if an SDK has native projection and UDF support for example, which a runner could take advantage of by avoiding extra cross language hops. These could then also be grouped under a SQL super urn.
>
> This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>
> -------
> Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a dual between "what and SDK can do", and "what the runner will do to a pipeline that the SDK can understand" (eg. Combiner lifting, and state backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (eg. Requires sorted input)
>
>
> On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> <lc...@google.com> wrote:
>
> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> <ke...@apache.org> wrote:
>
> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com> <ro...@google.com> wrote:
>
> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> <lc...@google.com> wrote:
>
> We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
> 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
> 2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>
> Yeah, I don't think proto reflection is a flexible enough tool to do
> this well either.
>
>
> If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.
>
> As you point out, stateful+splittable may not be a particularly useful
> combination, but as another example, we have
> (backwards-incompatible-when-introduced) markers on DoFn as to whether
> it requires finalization, stable inputs, and now time sorting. I don't
> think we should have a new URN for each combination.
>
> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDF is should have to reject it or should be allowed to run poorly.
>
> Being splittable means that the SDK could choose to return a continuation saying please process the rest of my element in X amount of time which would require the runner to inspect certain fields on responses. One example would be I don't have many more messages to read from this message stream at the moment and another example could be that I detected that this filesystem is throttling me or is down and I would like to resume processing later.
>
>
> It isn't a huge deal. Three different top-level URNS versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>
> Kenn
>
>
> I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>
> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplitableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>
> I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>
>
> On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>
> Yes.
>
> I believe capabilities do exist on a Pipeline and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>
>
> [snip]
>
> As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow-up with optional transforms, payloads, and coders later which would exclude job managment APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).
>
> What we need is the ability for (1) runners to reject future pipelines
> they cannot faithfully execute and (2) runners to be able to take
> advantage of advanced features/protocols when interacting with those
> SDKs that understand them while avoiding them for older (or newer)
> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
> capabilities.
>
> Where possible, I think this is best expressed inherently in the set
> of transform (and possibly other component) URNs. For example, when an
> SDK uses a combine_per_key composite, that's a signal that it
> understands the various related combine_* transforms. Similarly, a
> pipeline with a test_stream URN would be rejected by pipelines not
> recognizing/supporting this primitive. However, this is not always
> possible, e.g. for (1) we have the aforementioned boolean flags on
> ParDo and for (2) we have features like large iterable and progress
> support.
>
> For (1) we have to enumerate now everywhere a runner must look as far
> into the future as we want to remain backwards compatible. This is why
> I suggested putting something on the pipeline itself, but we could
> (likely in addition) add it to Transform and/or ParDoPayload if we
> think that'd be useful now. (Note that a future pipeline-level
> requirement could be "inspect (previously non-existent) requirements
> field attached to objects of type X.")
>
> For (2) I think adding a capabilities field to the environment for now
> makes the most sense, and as it's optional to inspect them adding it
> elsewhere if needed is backwards compatible. (The motivation to do it
> now is that there are some capabilities that we'd like to enumerate
> now rather than make part of the minimal set of things an SDK must
> support.)
>
>
> Agree on the separation of requirements from capabilities where requirements is a set of MUST understand while capabilities are a set of MAY understand.
>
>
> All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.
>
> Good point. That lets us keep it as a list for now. (The risk is that
> it makes possible the bug of populating parameters without adding the
> required notification to the list.)
>
>
> I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
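To make that environment-scoped reading concrete, here is a minimal Java sketch (the class,
method, and URN names are illustrative assumptions, not the actual Beam protos or APIs): a
runner consults each environment's own capability list, and nothing is promoted to a
pipeline-wide statement.

    import java.util.Map;
    import java.util.Set;

    /** Illustrative only: capabilities are plain URN strings scoped to a single environment. */
    public class CapabilityCheck {
      // Hypothetical URN; the real capability URNs were still being enumerated at this point.
      static final String PROGRESS_REPORTING = "beam:protocol:progress_reporting:v1";

      /** True only if every environment used by a stage advertises the capability itself. */
      static boolean stageSupports(
          Map<String, Set<String>> capabilitiesByEnvironment,
          Set<String> environmentsUsedByStage,
          String capabilityUrn) {
        return environmentsUsedByStage.stream()
            .allMatch(env ->
                capabilitiesByEnvironment.getOrDefault(env, Set.of()).contains(capabilityUrn));
      }

      public static void main(String[] args) {
        Map<String, Set<String>> caps = Map.of(
            "envA", Set.of(PROGRESS_REPORTING),
            "envB", Set.of());
        // Environment A can be asked for progress, environment B cannot; there is no
        // pipeline-level "supports progress reporting" derived from either of them.
        System.out.println(stageSupports(caps, Set.of("envA"), PROGRESS_REPORTING));         // true
        System.out.println(stageSupports(caps, Set.of("envA", "envB"), PROGRESS_REPORTING)); // false
      }
    }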
>
> Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.
>
> Agreed.
>
>

Re: FnAPI proto backwards compatibility

Posted by Jan Lukavský <je...@seznam.cz>.
On 2/20/20 8:24 PM, Robert Bradshaw wrote:
> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi,
>>
>> +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs, while being able to recognize when a pipeline constructed with an incompatible version of the SDK is run.
>>
>> I'll add some observations I made when implementing the latest "requires time sorted input" addition with regards to this discussion:
>>
>>   a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline, but also depend on the (type of) inputs. For instance a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but not have such an expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.
> It sounds like what you're getting at here is a Stateful ParDo that
> requires "mostly" time sorted input (to keep the amount of state held
> bounded) which is somewhat provided (with no bounds given) for
> unbounded PCollections but not at all (in general) for batch. Rather
> than phrase this as a conditional requirement, I would make a new
> requirement "requires mostly time sorted input" (precise definition
> TBD, it's hard to specify or guarantee upper bounds) which a runner
> could then implement via exact time sorted input in batch but more
> cheaply as a no-op in streaming.
+1, that makes sense. My example was a little incomplete, in the sense 
that @RequiresTimeSortedInput does not place any requirements on the 
runner in the streaming case, with one exception - the runner must be 
compiled with the newest runners-core. That brings us to the fact that 
a runner's capabilities are actually not just a function of the runner's 
code, but also of the code it imports from runners-core. There probably 
should be a way for the core to export its capabilities (e.g. provides: 
beam:requirement:pardo:time_sorted_input:streaming:v1), which would 
then be united with the capabilities of the runner itself. That way a 
runner which uses runners-core (and StatefulDoFnRunner, which is a 
complication, not sure how to deal with that) could be made able to 
satisfy 'beam:requirement:pardo:time_sorted_input:streaming:v1' simply 
by recompiling the runner against the newest core.
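A minimal sketch of that union, with hypothetical names (coreProvidedCapabilities stands
in for whatever the shared core library would actually export):

    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative only: a runner combining its own capabilities with those of a shared core library. */
    public class RunnerCapabilities {
      // Hypothetical stand-in for something runners-core could export.
      static Set<String> coreProvidedCapabilities() {
        return Set.of("beam:requirement:pardo:time_sorted_input:streaming:v1");
      }

      static Set<String> allCapabilities(Set<String> runnerSpecific) {
        Set<String> all = new HashSet<>(runnerSpecific);
        all.addAll(coreProvidedCapabilities()); // recompiling against a newer core widens this set
        return all;
      }

      public static void main(String[] args) {
        Set<String> supported = allCapabilities(Set.of("beam:requirement:pardo:stateful:v1"));
        System.out.println(
            supported.contains("beam:requirement:pardo:time_sorted_input:streaming:v1")); // true
      }
    }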
>>   b) it would be good if this feature would work independently of portability (for Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.
> Yes. As mentioned, we can still convert to portability to do such
> analysis even if we don't use it for execution.
>
>>   c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with given test if and only if it would not reject it)
> +1
>
>> Jan
>>
>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>
>> +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>>
>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com> wrote:
>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
>>>> One thing that doesn't appear to have been suggested yet is we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This prevents needing to send dozens of urns to be individually sent over.
>>>>
>>>>
>>>> The super urns would need to be static after definition to avoid mismatched definitions down the road.
>>>>
>>>> We collect together the urns that are reasonably considered "vX" support, and can then increment that later.
>>>>
>>>> This would simplify new SDKs, as they can have a goal of initial v1 support as we define what level of feature support it has, and doesn't prevent new capabilities from being added incrementally.
>>> Yes, this is a very good idea. I've also been thinking of certain sets
>>> of common operations/well known DoFns that often occur on opposite
>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>>> are commonly supported that could be grouped under these meta-urns.
>>>
>>> Note that these need not be monotonic, for example a current v1 might
>>> be requiring LengthPrefixCoderV1, but if a more efficient
>>> LengthPrefixCoderV2 comes along eventually v2 could require that and
>>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>>
>>> Probably makes sense to defer adding such super-urns until we notice a
>>> set that is commonly used together in practice.
>>>
>>> Of course there's still value in SDKs being able to support features
>>> piecemeal as well, which is the big reason we're avoiding a simple
>>> monotonically-increasing version number.
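If such super-urns are added later, the expansion could be as simple as a frozen static
mapping from the super-urn to its member URNs. The sketch below uses made-up URNs purely
for illustration; it is not a proposal for what a "v1" bundle would actually contain.

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    /** Illustrative only: expanding "super" URNs into the individual URNs they batch together. */
    public class SuperUrns {
      // The member list must never change once a super-urn has been published.
      static final Map<String, Set<String>> SUPER_URNS = Map.of(
          "beam:capability_set:v1", Set.of(
              "beam:coder:length_prefix:v1",
              "beam:transform:pardo:v1",
              "beam:transform:group_by_key:v1"));

      static Set<String> expand(Set<String> advertised) {
        Set<String> expanded = new HashSet<>(advertised);
        for (String urn : advertised) {
          expanded.addAll(SUPER_URNS.getOrDefault(urn, Set.of()));
        }
        return expanded;
      }

      public static void main(String[] args) {
        System.out.println(
            expand(Set.of("beam:capability_set:v1")).contains("beam:coder:length_prefix:v1")); // true
      }
    }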
>>>
>>>> Similarly, certain feature sets could stand alone, eg around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support for example, which a runner could take advantage of by avoiding extra cross language hops. These could then also be grouped under a SQL super urn.
>>>>
>>>> This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>>>>
>>>> -------
>>>> Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a dual between "what an SDK can do", and "what the runner will do to a pipeline that the SDK can understand" (eg. Combiner lifting, and state backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (eg. Requires sorted input)
>>>>
>>>>
>>>> On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>
>>>>> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>> We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
>>>>>>>> 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>>>>>>>> 2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>>>>>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>>>>>> this well either.
>>>>>>>
>>>>>>>> If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.
>>>>>>> As you point out, stateful+splittable may not be a particularly useful
>>>>>>> combination, but as another example, we have
>>>>>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>>>>>>> it requires finalization, stable inputs, and now time sorting. I don't
>>>>>>> think we should have a new URN for each combination.
>>>>>>
>>>>>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDF should have to reject it or should be allowed to run poorly.
>>>>>
>>>>> Being splittable means that the SDK could choose to return a continuation saying please process the rest of my element in X amount of time which would require the runner to inspect certain fields on responses. One example would be I don't have many more messages to read from this message stream at the moment and another example could be that I detected that this filesystem is throttling me or is down and I would like to resume processing later.
>>>>>
>>>>>> It isn't a huge deal. Three different top-level URNS versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>>
>>>>>>>>>> I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>>>>>>>> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>>>>>>>> I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>>>>>>>>
>>>>>>>>>>>> On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>>>>>>>>>>> Yes.
>>>>>>>>
>>>>>>>> I believe capabilities do exist on a Pipeline and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>>>>>>>>
>>>>>>>>> [snip]
>>>>>>>> As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).
>>>>>>> What we need is the ability for (1) runners to reject future pipelines
>>>>>>> they cannot faithfully execute and (2) runners to be able to take
>>>>>>> advantage of advanced features/protocols when interacting with those
>>>>>>> SDKs that understand them while avoiding them for older (or newer)
>>>>>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>>>>>>> capabilities.
>>>>>>>
>>>>>>> Where possible, I think this is best expressed inherently in the set
>>>>>>> of transform (and possibly other component) URNs. For example, when an
>>>>>>> SDK uses a combine_per_key composite, that's a signal that it
>>>>>>> understands the various related combine_* transforms. Similarly, a
>>>>>>> pipeline with a test_stream URN would be rejected by runners not
>>>>>>> recognizing/supporting this primitive. However, this is not always
>>>>>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>>>>>> ParDo and for (2) we have features like large iterable and progress
>>>>>>> support.
>>>>>>>
>>>>>>> For (1) we have to enumerate now everywhere a runner must look as far
>>>>>>> into the future as we want to remain backwards compatible. This is why
>>>>>>> I suggested putting something on the pipeline itself, but we could
>>>>>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>>>>>> think that'd be useful now. (Note that a future pipeline-level
>>>>>>> requirement could be "inspect (previously non-existent) requirements
>>>>>>> field attached to objects of type X.")
>>>>>>>
>>>>>>> For (2) I think adding a capabilities field to the environment for now
>>>>>>> makes the most sense, and as it's optional to inspect them adding it
>>>>>>> elsewhere if needed is backwards compatible. (The motivation to do it
>>>>>>> now is that there are some capabilities that we'd like to enumerate
>>>>>>> now rather than make part of the minimal set of things an SDK must
>>>>>>> support.)
>>>>>>>
>>>>> Agree on the separation of requirements from capabilities where requirements is a set of MUST understand while capabilities are a set of MAY understand.
>>>>>
>>>>>>>> All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.
>>>>>>> Good point. That lets us keep it as a list for now. (The risk is that
>>>>>>> it makes possible the bug of populating parameters without adding the
>>>>>>> required notification to the list.)
>>>>>>>
>>>>>>>> I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>>>>>>>
>>>>>>>> Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.
>>>>>>> Agreed.

Re: FnAPI proto backwards compatibility

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi,
>
> +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs, while being able to recognize when a pipeline constructed with an incompatible version of the SDK is run.
>
> I'll add some observations I made when implementing the latest "requires time sorted input" addition with regards to this discussion:
>
>  a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline, but also depend on the (type of) inputs. For instance a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but not have such an expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.

It sounds like what you're getting at here is a Stateful ParDo that
requires "mostly" time sorted input (to keep the amount of state held
bounded) which is somewhat provided (with no bounds given) for
unbounded PCollections but not at all (in general) for batch. Rather
than phrase this as a conditional requirement, I would make a new
requirement "requires mostly time sorted input" (precise definition
TBD, it's hard to specify or guarantee upper bounds) which a runner
could then implement via exact time sorted input in batch but more
cheaply as a no-op in streaming.
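As a rough sketch of that dispatch (the URN, class, and method names are hypothetical,
not actual runner code), the same requirement would trigger an explicit sort only where
the input is bounded:

    /** Illustrative only: one requirement URN, two implementations depending on boundedness. */
    public class TimeSortedInputDispatch {
      // Hypothetical URN for the "mostly time sorted" requirement discussed above.
      static final String REQUIRES_MOSTLY_TIME_SORTED_INPUT =
          "beam:requirement:pardo:mostly_time_sorted_input:v1";

      enum Boundedness { BOUNDED, UNBOUNDED }

      static String plan(String requirementUrn, Boundedness boundedness) {
        if (!REQUIRES_MOSTLY_TIME_SORTED_INPUT.equals(requirementUrn)) {
          return "reject: unknown requirement " + requirementUrn;
        }
        // Batch: sort by timestamp before the stateful ParDo. Streaming: watermark-driven
        // input is already "mostly" sorted, so nothing extra is inserted.
        return boundedness == Boundedness.BOUNDED ? "insert exact time sort" : "no-op";
      }

      public static void main(String[] args) {
        System.out.println(plan(REQUIRES_MOSTLY_TIME_SORTED_INPUT, Boundedness.BOUNDED));
        System.out.println(plan(REQUIRES_MOSTLY_TIME_SORTED_INPUT, Boundedness.UNBOUNDED));
      }
    }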

>  b) it would be good if this feature would work independently of portability (for Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.

Yes. As mentioned, we can still convert to portability to do such
analysis even if we don't use it for execution.

>  c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with given test if and only if it would not reject it)

+1

> Jan
>
> On 2/13/20 8:42 PM, Robert Burke wrote:
>
> +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>
> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
>> >
>> > One thing that doesn't appear to have been suggested yet is we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This prevents needing to send dozens of urns to be individually sent over.
>> >
>> >
>> > The super urns would need to be static after definition to avoid mismatched definitions down the road.
>> >
>> > We collect together the urns that are reasonably considered "vX" support, and can then increment that later.
>> >
>> > This would simplify new SDKs, as they can have a goal of initial v1 support as we define what level of feature support it has, and doesn't prevent new capabilities from being added incrementally.
>>
>> Yes, this is a very good idea. I've also been thinking of certain sets
>> of common operations/well known DoFns that often occur on opposite
>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>> are commonly supported that could be grouped under these meta-urns.
>>
>> Note that these need not be monotonic, for example a current v1 might
>> be requiring LengthPrefixCoderV1, but if a more efficient
>> LengthPrefixCoderV2 comes along eventually v2 could require that and
>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>
>> Probably makes sense to defer adding such super-urns until we notice a
>> set that is commonly used together in practice.
>>
>> Of course there's still value in SDKs being able to support features
>> piecemeal as well, which is the big reason we're avoiding a simple
>> monotonically-increasing version number.
>>
>> > Similarly, certain feature sets could stand alone, eg around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support for example, which a runner could take advantage of by avoiding extra cross language hops. These could then also be grouped under a SQL super urn.
>> >
>> > This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>> >
>> > -------
>> > Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a dual between "what an SDK can do", and "what the runner will do to a pipeline that the SDK can understand" (eg. Combiner lifting, and state backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (eg. Requires sorted input)
>> >
>> >
>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>> >>
>> >>
>> >>
>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:
>> >>>
>> >>>
>> >>>
>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com> wrote:
>> >>>>
>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>> >>>> >
>> >>>> > We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
>> >>>> > 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>> >>>> > 2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>> >>>>
>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>> >>>> this well either.
>> >>>>
>> >>>> > If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.
>> >>>>
>> >>>> As you point out, stateful+splittable may not be a particularly useful
>> >>>> combination, but as another example, we have
>> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>> >>>> it requires finalization, stable inputs, and now time sorting. I don't
>> >>>> think we should have a new URN for each combination.
>> >>>
>> >>>
>> >>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDF should have to reject it or should be allowed to run poorly.
>> >>
>> >>
>> >> Being splittable means that the SDK could choose to return a continuation saying please process the rest of my element in X amount of time which would require the runner to inspect certain fields on responses. One example would be I don't have many more messages to read from this message stream at the moment and another example could be that I detected that this filesystem is throttling me or is down and I would like to resume processing later.
>> >>
>> >>>
>> >>> It isn't a huge deal. Three different top-level URNS versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>> >>>
>> >>> Kenn
>> >>>
>> >>>>
>> >>>>
>> >>>> >> > I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>> >>>> >>
>> >>>> >> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>> >>>> >
>> >>>> > I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>> >>>> >
>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>> >>>> >> >>
>> >>>> >> >> Yes.
>> >>>> >
>> >>>> >
>> >>>> > I believe capabilities do exist on a Pipeline and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>> >>>> >
>> >>>> >> [snip]
>> >>>> >
>> >>>> > As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).
>> >>>>
>> >>>> What we need is the ability for (1) runners to reject future pipelines
>> >>>> they cannot faithfully execute and (2) runners to be able to take
>> >>>> advantage of advanced features/protocols when interacting with those
>> >>>> SDKs that understand them while avoiding them for older (or newer)
>> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>> >>>> capabilities.
>> >>>>
>> >>>> Where possible, I think this is best expressed inherently in the set
>> >>>> of transform (and possibly other component) URNs. For example, when an
>> >>>> SDK uses a combine_per_key composite, that's a signal that it
>> >>>> understands the various related combine_* transforms. Similarly, a
>> >>>> pipeline with a test_stream URN would be rejected by runners not
>> >>>> recognizing/supporting this primitive. However, this is not always
>> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>> >>>> ParDo and for (2) we have features like large iterable and progress
>> >>>> support.
>> >>>>
>> >>>> For (1) we have to enumerate now everywhere a runner must look as far
>> >>>> into the future as we want to remain backwards compatible. This is why
>> >>>> I suggested putting something on the pipeline itself, but we could
>> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>> >>>> think that'd be useful now. (Note that a future pipeline-level
>> >>>> requirement could be "inspect (previously non-existent) requirements
>> >>>> field attached to objects of type X.")
>> >>>>
>> >>>> For (2) I think adding a capabilities field to the environment for now
>> >>>> makes the most sense, and as it's optional to inspect them adding it
>> >>>> elsewhere if needed is backwards compatible. (The motivation to do it
>> >>>> now is that there are some capabilities that we'd like to enumerate
>> >>>> now rather than make part of the minimal set of things an SDK must
>> >>>> support.)
>> >>>>
>> >>
>> >> Agree on the separation of requirements from capabilities where requirements is a set of MUST understand while capabilities are a set of MAY understand.
>> >>
>> >>>>
>> >>>> > All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.
>> >>>>
>> >>>> Good point. That lets us keep it as a list for now. (The risk is that
>> >>>> it makes possible the bug of populating parameters without adding the
>> >>>> required notification to the list.)
>> >>>>
>> >>>> > I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
>> >>>> >
>> >>>> > Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.
>> >>>>
>> >>>> Agreed.

Re: FnAPI proto backwards compatibility

Posted by Robert Bradshaw <ro...@google.com>.
I created https://github.com/apache/beam/pull/10873 to add the minimal
set of fields to the existing protos, and also created
https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit
to start enumerating some of the URNs we may want to have. It would be
a good milestone to get this in by the release next week.
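The runner-side behavior the requirements field is meant to enable is roughly the
following; this sketch uses plain strings and made-up URNs rather than the actual proto
classes touched by the PR. Collect the pipeline's requirement URNs and fail fast on any
the runner does not understand:

    import java.util.Set;
    import java.util.stream.Collectors;

    /** Illustrative only: hard requirements are MUST-understand URNs; reject on any unknown one. */
    public class RequirementsCheck {
      static void validate(Set<String> pipelineRequirements, Set<String> runnerUnderstands) {
        Set<String> unknown = pipelineRequirements.stream()
            .filter(urn -> !runnerUnderstands.contains(urn))
            .collect(Collectors.toSet());
        if (!unknown.isEmpty()) {
          // A URN-based message tells the user exactly which feature is missing, unlike a
          // generic "unknown proto field" failure.
          throw new IllegalArgumentException("Pipeline requires unsupported features: " + unknown);
        }
      }

      public static void main(String[] args) {
        validate(
            Set.of("beam:requirement:pardo:requires_stable_input:v1"),   // hypothetical URNs
            Set.of("beam:requirement:pardo:requires_stable_input:v1",
                   "beam:requirement:pardo:splittable_dofn:v1"));
        System.out.println("accepted");
      }
    }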

On Thu, Feb 13, 2020 at 5:14 PM Kyle Weaver <kc...@google.com> wrote:
>
> > we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities
>
> +1, I think the potential to formally integrate our idea of compatibility and unit testing is a big advantage of this proposal. Also, when deciding where to draw lines between different URNs, it may help to look at the existing validates runner test categories, which are currently the most accurate signal we have regarding a runner's capabilities.
>
> On Thu, Feb 13, 2020 at 4:04 PM Robert Burke <ro...@frantil.com> wrote:
>>
>> Wrt per DoFn/ParDo level, there's the similar case of whether the DoFn has an Urn for requiring something or it's an annotation for saying the DoFn provides something (eg. Provides K-anonymization with k defined)
>>
>> The general theme of this thread seems to be trying to ensure a runner can reject a pipeline if it's not able to provide the right guarantees, so that latter case isn't handled.
>>
>> Eg. The latter provisions could be used to analyze a pipeline to ensure the outputs are all properly anonymized to a certain degree at construction time.
>>
>> On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>
>>>
>>> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi,
>>>>
>>>> +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs, while being able to recognize when a pipeline constructed with an incompatible version of the SDK is run.
>>>>
>>>> I'll add some observations I made when implementing the latest "requires time sorted input" addition with regards to this discussion:
>>>>
>>>>  a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline, but also depend on the (type of) inputs. For instance a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but not have such an expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.
>>>>
>>>>  b) it would be good if this feature would work independently of portability (for Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.
>>>
>>> I think even if these runners keep their execution not using portability, they should migrate to use the portable pipeline definition. Then they can share the same model w/ runners that execute using portability. The Fn API is not required to be used as long as the runner implements the semantics of the pipeline.
>>>
>>> Kenn
>>>
>>>>
>>>>  c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with given test if and only if it would not reject it)
>>>>
>>>> Jan
>>>>
>>>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>>>
>>>> +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>>>>
>>>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>
>>>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
>>>>> >
>>>>> > One thing that doesn't appear to have been suggested yet is we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This prevents needing to send dozens of urns to be individually sent over.
>>>>> >
>>>>> >
>>>>> > The super urns would need to be static after definition to avoid mismatched definitions down the road.
>>>>> >
>>>>> > We collect together the urns that are reasonably considered "vX" support, and can then increment that later.
>>>>> >
>>>>> > This would simplify new SDKs, as they can have a goal of initial v1 support as we define what level of feature support it has, and doesn't prevent new capabilities from being added incrementally.
>>>>>
>>>>> Yes, this is a very good idea. I've also been thinking of certain sets
>>>>> of common operations/well known DoFns that often occur on opposite
>>>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>>>>> are commonly supported that could be grouped under these meta-urns.
>>>>>
>>>>> Note that these need not be monotonic, for example a current v1 might
>>>>> be requiring LengthPrefixCoderV1, but if a more efficient
>>>>> LengthPrefixCoderV2 comes along eventually v2 could require that and
>>>>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>>>>
>>>>> Probably makes sense to defer adding such super-urns until we notice a
>>>>> set that is commonly used together in practice.
>>>>>
>>>>> Of course there's still value in SDKs being able to support features
>>>>> piecemeal as well, which is the big reason we're avoiding a simple
>>>>> monotonically-increasing version number.
>>>>>
>>>>> > Similarly, certain feature sets could stand alone, eg around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support for example, which a runner could take advantage of by avoiding extra cross language hops. These could then also be grouped under a SQL super urn.
>>>>> >
>>>>> > This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>>>>> >
>>>>> > -------
>>>>> > Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a dual between "what an SDK can do", and "what the runner will do to a pipeline that the SDK can understand" (eg. Combiner lifting, and state backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (eg. Requires sorted input)
>>>>> >
>>>>> >
>>>>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>> >>>>
>>>>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>>>>> >>>> >
>>>>> >>>> > We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
>>>>> >>>> > 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>>>>> >>>> > 2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>>>>> >>>>
>>>>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>>>> >>>> this well either.
>>>>> >>>>
>>>>> >>>> > If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.
>>>>> >>>>
>>>>> >>>> As you point out, stateful+splittable may not be a particularly useful
>>>>> >>>> combination, but as another example, we have
>>>>> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>>>>> >>>> it requires finalization, stable inputs, and now time sorting. I don't
>>>>> >>>> think we should have a new URN for each combination.
>>>>> >>>
>>>>> >>>
>>>>> >>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDF should have to reject it or should be allowed to run poorly.
>>>>> >>
>>>>> >>
>>>>> >> Being splittable means that the SDK could choose to return a continuation saying please process the rest of my element in X amount of time which would require the runner to inspect certain fields on responses. One example would be I don't have many more messages to read from this message stream at the moment and another example could be that I detected that this filesystem is throttling me or is down and I would like to resume processing later.
>>>>> >>
>>>>> >>>
>>>>> >>> It isn't a huge deal. Three different top-level URNS versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>>>>> >>>
>>>>> >>> Kenn
>>>>> >>>
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> >> > I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>>>> >>>> >>
>>>>> >>>> >> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>>>>> >>>> >
>>>>> >>>> > I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>>>>> >>>> >
>>>>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>>>>> >>>> >> >>
>>>>> >>>> >> >> Yes.
>>>>> >>>> >
>>>>> >>>> >
>>>>> >>>> > I believe capabilities do exist on a Pipeline and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>>>>> >>>> >
>>>>> >>>> >> [snip]
>>>>> >>>> >
>>>>> >>>> > As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).
>>>>> >>>>
>>>>> >>>> What we need is the ability for (1) runners to reject future pipelines
>>>>> >>>> they cannot faithfully execute and (2) runners to be able to take
>>>>> >>>> advantage of advanced features/protocols when interacting with those
>>>>> >>>> SDKs that understand them while avoiding them for older (or newer)
>>>>> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>>>>> >>>> capabilities.
>>>>> >>>>
>>>>> >>>> Where possible, I think this is best expressed inherently in the set
>>>>> >>>> of transform (and possibly other component) URNs. For example, when an
>>>>> >>>> SDK uses a combine_per_key composite, that's a signal that it
>>>>> >>>> understands the various related combine_* transforms. Similarly, a
>>>>> >>>> pipeline with a test_stream URN would be rejected by runners not
>>>>> >>>> recognizing/supporting this primitive. However, this is not always
>>>>> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>>>> >>>> ParDo and for (2) we have features like large iterable and progress
>>>>> >>>> support.
>>>>> >>>>
>>>>> >>>> For (1) we have to enumerate now everywhere a runner must look as far
>>>>> >>>> into the future as we want to remain backwards compatible. This is why
>>>>> >>>> I suggested putting something on the pipeline itself, but we could
>>>>> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>>>> >>>> think that'd be useful now. (Note that a future pipeline-level
>>>>> >>>> requirement could be "inspect (previously non-existent) requirements
>>>>> >>>> field attached to objects of type X.")
>>>>> >>>>
>>>>> >>>> For (2) I think adding a capabilities field to the environment for now
>>>>> >>>> makes the most sense, and as it's optional to inspect them adding it
>>>>> >>>> elsewhere if needed is backwards compatible. (The motivation to do it
>>>>> >>>> now is that there are some capabilities that we'd like to enumerate
>>>>> >>>> now rather than make part of the minimal set of things an SDK must
>>>>> >>>> support.)
>>>>> >>>>
>>>>> >>
>>>>> >> Agree on the separation of requirements from capabilities where requirements is a set of MUST understand while capabilities are a set of MAY understand.
>>>>> >>
>>>>> >>>>
>>>>> >>>> > All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.
>>>>> >>>>
>>>>> >>>> Good point. That lets us keep it as a list for now. (The risk is that
>>>>> >>>> it makes possible the bug of populating parameters without adding the
>>>>> >>>> required notification to the list.)
>>>>> >>>>
>>>>> >>>> > I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>>>> >>>> >
>>>>> >>>> > Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.
>>>>> >>>>
>>>>> >>>> Agreed.

Re: FnAPI proto backwards compatibility

Posted by Kyle Weaver <kc...@google.com>.
> we can take advantage of these pipeline features to get rid of the
categories of @ValidatesRunner tests, because we could have just simply
@ValidatesRunner and each test would be matched against runner capabilities

+1, I think the potential to formally integrate our idea of compatibility
and unit testing is a big advantage of this proposal. Also, when deciding
where to draw lines between different URNs, it may help to look at the
existing validates runner test categories, which are currently the most
accurate signal we have regarding a runner's capabilities.
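A sketch of how that matching could work, using hypothetical test names and requirement
URNs rather than the real @ValidatesRunner categories: each test declares the URNs it
exercises and is selected only when the runner under test advertises all of them.

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    /** Illustrative only: selecting ValidatesRunner-style tests by the requirement URNs they need. */
    public class TestSelection {
      // Hypothetical mapping from test name to the requirement URNs the test exercises.
      static final Map<String, Set<String>> TEST_REQUIREMENTS = Map.of(
          "testStatefulParDo", Set.of("beam:requirement:pardo:stateful:v1"),
          "testTimeSortedInput", Set.of("beam:requirement:pardo:time_sorted_input:v1"));

      static List<String> runnableTests(Set<String> runnerCapabilities) {
        return TEST_REQUIREMENTS.entrySet().stream()
            .filter(e -> runnerCapabilities.containsAll(e.getValue()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        // A runner that only understands stateful ParDo runs just the first test.
        System.out.println(runnableTests(Set.of("beam:requirement:pardo:stateful:v1")));
      }
    }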

On Thu, Feb 13, 2020 at 4:04 PM Robert Burke <ro...@frantil.com> wrote:

> Wrt per DoFn/ParDo level, there's the similar case of whether the DoFn has
> an Urn for requiring something or it's an annotation for saying the DoFn
> provides something (eg. Provides K-anonymization with k defined)
>
> The general theme of this thread seems to be trying to ensure a runner can
> reject a pipeline if it's not able to provide the right guarantees, so that
> latter case isn't handled.
>
> Eg. The latter provisions could be used to analyze a pipeline to ensure
> the outputs are all properly anonymized to a certain degree at construction
> time.
>
> On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>>
>>
>> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> +1 for adding pipeline required features. I think being able to reject
>>> a pipeline with an unknown requirement is pretty much needed, mostly because
>>> that enables runners to completely decouple from SDKs, while being able to
>>> recognize when a pipeline constructed with an incompatible version of the SDK is
>>> run.
>>>
>>> I'll add some observations I made when implementing the latest "requires
>>> time sorted input" addition with regards to this discussion:
>>>
>>>  a) the features of a pipeline are not a simple function of the set of
>>> PTransforms present in the pipeline, but also depend on the (type of)
>>> inputs. For instance a PTransform might have a simple expansion to
>>> primitive PTransforms in the streaming case, but not have such an expansion
>>> in the batch case. That is to say, a runner that doesn't actually know of a
>>> specific extension to some PTransform _might_ actually execute it correctly
>>> under some conditions, but _must_ fail in other cases.
>>>
>>>  b) it would be good if this feature would work independently of
>>> portability (for Java SDK). We still have (at least two) non-portable
>>> runners that are IMO widely used in production and are likely to last for
>>> some time.
>>>
>> I think even if these runners keep their execution not using portability,
>> they should migrate to use the portable pipeline definition. Then they can
>> share the same model w/ runners that execute using portability. The Fn API
>> is not required to be used as long as the runner implements the semantics
>> of the pipeline.
>>
>> Kenn
>>
>>
>>>  c) we can take advantage of these pipeline features to get rid of the
>>> categories of @ValidatesRunner tests, because we could have just simply
>>> @ValidatesRunner and each test would be matched against runner capabilities
>>> (i.e. a runner would be tested with given test if and only if it would not
>>> reject it)
>>>
>>> Jan
>>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>>
>>> +1 to deferring for now. Since they should not be modified after
>>> adoption, it makes sense not to get ahead of ourselves.
>>>
>>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com>
>>>> wrote:
>>>> >
>>>> > One thing that doesn't appear to have been suggested yet is we could
>>>> "batch" urns together under a "super urn" so that adding one super urn is
>>>> like adding each of the represented batch of features. This prevents
>>>> needing to send dozens of urns to be individually sent over.
>>>> >
>>>> >
>>>> > The super urns would need to be static after definition to avoid
>>>> mismatched definitions down the road.
>>>> >
>>>> > We collect together the urns that are reasonably considered "vX" support,
>>>> and can then increment that later.
>>>> >
>>>> > This would simplify new SDKs, as they can have a goal of initial v1
>>>> support as we define what level of feature support it has, and doesn't
>>>> prevent new capabilities from being added incrementally.
>>>>
>>>> Yes, this is a very good idea. I've also been thinking of certain sets
>>>> of common operations/well known DoFns that often occur on opposite
>>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>>>> are commonly supported that could be grouped under these meta-urns.
>>>>
>>>> Note that these need not be monotonic, for example a current v1 might
>>>> be requiring LengthPrefixCoderV1, but if a more efficient
>>>> LengthPrefixCoderV2 comes along eventually v2 could require that and
>>>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>>>
>>>> Probably makes sense to defer adding such super-urns until we notice a
>>>> set that is commonly used together in practice.
>>>>
>>>> Of course there's still value in SDKs being able to support features
>>>> piecemeal as well, which is the big reason we're avoiding a simple
>>>> monotonically-increasing version number.
>>>>
>>>> > Similarly, certain feature sets could stand alone, eg around SQL.
>>>> It's beneficial for optimization reasons if an SDK has native projection
>>>> and UDF support for example, which a runner could take advantage of by
>>>> avoiding extra cross language hops. These could then also be grouped under
>>>> a SQL super urn.
>>>> >
>>>> > This is from the SDK capability side of course, rather than the SDK
>>>> pipeline requirements side.
>>>> >
>>>> > -------
>>>> > Related to that last point, it might be good to nail down early the
>>>> perspective used when discussing these things, as there's a dual between
>>>> "what and SDK can do", and "what the runner will do to a pipeline that the
>>>> SDK can understand" (eg. Combiner lifting, and state backed iterables), as
>>>> well as "what the pipeline requires from the runner" and "what the runner
>>>> is able to do" (eg. Requires sorted input)
>>>> >
>>>> >
>>>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <
>>>> robertwb@google.com> wrote:
>>>> >>>>
>>>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com>
>>>> wrote:
>>>> >>>> >
>>>> >>>> > We can always detect on the runner/SDK side whether there is an
>>>> unknown field[1] within a payload and fail to process it but this is
>>>> painful in two situations:
>>>> >>>> > 1) It doesn't provide for a good error message since you can't
>>>> say what the purpose of the field is. With a capability URN, the runner/SDK
>>>> could say which URN it doesn't understand.
>>>> >>>> > 2) It doesn't allow for the addition of fields which don't
>>>> impact semantics of execution. For example, if the display data feature was
>>>> being developed, a runner could ignore it and still execute the pipeline
>>>> correctly.
>>>> >>>>
>>>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to
>>>> do
>>>> >>>> this well either.
>>>> >>>>
>>>> >>>> > If we think this to be common enough, we can add capabilities
>>>> list to the PTransform so each PTransform can do this and has a natural way
>>>> of being extended for additions which are forwards compatible. The
>>>> alternative to having capabilities on PTransform (and other constructs) is
>>>> that we would have a new URN when the specification of the transform
>>>> changes. For forwards compatible changes, each SDK/runner would map older
>>>> versions of the URN onto the latest and internally treat it as the latest
>>>> version but always downgrade it to the version the other party expects when
>>>> communicating with it. Backwards incompatible changes would always require
>>>> a new URN which capabilities at the PTransform level would not help with.
>>>> >>>>
>>>> >>>> As you point out, stateful+splittable may not be a particularly
>>>> useful
>>>> >>>> combination, but as another example, we have
>>>> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to
>>>> whether
>>>> >>>> it requires finalization, stable inputs, and now time sorting. I
>>>> don't
>>>> >>>> think we should have a new URN for each combination.
>>>> >>>
>>>> >>>
>>>> >>> Agree with this. I don't think stateful, splittable, and "plain"
>>>> ParDo are comparable to these. Each is an entirely different computational
>>>> paradigm: per-element independent processing, per-key-and-window linear
>>>> processing, and per-element-and-restriction splittable processing. Most
>>>> relevant IMO is the nature of the parallelism. If you added state to
>>>> splittable processing, it would still be splittable processing. Just as
>>>> Combine and ParDo can share the SideInput specification, it is easy to
>>>> share relevant sub-structures like state declarations. But it is a fair
>>>> point that the ability to split can be ignored and run as a plain-old
>>>> ParDo. It brings up the question of whether a runner that doesn't know SDF
>>>> is should have to reject it or should be allowed to run poorly.
>>>> >>
>>>> >>
>>>> >> Being splittable means that the SDK could choose to return a
>>>> continuation saying please process the rest of my element in X amount of
>>>> time which would require the runner to inspect certain fields on responses.
>>>> One example would be I don't have many more messages to read from this
>>>> message stream at the moment and another example could be that I detected
>>>> that this filesystem is throttling me or is down and I would like to resume
>>>> processing later.
>>>> >>
>>>> >>>
>>>> >>> It isn't a huge deal. Three different top-level URNS versus three
>>>> different sub-URNs will achieve the same result in the end if we get this
>>>> "capability" thing in place.
>>>> >>>
>>>> >>> Kenn
>>>> >>>
>>>> >>>>
>>>> >>>>
>>>> >>>> >> > I do think that splittable ParDo and stateful ParDo should
>>>> have separate PTransform URNs since they are different paradigms than
>>>> "vanilla" ParDo.
>>>> >>>> >>
>>>> >>>> >> Here I disagree. What about one that is both splittable and
>>>> stateful? Would one have a fourth URN for that? If/when another flavor of
>>>> DoFn comes out, would we then want 8 distinct URNs? (SplitableParDo in
>>>> particular can be executed as a normal ParDo as long as the output is
>>>> bounded.)
>>>> >>>> >
>>>> >>>> > I agree that you could have stateful and splittable dofns where
>>>> the element is the key and you share state and timers across restrictions.
>>>> No runner is capable of executing this efficiently.
>>>> >>>> >
>>>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns
>>>> the Environment proto completely, so it is in a position to ensure the
>>>> involved docker images support the necessary features.
>>>> >>>> >> >>
>>>> >>>> >> >> Yes.
>>>> >>>> >
>>>> >>>> >
>>>> >>>> > I believe capabilities do exist on a Pipeline and it informs
>>>> runners about new types of fields to be aware of either within Components
>>>> or on the Pipeline object itself but for this discussion it makes sense
>>>> that an environment would store most "capabilities" related to execution.
>>>> >>>> >
>>>> >>>> >> [snip]
>>>> >>>> >
>>>> >>>> > As for the proto clean-ups, the scope is to cover almost all
>>>> things needed for execution now and to follow-up with optional transforms,
>>>> payloads, and coders later which would exclude job managment APIs and
>>>> artifact staging. A formal enumeration would be useful here. Also, we
>>>> should provide formal guidance about adding new fields, adding new types of
>>>> transforms, new types of proto messages, ... (best to describe this on a
>>>> case by case basis as to how people are trying to modify the protos and
>>>> evolve this guidance over time).
>>>> >>>>
>>>> >>>> What we need is the ability for (1) runners to reject future
>>>> pipelines
>>>> >>>> they cannot faithfully execute and (2) runners to be able to take
>>>> >>>> advantage of advanced features/protocols when interacting with
>>>> those
>>>> >>>> SDKs that understand them while avoiding them for older (or newer)
>>>> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2)
>>>> (optional)
>>>> >>>> capabilities.
>>>> >>>>
>>>> >>>> Where possible, I think this is best expressed inherently in the
>>>> set
>>>> >>>> of transform (and possibly other component) URNs. For example,
>>>> when an
>>>> >>>> SDK uses a combine_per_key composite, that's a signal that it
>>>> >>>> understands the various related combine_* transforms. Similarly, a
>>>> >>>> pipeline with a test_stream URN would be rejected by pipelines not
>>>> >>>> recognizing/supporting this primitive. However, this is not always
>>>> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>>> >>>> ParDo and for (2) we have features like large iterable and progress
>>>> >>>> support.
>>>> >>>>
>>>> >>>> For (1) we have to enumerate now everywhere a runner must look a
>>>> far
>>>> >>>> into the future as we want to remain backwards compatible. This is
>>>> why
>>>> >>>> I suggested putting something on the pipeline itself, but we could
>>>> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>>> >>>> think that'd be useful now. (Note that a future pipeline-level
>>>> >>>> requirement could be "inspect (previously non-existent)
>>>> requirements
>>>> >>>> field attached to objects of type X.")
>>>> >>>>
>>>> >>>> For (2) I think adding a capabilities field to the environment for
>>>> now
>>>> >>>> makes the most sense, and as it's optional to inspect them adding
>>>> it
>>>> >>>> elsewhere if needed is backwards compatible. (The motivation to do
>>>> it
>>>> >>>> now is that there are some capabilities that we'd like to enumerate
>>>> >>>> now rather than make part of the minimal set of things an SDK must
>>>> >>>> support.)
>>>> >>>>
>>>> >>
>>>> >> Agree on the separation of requirements from capabilities where
>>>> requirements is a set of MUST understand while capabilities are a set of
>>>> MAY understand.
>>>> >>
>>>> >>>>
>>>> >>>> > All in all, I think "capabilities" is about informing a runner
>>>> about what they should know about and what they are allowed to do. If we go
>>>> with a list of "capabilities", we could always add a "parameterized
>>>> capabilities" urn which would tell runners they need to also look at some
>>>> other field.
>>>> >>>>
>>>> >>>> Good point. That lets us keep it as a list for now. (The risk is
>>>> that
>>>> >>>> it makes possible the bug of populating parameters without adding
>>>> the
>>>> >>>> required notification to the list.)
>>>> >>>>
>>>> >>>> > I also believe capabilities should NOT be "inherited". For
>>>> example if we define capabilities on a ParDoPayload, and on a PTransform
>>>> and on Environment, then ParDoPayload capabilities shouldn't be copied to
>>>> PTransform and PTransform specific capabilities shouldn't be copied to the
>>>> Environment. My reasoning about this is that some "capabilities" can only
>>>> be scoped to a single ParDoPayload or a single PTransform and wouldn't
>>>> apply generally everywhere. The best example I could think of is that
>>>> Environment A supports progress reporting while Environment B doesn't so it
>>>> wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>>> >>>> >
>>>> >>>> > Are capabilities strictly different from "resources" (transform
>>>> needs python package X) or "execution hints" (e.g. deploy on machines that
>>>> have GPUs, some generic but mostly runner specific hints)? At first glance
>>>> I would say yes.
>>>> >>>>
>>>> >>>> Agreed.
>>>>
>>>
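
Tying the scoping point above together, a minimal Python sketch (the field
layout and the URN are assumptions for illustration) of capabilities kept
per environment rather than inherited by the pipeline:

    # Hypothetical capability URN; capabilities live on each Environment.
    PROGRESS_REPORTING = "beam:protocol:progress_reporting:v1"

    environments = {
        "env_a": {"capabilities": [PROGRESS_REPORTING]},
        "env_b": {"capabilities": []},
    }

    def supports_progress(env_id):
        # The runner checks the specific environment it is talking to; it
        # cannot assume the pipeline as a whole supports the protocol.
        return PROGRESS_REPORTING in environments[env_id]["capabilities"]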

Re: FnAPI proto backwards compatibility

Posted by Robert Burke <ro...@frantil.com>.
Wrt the per-DoFn/ParDo level, there's the similar question of whether the
DoFn has a URN for requiring something, or an annotation saying the DoFn
provides something (e.g. provides k-anonymization with k defined).

The general theme of this thread seems to be ensuring a runner can reject a
pipeline if it's not able to provide the right guarantees, so the latter
case isn't handled.

E.g. the latter kind of annotation could be used to analyze a pipeline at
construction time to ensure the outputs are all properly anonymized to a
certain degree.
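
To make the distinction concrete, here is a minimal Python sketch (the URN
strings and the shape of the lists are assumptions for illustration, not an
actual proposal): a runner must reject any requirement URN it does not
understand, while a "provides" annotation is purely informational and can
safely be ignored.

    # Hypothetical URNs, for illustration only.
    REQUIRES_TIME_SORTED_INPUT = "beam:requirement:pardo:time_sorted_input:v1"
    PROVIDES_K_ANONYMIZATION = "beam:annotation:provides_k_anonymization:v1"

    KNOWN_REQUIREMENTS = {REQUIRES_TIME_SORTED_INPUT}

    def validate(requirements, annotations):
        # Unknown requirements force rejection with a useful error message.
        unknown = set(requirements) - KNOWN_REQUIREMENTS
        if unknown:
            raise ValueError(
                "Cannot execute pipeline, unknown requirements: %s"
                % sorted(unknown))
        # Annotations (e.g. an anonymization guarantee) never cause rejection;
        # only tools that know about them, such as a construction-time
        # analysis, would consult them.
        return [a for a in annotations if a == PROVIDES_K_ANONYMIZATION]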

On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <ke...@apache.org> wrote:

>
>
> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> +1 for adding pipeline required features. I think being able to reject
>> pipeline with unknown requirement is pretty much needed, mostly because
>> that enables runners to completely decouple from SDKs, while being able to
>> recognize when a pipeline constructed with incomplatible version of SDK is
>> run.
>>
>> I'll add some observations I made when implementing the latest "requires
>> time sorted input" addition with regards to this discussion:
>>
>>  a) the features of pipeline are not simple function of set of
>> PTransforms being present in the pipeline, but also depend on (type of)
>> inputs. For instance a PTransform might have a simple expansion to
>> primitive PTransforms in streaming case, but don't have such expansion in
>> batch case. That is to say, runner that doesn't actually know of a specific
>> extension to some PTransform _might_ actually execute it correctly under
>> some conditions. But _must_ fail in other cases.
>>
>>  b) it would be good if this feature would work independently of
>> portability (for Java SDK). We still have (at least two) non-portable
>> runners that are IMO widely used in production and are likely to last for
>> some time.
>>
> I think even if these runners keep their execution not using portability,
> they should migrate to use the portable pipeline definition. Then they can
> share the same model w/ runners that execute using portability. The Fn API
> is not required to be used as long as the runner implements the semantics
> of the pipeline.
>
> Kenn
>
>
>>  c) we can take advantage of these pipeline features to get rid of the
>> categories of @ValidatesRunner tests, because we could have just simply
>> @ValidatesRunner and each test would be matched against runner capabilities
>> (i.e. a runner would be tested with given test if and only if it would not
>> reject it)
>>
>> Jan
>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>
>> +1 to deferring for now. Since they should not be modified after
>> adoption, it makes sense not to get ahead of ourselves.
>>
>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com>
>>> wrote:
>>> >
>>> > One thing that doesn't appear to have been suggested yet is we could
>>> "batch" urns together under a "super urn" so that adding one super urn is
>>> like adding each of the represented batch of features. This prevents
>>> needing to send dozens of urns to be individually sent over.
>>> >
>>> >
>>> > The super urns would need to be static after definition to avoid
>>> mismatched definitions down the road.
>>> >
>>> > We collect together urns what is reasonably consider "vX" support, and
>>> can then increment that later.
>>> >
>>> > This would simplify new SDKs, as they can have a goal of initial v1
>>> support as we define what level of feature support it has, and doesn't
>>> prevent new capabilities from being added incrementally.
>>>
>>> Yes, this is a very good idea. I've also been thinking of certain sets
>>> of common operations/well known DoFns that often occur on opposite
>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>>> are commonly supported that could be grouped under these meta-urns.
>>>
>>> Note that these need not be monotonic, for example a current v1 might
>>> be requiring LengthPrefixCoderV1, but if a more efficient
>>> LengthPrefixCoderV2 comes along eventually v2 could require that and
>>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>>
>>> Probably makes sense to defer adding such super-urns until we notice a
>>> set that is commonly used together in practice.
>>>
>>> Of course there's still value in SDKs being able to support features
>>> piecemeal as well, which is the big reason we're avoiding a simple
>>> monotonically-increasing version number.
>>>
>>> > Similarly, certain features sets could stand alone, eg around SQL.
>>> It's benefitial for optimization reasons if an SDK has native projection
>>> and UDF support for example, which a runner could take advantage of by
>>> avoiding extra cross language hops. These could then also be grouped under
>>> a SQL super urn.
>>> >
>>> > This is from the SDK capability side of course, rather than the SDK
>>> pipeline requirements side.
>>> >
>>> > -------
>>> > Related to that last point, it might be good to nail down early the
>>> perspective used when discussing these things, as there's a dual between
>>> "what and SDK can do", and "what the runner will do to a pipeline that the
>>> SDK can understand" (eg. Combiner lifting, and state backed iterables), as
>>> well as "what the pipeline requires from the runner" and "what the runner
>>> is able to do" (eg. Requires sorted input)
>>> >
>>> >
>>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>> >>
>>> >>
>>> >>
>>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org>
>>> wrote:
>>> >>>
>>> >>>
>>> >>>
>>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <
>>> robertwb@google.com> wrote:
>>> >>>>
>>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com>
>>> wrote:
>>> >>>> >
>>> >>>> > We can always detect on the runner/SDK side whether there is an
>>> unknown field[1] within a payload and fail to process it but this is
>>> painful in two situations:
>>> >>>> > 1) It doesn't provide for a good error message since you can't
>>> say what the purpose of the field is. With a capability URN, the runner/SDK
>>> could say which URN it doesn't understand.
>>> >>>> > 2) It doesn't allow for the addition of fields which don't impact
>>> semantics of execution. For example, if the display data feature was being
>>> developed, a runner could ignore it and still execute the pipeline
>>> correctly.
>>> >>>>
>>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>> >>>> this well either.
>>> >>>>
>>> >>>> > If we think this to be common enough, we can add capabilities
>>> list to the PTransform so each PTransform can do this and has a natural way
>>> of being extended for additions which are forwards compatible. The
>>> alternative to having capabilities on PTransform (and other constructs) is
>>> that we would have a new URN when the specification of the transform
>>> changes. For forwards compatible changes, each SDK/runner would map older
>>> versions of the URN onto the latest and internally treat it as the latest
>>> version but always downgrade it to the version the other party expects when
>>> communicating with it. Backwards incompatible changes would always require
>>> a new URN which capabilities at the PTransform level would not help with.
>>> >>>>
>>> >>>> As you point out, stateful+splittable may not be a particularly
>>> useful
>>> >>>> combination, but as another example, we have
>>> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to
>>> whether
>>> >>>> it requires finalization, stable inputs, and now time sorting. I
>>> don't
>>> >>>> think we should have a new URN for each combination.
>>> >>>
>>> >>>
>>> >>> Agree with this. I don't think stateful, splittable, and "plain"
>>> ParDo are comparable to these. Each is an entirely different computational
>>> paradigm: per-element independent processing, per-key-and-window linear
>>> processing, and per-element-and-restriction splittable processing. Most
>>> relevant IMO is the nature of the parallelism. If you added state to
>>> splittable processing, it would still be splittable processing. Just as
>>> Combine and ParDo can share the SideInput specification, it is easy to
>>> share relevant sub-structures like state declarations. But it is a fair
>>> point that the ability to split can be ignored and run as a plain-old
>>> ParDo. It brings up the question of whether a runner that doesn't know SDF
>>> is should have to reject it or should be allowed to run poorly.
>>> >>
>>> >>
>>> >> Being splittable means that the SDK could choose to return a
>>> continuation saying please process the rest of my element in X amount of
>>> time which would require the runner to inspect certain fields on responses.
>>> One example would be I don't have many more messages to read from this
>>> message stream at the moment and another example could be that I detected
>>> that this filesystem is throttling me or is down and I would like to resume
>>> processing later.
>>> >>
>>> >>>
>>> >>> It isn't a huge deal. Three different top-level URNS versus three
>>> different sub-URNs will achieve the same result in the end if we get this
>>> "capability" thing in place.
>>> >>>
>>> >>> Kenn
>>> >>>
>>> >>>>
>>> >>>>
>>> >>>> >> > I do think that splittable ParDo and stateful ParDo should
>>> have separate PTransform URNs since they are different paradigms than
>>> "vanilla" ParDo.
>>> >>>> >>
>>> >>>> >> Here I disagree. What about one that is both splittable and
>>> stateful? Would one have a fourth URN for that? If/when another flavor of
>>> DoFn comes out, would we then want 8 distinct URNs? (SplitableParDo in
>>> particular can be executed as a normal ParDo as long as the output is
>>> bounded.)
>>> >>>> >
>>> >>>> > I agree that you could have stateful and splittable dofns where
>>> the element is the key and you share state and timers across restrictions.
>>> No runner is capable of executing this efficiently.
>>> >>>> >
>>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the
>>> Environment proto completely, so it is in a position to ensure the involved
>>> docker images support the necessary features.
>>> >>>> >> >>
>>> >>>> >> >> Yes.
>>> >>>> >
>>> >>>> >
>>> >>>> > I believe capabilities do exist on a Pipeline and it informs
>>> runners about new types of fields to be aware of either within Components
>>> or on the Pipeline object itself but for this discussion it makes sense
>>> that an environment would store most "capabilities" related to execution.
>>> >>>> >
>>> >>>> >> [snip]
>>> >>>> >
>>> >>>> > As for the proto clean-ups, the scope is to cover almost all
>>> things needed for execution now and to follow-up with optional transforms,
>>> payloads, and coders later which would exclude job managment APIs and
>>> artifact staging. A formal enumeration would be useful here. Also, we
>>> should provide formal guidance about adding new fields, adding new types of
>>> transforms, new types of proto messages, ... (best to describe this on a
>>> case by case basis as to how people are trying to modify the protos and
>>> evolve this guidance over time).
>>> >>>>
>>> >>>> What we need is the ability for (1) runners to reject future
>>> pipelines
>>> >>>> they cannot faithfully execute and (2) runners to be able to take
>>> >>>> advantage of advanced features/protocols when interacting with those
>>> >>>> SDKs that understand them while avoiding them for older (or newer)
>>> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2)
>>> (optional)
>>> >>>> capabilities.
>>> >>>>
>>> >>>> Where possible, I think this is best expressed inherently in the set
>>> >>>> of transform (and possibly other component) URNs. For example, when
>>> an
>>> >>>> SDK uses a combine_per_key composite, that's a signal that it
>>> >>>> understands the various related combine_* transforms. Similarly, a
>>> >>>> pipeline with a test_stream URN would be rejected by pipelines not
>>> >>>> recognizing/supporting this primitive. However, this is not always
>>> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>> >>>> ParDo and for (2) we have features like large iterable and progress
>>> >>>> support.
>>> >>>>
>>> >>>> For (1) we have to enumerate now everywhere a runner must look a far
>>> >>>> into the future as we want to remain backwards compatible. This is
>>> why
>>> >>>> I suggested putting something on the pipeline itself, but we could
>>> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>> >>>> think that'd be useful now. (Note that a future pipeline-level
>>> >>>> requirement could be "inspect (previously non-existent) requirements
>>> >>>> field attached to objects of type X.")
>>> >>>>
>>> >>>> For (2) I think adding a capabilities field to the environment for
>>> now
>>> >>>> makes the most sense, and as it's optional to inspect them adding it
>>> >>>> elsewhere if needed is backwards compatible. (The motivation to do
>>> it
>>> >>>> now is that there are some capabilities that we'd like to enumerate
>>> >>>> now rather than make part of the minimal set of things an SDK must
>>> >>>> support.)
>>> >>>>
>>> >>
>>> >> Agree on the separation of requirements from capabilities where
>>> requirements is a set of MUST understand while capabilities are a set of
>>> MAY understand.
>>> >>
>>> >>>>
>>> >>>> > All in all, I think "capabilities" is about informing a runner
>>> about what they should know about and what they are allowed to do. If we go
>>> with a list of "capabilities", we could always add a "parameterized
>>> capabilities" urn which would tell runners they need to also look at some
>>> other field.
>>> >>>>
>>> >>>> Good point. That lets us keep it as a list for now. (The risk is
>>> that
>>> >>>> it makes possible the bug of populating parameters without adding
>>> the
>>> >>>> required notification to the list.)
>>> >>>>
>>> >>>> > I also believe capabilities should NOT be "inherited". For
>>> example if we define capabilities on a ParDoPayload, and on a PTransform
>>> and on Environment, then ParDoPayload capabilities shouldn't be copied to
>>> PTransform and PTransform specific capabilities shouldn't be copied to the
>>> Environment. My reasoning about this is that some "capabilities" can only
>>> be scoped to a single ParDoPayload or a single PTransform and wouldn't
>>> apply generally everywhere. The best example I could think of is that
>>> Environment A supports progress reporting while Environment B doesn't so it
>>> wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>> >>>> >
>>> >>>> > Are capabilities strictly different from "resources" (transform
>>> needs python package X) or "execution hints" (e.g. deploy on machines that
>>> have GPUs, some generic but mostly runner specific hints)? At first glance
>>> I would say yes.
>>> >>>>
>>> >>>> Agreed.
>>>
>>

Re: FnAPI proto backwards compatibility

Posted by Kenneth Knowles <ke...@apache.org>.
On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> +1 for adding pipeline required features. I think being able to reject
> pipeline with unknown requirement is pretty much needed, mostly because
> that enables runners to completely decouple from SDKs, while being able to
> recognize when a pipeline constructed with incomplatible version of SDK is
> run.
>
> I'll add some observations I made when implementing the latest "requires
> time sorted input" addition with regards to this discussion:
>
>  a) the features of pipeline are not simple function of set of PTransforms
> being present in the pipeline, but also depend on (type of) inputs. For
> instance a PTransform might have a simple expansion to primitive
> PTransforms in streaming case, but don't have such expansion in batch case.
> That is to say, runner that doesn't actually know of a specific extension
> to some PTransform _might_ actually execute it correctly under some
> conditions. But _must_ fail in other cases.
>
>  b) it would be good if this feature would work independently of
> portability (for Java SDK). We still have (at least two) non-portable
> runners that are IMO widely used in production and are likely to last for
> some time.
>
I think even if these runners don't use portability for execution, they
should migrate to use the portable pipeline definition. Then they can
share the same model w/ runners that execute using portability. The Fn API
is not required as long as the runner implements the semantics of the
pipeline.
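
For example, a minimal sketch of such validation in Python (the supported
set is made up; the field access follows the shape of the portable Pipeline
proto, i.e. components.transforms, spec.urn and subtransforms) that a
runner could run up front, independently of whether it then executes over
the Fn API:

    # Hypothetical set of primitives this runner knows how to translate.
    SUPPORTED_PRIMITIVES = {
        "beam:transform:pardo:v1",
        "beam:transform:group_by_key:v1",
        "beam:transform:impulse:v1",
        "beam:transform:flatten:v1",
    }

    def reject_if_not_understood(pipeline_proto):
        # Composites are judged by their leaves; an unknown primitive means
        # the runner must reject the pipeline rather than guess.
        for tid, transform in pipeline_proto.components.transforms.items():
            if transform.subtransforms:
                continue
            if transform.spec.urn not in SUPPORTED_PRIMITIVES:
                raise ValueError(
                    "Cannot execute transform %s with urn %s"
                    % (tid, transform.spec.urn))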

Kenn


>  c) we can take advantage of these pipeline features to get rid of the
> categories of @ValidatesRunner tests, because we could have just simply
> @ValidatesRunner and each test would be matched against runner capabilities
> (i.e. a runner would be tested with given test if and only if it would not
> reject it)
>
> Jan
> On 2/13/20 8:42 PM, Robert Burke wrote:
>
> +1 to deferring for now. Since they should not be modified after adoption,
> it makes sense not to get ahead of ourselves.
>
> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
>> >
>> > One thing that doesn't appear to have been suggested yet is we could
>> "batch" urns together under a "super urn" so that adding one super urn is
>> like adding each of the represented batch of features. This prevents
>> needing to send dozens of urns to be individually sent over.
>> >
>> >
>> > The super urns would need to be static after definition to avoid
>> mismatched definitions down the road.
>> >
>> > We collect together urns what is reasonably consider "vX" support, and
>> can then increment that later.
>> >
>> > This would simplify new SDKs, as they can have a goal of initial v1
>> support as we define what level of feature support it has, and doesn't
>> prevent new capabilities from being added incrementally.
>>
>> Yes, this is a very good idea. I've also been thinking of certain sets
>> of common operations/well known DoFns that often occur on opposite
>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>> are commonly supported that could be grouped under these meta-urns.
>>
>> Note that these need not be monotonic, for example a current v1 might
>> be requiring LengthPrefixCoderV1, but if a more efficient
>> LengthPrefixCoderV2 comes along eventually v2 could require that and
>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>
>> Probably makes sense to defer adding such super-urns until we notice a
>> set that is commonly used together in practice.
>>
>> Of course there's still value in SDKs being able to support features
>> piecemeal as well, which is the big reason we're avoiding a simple
>> monotonically-increasing version number.
>>
>> > Similarly, certain features sets could stand alone, eg around SQL. It's
>> benefitial for optimization reasons if an SDK has native projection and UDF
>> support for example, which a runner could take advantage of by avoiding
>> extra cross language hops. These could then also be grouped under a SQL
>> super urn.
>> >
>> > This is from the SDK capability side of course, rather than the SDK
>> pipeline requirements side.
>> >
>> > -------
>> > Related to that last point, it might be good to nail down early the
>> perspective used when discussing these things, as there's a dual between
>> "what and SDK can do", and "what the runner will do to a pipeline that the
>> SDK can understand" (eg. Combiner lifting, and state backed iterables), as
>> well as "what the pipeline requires from the runner" and "what the runner
>> is able to do" (eg. Requires sorted input)
>> >
>> >
>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>> >>
>> >>
>> >>
>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org>
>> wrote:
>> >>>
>> >>>
>> >>>
>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>>>
>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>> >>>> >
>> >>>> > We can always detect on the runner/SDK side whether there is an
>> unknown field[1] within a payload and fail to process it but this is
>> painful in two situations:
>> >>>> > 1) It doesn't provide for a good error message since you can't say
>> what the purpose of the field is. With a capability URN, the runner/SDK
>> could say which URN it doesn't understand.
>> >>>> > 2) It doesn't allow for the addition of fields which don't impact
>> semantics of execution. For example, if the display data feature was being
>> developed, a runner could ignore it and still execute the pipeline
>> correctly.
>> >>>>
>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>> >>>> this well either.
>> >>>>
>> >>>> > If we think this to be common enough, we can add capabilities list
>> to the PTransform so each PTransform can do this and has a natural way of
>> being extended for additions which are forwards compatible. The alternative
>> to having capabilities on PTransform (and other constructs) is that we
>> would have a new URN when the specification of the transform changes. For
>> forwards compatible changes, each SDK/runner would map older versions of
>> the URN onto the latest and internally treat it as the latest version but
>> always downgrade it to the version the other party expects when
>> communicating with it. Backwards incompatible changes would always require
>> a new URN which capabilities at the PTransform level would not help with.
>> >>>>
>> >>>> As you point out, stateful+splittable may not be a particularly
>> useful
>> >>>> combination, but as another example, we have
>> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to
>> whether
>> >>>> it requires finalization, stable inputs, and now time sorting. I
>> don't
>> >>>> think we should have a new URN for each combination.
>> >>>
>> >>>
>> >>> Agree with this. I don't think stateful, splittable, and "plain"
>> ParDo are comparable to these. Each is an entirely different computational
>> paradigm: per-element independent processing, per-key-and-window linear
>> processing, and per-element-and-restriction splittable processing. Most
>> relevant IMO is the nature of the parallelism. If you added state to
>> splittable processing, it would still be splittable processing. Just as
>> Combine and ParDo can share the SideInput specification, it is easy to
>> share relevant sub-structures like state declarations. But it is a fair
>> point that the ability to split can be ignored and run as a plain-old
>> ParDo. It brings up the question of whether a runner that doesn't know SDF
>> is should have to reject it or should be allowed to run poorly.
>> >>
>> >>
>> >> Being splittable means that the SDK could choose to return a
>> continuation saying please process the rest of my element in X amount of
>> time which would require the runner to inspect certain fields on responses.
>> One example would be I don't have many more messages to read from this
>> message stream at the moment and another example could be that I detected
>> that this filesystem is throttling me or is down and I would like to resume
>> processing later.
>> >>
>> >>>
>> >>> It isn't a huge deal. Three different top-level URNS versus three
>> different sub-URNs will achieve the same result in the end if we get this
>> "capability" thing in place.
>> >>>
>> >>> Kenn
>> >>>
>> >>>>
>> >>>>
>> >>>> >> > I do think that splittable ParDo and stateful ParDo should have
>> separate PTransform URNs since they are different paradigms than "vanilla"
>> ParDo.
>> >>>> >>
>> >>>> >> Here I disagree. What about one that is both splittable and
>> stateful? Would one have a fourth URN for that? If/when another flavor of
>> DoFn comes out, would we then want 8 distinct URNs? (SplitableParDo in
>> particular can be executed as a normal ParDo as long as the output is
>> bounded.)
>> >>>> >
>> >>>> > I agree that you could have stateful and splittable dofns where
>> the element is the key and you share state and timers across restrictions.
>> No runner is capable of executing this efficiently.
>> >>>> >
>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the
>> Environment proto completely, so it is in a position to ensure the involved
>> docker images support the necessary features.
>> >>>> >> >>
>> >>>> >> >> Yes.
>> >>>> >
>> >>>> >
>> >>>> > I believe capabilities do exist on a Pipeline and it informs
>> runners about new types of fields to be aware of either within Components
>> or on the Pipeline object itself but for this discussion it makes sense
>> that an environment would store most "capabilities" related to execution.
>> >>>> >
>> >>>> >> [snip]
>> >>>> >
>> >>>> > As for the proto clean-ups, the scope is to cover almost all
>> things needed for execution now and to follow-up with optional transforms,
>> payloads, and coders later which would exclude job managment APIs and
>> artifact staging. A formal enumeration would be useful here. Also, we
>> should provide formal guidance about adding new fields, adding new types of
>> transforms, new types of proto messages, ... (best to describe this on a
>> case by case basis as to how people are trying to modify the protos and
>> evolve this guidance over time).
>> >>>>
>> >>>> What we need is the ability for (1) runners to reject future
>> pipelines
>> >>>> they cannot faithfully execute and (2) runners to be able to take
>> >>>> advantage of advanced features/protocols when interacting with those
>> >>>> SDKs that understand them while avoiding them for older (or newer)
>> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2)
>> (optional)
>> >>>> capabilities.
>> >>>>
>> >>>> Where possible, I think this is best expressed inherently in the set
>> >>>> of transform (and possibly other component) URNs. For example, when
>> an
>> >>>> SDK uses a combine_per_key composite, that's a signal that it
>> >>>> understands the various related combine_* transforms. Similarly, a
>> >>>> pipeline with a test_stream URN would be rejected by pipelines not
>> >>>> recognizing/supporting this primitive. However, this is not always
>> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>> >>>> ParDo and for (2) we have features like large iterable and progress
>> >>>> support.
>> >>>>
>> >>>> For (1) we have to enumerate now everywhere a runner must look a far
>> >>>> into the future as we want to remain backwards compatible. This is
>> why
>> >>>> I suggested putting something on the pipeline itself, but we could
>> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>> >>>> think that'd be useful now. (Note that a future pipeline-level
>> >>>> requirement could be "inspect (previously non-existent) requirements
>> >>>> field attached to objects of type X.")
>> >>>>
>> >>>> For (2) I think adding a capabilities field to the environment for
>> now
>> >>>> makes the most sense, and as it's optional to inspect them adding it
>> >>>> elsewhere if needed is backwards compatible. (The motivation to do it
>> >>>> now is that there are some capabilities that we'd like to enumerate
>> >>>> now rather than make part of the minimal set of things an SDK must
>> >>>> support.)
>> >>>>
>> >>
>> >> Agree on the separation of requirements from capabilities where
>> requirements is a set of MUST understand while capabilities are a set of
>> MAY understand.
>> >>
>> >>>>
>> >>>> > All in all, I think "capabilities" is about informing a runner
>> about what they should know about and what they are allowed to do. If we go
>> with a list of "capabilities", we could always add a "parameterized
>> capabilities" urn which would tell runners they need to also look at some
>> other field.
>> >>>>
>> >>>> Good point. That lets us keep it as a list for now. (The risk is that
>> >>>> it makes possible the bug of populating parameters without adding the
>> >>>> required notification to the list.)
>> >>>>
>> >>>> > I also believe capabilities should NOT be "inherited". For example
>> if we define capabilities on a ParDoPayload, and on a PTransform and on
>> Environment, then ParDoPayload capabilities shouldn't be copied to
>> PTransform and PTransform specific capabilities shouldn't be copied to the
>> Environment. My reasoning about this is that some "capabilities" can only
>> be scoped to a single ParDoPayload or a single PTransform and wouldn't
>> apply generally everywhere. The best example I could think of is that
>> Environment A supports progress reporting while Environment B doesn't so it
>> wouldn't have made sense to say the "Pipeline" supports progress reporting.
>> >>>> >
>> >>>> > Are capabilities strictly different from "resources" (transform
>> needs python package X) or "execution hints" (e.g. deploy on machines that
>> have GPUs, some generic but mostly runner specific hints)? At first glance
>> I would say yes.
>> >>>>
>> >>>> Agreed.
>>
>

Re: FnAPI proto backwards compatibility

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

+1 for adding pipeline-required features. I think being able to reject a
pipeline with an unknown requirement is pretty much needed, mostly because
that enables runners to completely decouple from SDKs, while still being
able to recognize when a pipeline constructed with an incompatible version
of the SDK is run.

I'll add some observations I made when implementing the latest "requires
time sorted input" addition, with regard to this discussion:

  a) the features of a pipeline are not a simple function of the set of
PTransforms present in the pipeline, but also depend on the (type of)
inputs. For instance, a PTransform might have a simple expansion to
primitive PTransforms in the streaming case, but no such expansion in the
batch case. That is to say, a runner that doesn't actually know of a
specific extension to some PTransform _might_ actually execute it
correctly under some conditions, but _must_ fail in other cases.

  b) it would be good if this feature worked independently of portability
(for the Java SDK). We still have (at least two) non-portable runners that
are IMO widely used in production and are likely to last for some time.

  c) we can take advantage of these pipeline features to get rid of the
categories of @ValidatesRunner tests, because we could have simply
@ValidatesRunner and each test would be matched against runner
capabilities (i.e. a runner would be tested with a given test if and only
if it would not reject it)
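
As a rough sketch of point c) (the URNs and the capability set are made up
for illustration), the test framework could compare a test's declared
requirements against what the runner advertises and skip the test rather
than relying on hand-maintained categories:

    # Hypothetical runner capability set and per-test requirements.
    RUNNER_CAPABILITIES = {
        "beam:requirement:pardo:stateful:v1",
        "beam:requirement:pardo:splittable_dofn:v1",
    }

    def should_run(test_requirements, runner_capabilities=RUNNER_CAPABILITIES):
        # Run the test if and only if the runner would not reject its pipeline.
        return set(test_requirements) <= runner_capabilities

    should_run({"beam:requirement:pardo:stateful:v1"})            # True
    should_run({"beam:requirement:pardo:time_sorted_input:v1"})   # False -> skip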

Jan

On 2/13/20 8:42 PM, Robert Burke wrote:
> +1 to deferring for now. Since they should not be modified after 
> adoption, it makes sense not to get ahead of ourselves.
>
> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <robertwb@google.com 
> <ma...@google.com>> wrote:
>
>     On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <robert@frantil.com
>     <ma...@frantil.com>> wrote:
>     >
>     > One thing that doesn't appear to have been suggested yet is we
>     could "batch" urns together under a "super urn" so that adding one
>     super urn is like adding each of the represented batch of
>     features. This prevents needing to send dozens of urns to be
>     individually sent over.
>     >
>     >
>     > The super urns would need to be static after definition to avoid
>     mismatched definitions down the road.
>     >
>     > We collect together urns what is reasonably consider "vX"
>     support, and can then increment that later.
>     >
>     > This would simplify new SDKs, as they can have a goal of initial
>     v1 support as we define what level of feature support it has, and
>     doesn't prevent new capabilities from being added incrementally.
>
>     Yes, this is a very good idea. I've also been thinking of certain sets
>     of common operations/well known DoFns that often occur on opposite
>     sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>     are commonly supported that could be grouped under these meta-urns.
>
>     Note that these need not be monotonic, for example a current v1 might
>     be requiring LengthPrefixCoderV1, but if a more efficient
>     LengthPrefixCoderV2 comes along eventually v2 could require that and
>     *not* require the old, now rarely used LengthPrefixCoderV1.
>
>     Probably makes sense to defer adding such super-urns until we notice a
>     set that is commonly used together in practice.
>
>     Of course there's still value in SDKs being able to support features
>     piecemeal as well, which is the big reason we're avoiding a simple
>     monotonically-increasing version number.
>
>     > Similarly, certain features sets could stand alone, eg around
>     SQL. It's benefitial for optimization reasons if an SDK has native
>     projection and UDF support for example, which a runner could take
>     advantage of by avoiding extra cross language hops. These could
>     then also be grouped under a SQL super urn.
>     >
>     > This is from the SDK capability side of course, rather than the
>     SDK pipeline requirements side.
>     >
>     > -------
>     > Related to that last point, it might be good to nail down early
>     the perspective used when discussing these things, as there's a
>     dual between "what and SDK can do", and "what the runner will do
>     to a pipeline that the SDK can understand" (eg. Combiner lifting,
>     and state backed iterables), as well as "what the pipeline
>     requires from the runner" and "what the runner is able to do" (eg.
>     Requires sorted input)
>     >
>     >
>     > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lcwik@google.com
>     <ma...@google.com>> wrote:
>     >>
>     >>
>     >>
>     >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles
>     <kenn@apache.org <ma...@apache.org>> wrote:
>     >>>
>     >>>
>     >>>
>     >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw
>     <robertwb@google.com <ma...@google.com>> wrote:
>     >>>>
>     >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lcwik@google.com
>     <ma...@google.com>> wrote:
>     >>>> >
>     >>>> > We can always detect on the runner/SDK side whether there
>     is an unknown field[1] within a payload and fail to process it but
>     this is painful in two situations:
>     >>>> > 1) It doesn't provide for a good error message since you
>     can't say what the purpose of the field is. With a capability URN,
>     the runner/SDK could say which URN it doesn't understand.
>     >>>> > 2) It doesn't allow for the addition of fields which don't
>     impact semantics of execution. For example, if the display data
>     feature was being developed, a runner could ignore it and still
>     execute the pipeline correctly.
>     >>>>
>     >>>> Yeah, I don't think proto reflection is a flexible enough
>     tool to do
>     >>>> this well either.
>     >>>>
>     >>>> > If we think this to be common enough, we can add
>     capabilities list to the PTransform so each PTransform can do this
>     and has a natural way of being extended for additions which are
>     forwards compatible. The alternative to having capabilities on
>     PTransform (and other constructs) is that we would have a new URN
>     when the specification of the transform changes. For forwards
>     compatible changes, each SDK/runner would map older versions of
>     the URN onto the latest and internally treat it as the latest
>     version but always downgrade it to the version the other party
>     expects when communicating with it. Backwards incompatible changes
>     would always require a new URN which capabilities at the
>     PTransform level would not help with.
>     >>>>
>     >>>> As you point out, stateful+splittable may not be a
>     particularly useful
>     >>>> combination, but as another example, we have
>     >>>> (backwards-incompatible-when-introduced) markers on DoFn as
>     to whether
>     >>>> it requires finalization, stable inputs, and now time
>     sorting. I don't
>     >>>> think we should have a new URN for each combination.
>     >>>
>     >>>
>     >>> Agree with this. I don't think stateful, splittable, and
>     "plain" ParDo are comparable to these. Each is an entirely
>     different computational paradigm: per-element independent
>     processing, per-key-and-window linear processing, and
>     per-element-and-restriction splittable processing. Most relevant
>     IMO is the nature of the parallelism. If you added state to
>     splittable processing, it would still be splittable processing.
>     Just as Combine and ParDo can share the SideInput specification,
>     it is easy to share relevant sub-structures like state
>     declarations. But it is a fair point that the ability to split can
>     be ignored and run as a plain-old ParDo. It brings up the question
>     of whether a runner that doesn't know SDF is should have to reject
>     it or should be allowed to run poorly.
>     >>
>     >>
>     >> Being splittable means that the SDK could choose to return a
>     continuation saying please process the rest of my element in X
>     amount of time which would require the runner to inspect certain
>     fields on responses. One example would be I don't have many more
>     messages to read from this message stream at the moment and
>     another example could be that I detected that this filesystem is
>     throttling me or is down and I would like to resume processing later.
>     >>
>     >>>
>     >>> It isn't a huge deal. Three different top-level URNS versus
>     three different sub-URNs will achieve the same result in the end
>     if we get this "capability" thing in place.
>     >>>
>     >>> Kenn
>     >>>
>     >>>>
>     >>>>
>     >>>> >> > I do think that splittable ParDo and stateful ParDo
>     should have separate PTransform URNs since they are different
>     paradigms than "vanilla" ParDo.
>     >>>> >>
>     >>>> >> Here I disagree. What about one that is both splittable
>     and stateful? Would one have a fourth URN for that? If/when
>     another flavor of DoFn comes out, would we then want 8 distinct
>     URNs? (SplitableParDo in particular can be executed as a normal
>     ParDo as long as the output is bounded.)
>     >>>> >
>     >>>> > I agree that you could have stateful and splittable dofns
>     where the element is the key and you share state and timers across
>     restrictions. No runner is capable of executing this efficiently.
>     >>>> >
>     >>>> >> >> > On the SDK requirements side: the constructing SDK
>     owns the Environment proto completely, so it is in a position to
>     ensure the involved docker images support the necessary features.
>     >>>> >> >>
>     >>>> >> >> Yes.
>     >>>> >
>     >>>> >
>     >>>> > I believe capabilities do exist on a Pipeline and it
>     informs runners about new types of fields to be aware of either
>     within Components or on the Pipeline object itself but for this
>     discussion it makes sense that an environment would store most
>     "capabilities" related to execution.
>     >>>> >
>     >>>> >> [snip]
>     >>>> >
>     >>>> > As for the proto clean-ups, the scope is to cover almost
>     all things needed for execution now and to follow-up with optional
>     transforms, payloads, and coders later which would exclude job
>     managment APIs and artifact staging. A formal enumeration would be
>     useful here. Also, we should provide formal guidance about adding
>     new fields, adding new types of transforms, new types of proto
>     messages, ... (best to describe this on a case by case basis as to
>     how people are trying to modify the protos and evolve this
>     guidance over time).
>     >>>>
>     >>>> What we need is the ability for (1) runners to reject future
>     pipelines
>     >>>> they cannot faithfully execute and (2) runners to be able to take
>     >>>> advantage of advanced features/protocols when interacting
>     with those
>     >>>> SDKs that understand them while avoiding them for older (or
>     newer)
>     >>>> SDKs that don't. Let's call (1) (hard) requirements and (2)
>     (optional)
>     >>>> capabilities.
>     >>>>
>     >>>> Where possible, I think this is best expressed inherently in
>     the set
>     >>>> of transform (and possibly other component) URNs. For
>     example, when an
>     >>>> SDK uses a combine_per_key composite, that's a signal that it
>     >>>> understands the various related combine_* transforms.
>     Similarly, a
>     >>>> pipeline with a test_stream URN would be rejected by
>     pipelines not
>     >>>> recognizing/supporting this primitive. However, this is not
>     always
>     >>>> possible, e.g. for (1) we have the aforementioned boolean
>     flags on
>     >>>> ParDo and for (2) we have features like large iterable and
>     progress
>     >>>> support.
>     >>>>
>     >>>> For (1) we have to enumerate now everywhere a runner must
>     look a far
>     >>>> into the future as we want to remain backwards compatible.
>     This is why
>     >>>> I suggested putting something on the pipeline itself, but we
>     could
>     >>>> (likely in addition) add it to Transform and/or ParDoPayload
>     if we
>     >>>> think that'd be useful now. (Note that a future pipeline-level
>     >>>> requirement could be "inspect (previously non-existent)
>     requirements
>     >>>> field attached to objects of type X.")
>     >>>>
>     >>>> For (2) I think adding a capabilities field to the
>     environment for now
>     >>>> makes the most sense, and as it's optional to inspect them
>     adding it
>     >>>> elsewhere if needed is backwards compatible. (The motivation
>     to do it
>     >>>> now is that there are some capabilities that we'd like to
>     enumerate
>     >>>> now rather than make part of the minimal set of things an SDK
>     must
>     >>>> support.)
>     >>>>
>     >>
>     >> Agree on the separation of requirements from capabilities where
>     requirements is a set of MUST understand while capabilities are a
>     set of MAY understand.
>     >>
>     >>>>
>     >>>> > All in all, I think "capabilities" is about informing a
>     runner about what they should know about and what they are allowed
>     to do. If we go with a list of "capabilities", we could always add
>     a "parameterized capabilities" urn which would tell runners they
>     need to also look at some other field.
>     >>>>
>     >>>> Good point. That lets us keep it as a list for now. (The risk
>     is that
>     >>>> it makes possible the bug of populating parameters without
>     adding the
>     >>>> required notification to the list.)
>     >>>>
>     >>>> > I also believe capabilities should NOT be "inherited". For
>     example if we define capabilities on a ParDoPayload, and on a
>     PTransform and on Environment, then ParDoPayload capabilities
>     shouldn't be copied to PTransform and PTransform specific
>     capabilities shouldn't be copied to the Environment. My reasoning
>     about this is that some "capabilities" can only be scoped to a
>     single ParDoPayload or a single PTransform and wouldn't apply
>     generally everywhere. The best example I could think of is that
>     Environment A supports progress reporting while Environment B
>     doesn't so it wouldn't have made sense to say the "Pipeline"
>     supports progress reporting.
>     >>>> >
>     >>>> > Are capabilities strictly different from "resources"
>     (transform needs python package X) or "execution hints" (e.g.
>     deploy on machines that have GPUs, some generic but mostly runner
>     specific hints)? At first glance I would say yes.
>     >>>>
>     >>>> Agreed.
>

Re: FnAPI proto backwards compatibility

Posted by Robert Burke <ro...@frantil.com>.
+1 to deferring for now. Since they should not be modified after adoption,
it makes sense not to get ahead of ourselves.
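
If and when such a commonly used set does emerge, the expansion itself can
stay trivial; a minimal Python sketch (the super urn and the URNs it groups
are made up for illustration):

    # Made-up URNs, for illustration only.
    SUPER_URNS = {
        "beam:capability_set:sdk_base:v1": {
            "beam:coder:length_prefix:v1",
            "beam:transform:combine_per_key:v1",
            "beam:protocol:progress_reporting:v1",
        },
    }

    def expand(declared_urns):
        # A super urn stands for each URN it groups; plain URNs pass through.
        expanded = set()
        for urn in declared_urns:
            expanded |= SUPER_URNS.get(urn, {urn})
        return expanded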

On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
> >
> > One thing that doesn't appear to have been suggested yet is we could
> "batch" urns together under a "super urn" so that adding one super urn is
> like adding each of the represented batch of features. This prevents
> needing to send dozens of urns to be individually sent over.
> >
> >
> > The super urns would need to be static after definition to avoid
> mismatched definitions down the road.
> >
> > We collect together urns what is reasonably consider "vX" support, and
> can then increment that later.
> >
> > This would simplify new SDKs, as they can have a goal of initial v1
> support as we define what level of feature support it has, and doesn't
> prevent new capabilities from being added incrementally.
>
> Yes, this is a very good idea. I've also been thinking of certain sets
> of common operations/well known DoFns that often occur on opposite
> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
> are commonly supported that could be grouped under these meta-urns.
>
> Note that these need not be monotonic, for example a current v1 might
> be requiring LengthPrefixCoderV1, but if a more efficient
> LengthPrefixCoderV2 comes along eventually v2 could require that and
> *not* require the old, now rarely used LengthPrefixCoderV1.
>
> Probably makes sense to defer adding such super-urns until we notice a
> set that is commonly used together in practice.
>
> Of course there's still value in SDKs being able to support features
> piecemeal as well, which is the big reason we're avoiding a simple
> monotonically-increasing version number.
>
> > Similarly, certain features sets could stand alone, eg around SQL. It's
> benefitial for optimization reasons if an SDK has native projection and UDF
> support for example, which a runner could take advantage of by avoiding
> extra cross language hops. These could then also be grouped under a SQL
> super urn.
> >
> > This is from the SDK capability side of course, rather than the SDK
> pipeline requirements side.
> >
> > -------
> > Related to that last point, it might be good to nail down early the
> perspective used when discussing these things, as there's a dual between
> "what and SDK can do", and "what the runner will do to a pipeline that the
> SDK can understand" (eg. Combiner lifting, and state backed iterables), as
> well as "what the pipeline requires from the runner" and "what the runner
> is able to do" (eg. Requires sorted input)
> >
> >
> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
> >>
> >>
> >>
> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org>
> wrote:
> >>>
> >>>
> >>>
> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>>>
> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
> >>>> >
> >>>> > We can always detect on the runner/SDK side whether there is an
> unknown field[1] within a payload and fail to process it but this is
> painful in two situations:
> >>>> > 1) It doesn't provide for a good error message since you can't say
> what the purpose of the field is. With a capability URN, the runner/SDK
> could say which URN it doesn't understand.
> >>>> > 2) It doesn't allow for the addition of fields which don't impact
> semantics of execution. For example, if the display data feature was being
> developed, a runner could ignore it and still execute the pipeline
> correctly.
> >>>>
> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do
> >>>> this well either.
> >>>>
> >>>> > If we think this to be common enough, we can add capabilities list
> to the PTransform so each PTransform can do this and has a natural way of
> being extended for additions which are forwards compatible. The alternative
> to having capabilities on PTransform (and other constructs) is that we
> would have a new URN when the specification of the transform changes. For
> forwards compatible changes, each SDK/runner would map older versions of
> the URN onto the latest and internally treat it as the latest version but
> always downgrade it to the version the other party expects when
> communicating with it. Backwards incompatible changes would always require
> a new URN which capabilities at the PTransform level would not help with.
> >>>>
> >>>> As you point out, stateful+splittable may not be a particularly useful
> >>>> combination, but as another example, we have
> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
> >>>> it requires finalization, stable inputs, and now time sorting. I don't
> >>>> think we should have a new URN for each combination.
> >>>
> >>>
> >>> Agree with this. I don't think stateful, splittable, and "plain" ParDo
> are comparable to these. Each is an entirely different computational
> paradigm: per-element independent processing, per-key-and-window linear
> processing, and per-element-and-restriction splittable processing. Most
> relevant IMO is the nature of the parallelism. If you added state to
> splittable processing, it would still be splittable processing. Just as
> Combine and ParDo can share the SideInput specification, it is easy to
> share relevant sub-structures like state declarations. But it is a fair
> point that the ability to split can be ignored and run as a plain-old
> ParDo. It brings up the question of whether a runner that doesn't know SDF
> is should have to reject it or should be allowed to run poorly.
> >>
> >>
> >> Being splittable means that the SDK could choose to return a
> continuation saying please process the rest of my element in X amount of
> time which would require the runner to inspect certain fields on responses.
> One example would be I don't have many more messages to read from this
> message stream at the moment and another example could be that I detected
> that this filesystem is throttling me or is down and I would like to resume
> processing later.
> >>
> >>>
> >>> It isn't a huge deal. Three different top-level URNS versus three
> different sub-URNs will achieve the same result in the end if we get this
> "capability" thing in place.
> >>>
> >>> Kenn
> >>>
> >>>>
> >>>>
> >>>> >> > I do think that splittable ParDo and stateful ParDo should have
> separate PTransform URNs since they are different paradigms than "vanilla"
> ParDo.
> >>>> >>
> >>>> >> Here I disagree. What about one that is both splittable and
> stateful? Would one have a fourth URN for that? If/when another flavor of
> DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in
> particular can be executed as a normal ParDo as long as the output is
> bounded.)
> >>>> >
> >>>> > I agree that you could have stateful and splittable dofns where the
> element is the key and you share state and timers across restrictions. No
> runner is capable of executing this efficiently.
> >>>> >
> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the
> Environment proto completely, so it is in a position to ensure the involved
> docker images support the necessary features.
> >>>> >> >>
> >>>> >> >> Yes.
> >>>> >
> >>>> >
> >>>> > I believe capabilities do exist on a Pipeline and it informs
> runners about new types of fields to be aware of either within Components
> or on the Pipeline object itself but for this discussion it makes sense
> that an environment would store most "capabilities" related to execution.
> >>>> >
> >>>> >> [snip]
> >>>> >
> >>>> > As for the proto clean-ups, the scope is to cover almost all things
> needed for execution now and to follow-up with optional transforms,
> payloads, and coders later which would exclude job management APIs and
> artifact staging. A formal enumeration would be useful here. Also, we
> should provide formal guidance about adding new fields, adding new types of
> transforms, new types of proto messages, ... (best to describe this on a
> case by case basis as to how people are trying to modify the protos and
> evolve this guidance over time).
> >>>>
> >>>> What we need is the ability for (1) runners to reject future pipelines
> >>>> they cannot faithfully execute and (2) runners to be able to take
> >>>> advantage of advanced features/protocols when interacting with those
> >>>> SDKs that understand them while avoiding them for older (or newer)
> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
> >>>> capabilities.
> >>>>
> >>>> Where possible, I think this is best expressed inherently in the set
> >>>> of transform (and possibly other component) URNs. For example, when an
> >>>> SDK uses a combine_per_key composite, that's a signal that it
> >>>> understands the various related combine_* transforms. Similarly, a
> >>>> pipeline with a test_stream URN would be rejected by pipelines not
> >>>> recognizing/supporting this primitive. However, this is not always
> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on
> >>>> ParDo and for (2) we have features like large iterable and progress
> >>>> support.
> >>>>
> >>>> For (1) we have to enumerate now everywhere a runner must look as far
> >>>> into the future as we want to remain backwards compatible. This is why
> >>>> I suggested putting something on the pipeline itself, but we could
> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we
> >>>> think that'd be useful now. (Note that a future pipeline-level
> >>>> requirement could be "inspect (previously non-existent) requirements
> >>>> field attached to objects of type X.")
> >>>>
> >>>> For (2) I think adding a capabilities field to the environment for now
> >>>> makes the most sense, and as it's optional to inspect them adding it
> >>>> elsewhere if needed is backwards compatible. (The motivation to do it
> >>>> now is that there are some capabilities that we'd like to enumerate
> >>>> now rather than make part of the minimal set of things an SDK must
> >>>> support.)
> >>>>
> >>
> >> Agree on the separation of requirements from capabilities where
> requirements is a set of MUST understand while capabilities are a set of
> MAY understand.
> >>
> >>>>
> >>>> > All in all, I think "capabilities" is about informing a runner
> about what they should know about and what they are allowed to do. If we go
> with a list of "capabilities", we could always add a "parameterized
> capabilities" urn which would tell runners they need to also look at some
> other field.
> >>>>
> >>>> Good point. That lets us keep it as a list for now. (The risk is that
> >>>> it makes possible the bug of populating parameters without adding the
> >>>> required notification to the list.)
> >>>>
> >>>> > I also believe capabilities should NOT be "inherited". For example
> if we define capabilities on a ParDoPayload, and on a PTransform and on
> Environment, then ParDoPayload capabilities shouldn't be copied to
> PTransform and PTransform specific capabilities shouldn't be copied to the
> Environment. My reasoning about this is that some "capabilities" can only
> be scoped to a single ParDoPayload or a single PTransform and wouldn't
> apply generally everywhere. The best example I could think of is that
> Environment A supports progress reporting while Environment B doesn't so it
> wouldn't have made sense to say the "Pipeline" supports progress reporting.
> >>>> >
> >>>> > Are capabilities strictly different from "resources" (transform
> needs python package X) or "execution hints" (e.g. deploy on machines that
> have GPUs, some generic but mostly runner specific hints)? At first glance
> I would say yes.
> >>>>
> >>>> Agreed.
>

Re: FnAPI proto backwards compatibility

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <ro...@frantil.com> wrote:
>
> One thing that doesn't appear to have been suggested yet is we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This avoids having to send dozens of urns over individually.
>
>
> The super urns would need to be static after definition to avoid mismatched definitions down the road.
>
> We collect together urns into what is reasonably considered "vX" support, and can then increment that later.
>
> This would simplify new SDKs, as they can have a goal of initial v1 support as we define what level of feature support it has, and doesn't prevent new capabilities from being added incrementally.

Yes, this is a very good idea. I've also been thinking of certain sets
of common operations/well known DoFns that often occur on opposite
sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
are commonly supported and could be grouped under these meta-urns.

Note that these need not be monotonic; for example, a current v1 might
require LengthPrefixCoderV1, but if a more efficient LengthPrefixCoderV2
comes along, eventually v2 could require that and *not* require the old,
now rarely used LengthPrefixCoderV1.

Probably makes sense to defer adding such super-urns until we notice a
set that is commonly used together in practice.

Of course there's still value in SDKs being able to support features
piecemeal as well, which is the big reason we're avoiding a simple
monotonically-increasing version number.
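
To make this concrete, here's a rough sketch of how expansion might work
(illustrative only; the super-urn string, the constituent urns, and the
table itself are made up for this example, not part of any proposed proto):

    # Hypothetical, frozen mapping from a super-urn to the capability urns
    # it stands for. Once published, an entry would never change.
    SUPER_URNS = {
        'beam:capability_set:sdk_base:v1': frozenset([
            'beam:capability:length_prefix_coder:v1',
            'beam:capability:progress_reporting:v1',
            'beam:capability:large_iterables:v1',
        ]),
    }

    def expand_capabilities(advertised):
        # Expands any super-urns in an environment's advertised capability
        # list into the individual urns they represent.
        expanded = set()
        for urn in advertised:
            expanded.update(SUPER_URNS.get(urn, {urn}))
        return expanded

A runner would then check membership against the expanded set rather than
the raw advertised list, so an SDK advertising the super-urn is treated
exactly as if it had listed each constituent urn.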

> Similarly, certain feature sets could stand alone, eg around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support for example, which a runner could take advantage of by avoiding extra cross language hops. These could then also be grouped under a SQL super urn.
>
> This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>
> -------
> Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a dual between "what an SDK can do", and "what the runner will do to a pipeline that the SDK can understand" (eg. Combiner lifting, and state backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (eg. Requires sorted input)
>
>
> On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>
>>
>>
>> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>
>>>
>>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>
>>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>>>> >
>>>> > We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
>>>> > 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>>>> > 2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>>>>
>>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>>> this well either.
>>>>
>>>> > If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.
>>>>
>>>> As you point out, stateful+splittable may not be a particularly useful
>>>> combination, but as another example, we have
>>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>>>> it requires finalization, stable inputs, and now time sorting. I don't
>>>> think we should have a new URN for each combination.
>>>
>>>
>>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDF should have to reject it or should be allowed to run poorly.
>>
>>
>> Being splittable means that the SDK could choose to return a continuation saying please process the rest of my element in X amount of time which would require the runner to inspect certain fields on responses. One example would be I don't have many more messages to read from this message stream at the moment and another example could be that I detected that this filesystem is throttling me or is down and I would like to resume processing later.
>>
>>>
>>> It isn't a huge deal. Three different top-level URNS versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>>>
>>> Kenn
>>>
>>>>
>>>>
>>>> >> > I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>>> >>
>>>> >> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>>>> >
>>>> > I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>>>> >
>>>> >> >> > On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>>>> >> >>
>>>> >> >> Yes.
>>>> >
>>>> >
>>>> > I believe capabilities do exist on a Pipeline and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>>>> >
>>>> >> [snip]
>>>> >
>>>> > As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow-up with optional transforms, payloads, and coders later which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).
>>>>
>>>> What we need is the ability for (1) runners to reject future pipelines
>>>> they cannot faithfully execute and (2) runners to be able to take
>>>> advantage of advanced features/protocols when interacting with those
>>>> SDKs that understand them while avoiding them for older (or newer)
>>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>>>> capabilities.
>>>>
>>>> Where possible, I think this is best expressed inherently in the set
>>>> of transform (and possibly other component) URNs. For example, when an
>>>> SDK uses a combine_per_key composite, that's a signal that it
>>>> understands the various related combine_* transforms. Similarly, a
>>>> pipeline with a test_stream URN would be rejected by pipelines not
>>>> recognizing/supporting this primitive. However, this is not always
>>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>>> ParDo and for (2) we have features like large iterable and progress
>>>> support.
>>>>
>>>> For (1) we have to enumerate now everywhere a runner must look as far
>>>> into the future as we want to remain backwards compatible. This is why
>>>> I suggested putting something on the pipeline itself, but we could
>>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>>> think that'd be useful now. (Note that a future pipeline-level
>>>> requirement could be "inspect (previously non-existent) requirements
>>>> field attached to objects of type X.")
>>>>
>>>> For (2) I think adding a capabilities field to the environment for now
>>>> makes the most sense, and as it's optional to inspect them adding it
>>>> elsewhere if needed is backwards compatible. (The motivation to do it
>>>> now is that there are some capabilities that we'd like to enumerate
>>>> now rather than make part of the minimal set of things an SDK must
>>>> support.)
>>>>
>>
>> Agree on the separation of requirements from capabilities where requirements is a set of MUST understand while capabilities are a set of MAY understand.
>>
>>>>
>>>> > All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.
>>>>
>>>> Good point. That lets us keep it as a list for now. (The risk is that
>>>> it makes possible the bug of populating parameters without adding the
>>>> required notification to the list.)
>>>>
>>>> > I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>>> >
>>>> > Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.
>>>>
>>>> Agreed.

Re: FnAPI proto backwards compatibility

Posted by Robert Burke <ro...@frantil.com>.
One thing that doesn't appear to have been suggested yet is we could
"batch" urns together under a "super urn" so that adding one super urn is
like adding each of the represented batch of features. This avoids
having to send dozens of urns over individually.


The super urns would need to be static after definition to avoid mismatched
definitions down the road.

We collect together urns into what is reasonably considered "vX" support,
and can then increment that later.

This would simplify new SDKs, as they can have a goal of initial v1 support
as we define what level of feature support it has, and doesn't prevent new
capabilities from being added incrementally.

Similarly, certain feature sets could stand alone, eg around SQL. It's
beneficial for optimization reasons if an SDK has native projection and UDF
support for example, which a runner could take advantage of by avoiding
extra cross language hops. These could then also be grouped under a SQL
super urn.

This is from the SDK capability side of course, rather than the SDK
pipeline requirements side.

-------
Related to that last point, it might be good to nail down early the
perspective used when discussing these things, as there's a dual between
"what and SDK can do", and "what the runner will do to a pipeline that the
SDK can understand" (eg. Combiner lifting, and state backed iterables), as
well as "what the pipeline requires from the runner" and "what the runner
is able to do" (eg. Requires sorted input)


On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:

>
>
> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>>
>>
>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>>> >
>>> > We can always detect on the runner/SDK side whether there is an
>>> unknown field[1] within a payload and fail to process it but this is
>>> painful in two situations:
>>> > 1) It doesn't provide for a good error message since you can't say
>>> what the purpose of the field is. With a capability URN, the runner/SDK
>>> could say which URN it doesn't understand.
>>> > 2) It doesn't allow for the addition of fields which don't impact
>>> semantics of execution. For example, if the display data feature was being
>>> developed, a runner could ignore it and still execute the pipeline
>>> correctly.
>>>
>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>> this well either.
>>>
>>> > If we think this to be common enough, we can add capabilities list to
>>> the PTransform so each PTransform can do this and has a natural way of
>>> being extended for additions which are forwards compatible. The alternative
>>> to having capabilities on PTransform (and other constructs) is that we
>>> would have a new URN when the specification of the transform changes. For
>>> forwards compatible changes, each SDK/runner would map older versions of
>>> the URN onto the latest and internally treat it as the latest version but
>>> always downgrade it to the version the other party expects when
>>> communicating with it. Backwards incompatible changes would always require
>>> a new URN which capabilities at the PTransform level would not help with.
>>>
>>> As you point out, stateful+splittable may not be a particularly useful
>>> combination, but as another example, we have
>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>>> it requires finalization, stable inputs, and now time sorting. I don't
>>> think we should have a new URN for each combination.
>>>
>>
>> Agree with this. I don't think stateful, splittable, and "plain" ParDo
>> are comparable to these. Each is an entirely different computational
>> paradigm: per-element independent processing, per-key-and-window linear
>> processing, and per-element-and-restriction splittable processing. Most
>> relevant IMO is the nature of the parallelism. If you added state to
>> splittable processing, it would still be splittable processing. Just as
>> Combine and ParDo can share the SideInput specification, it is easy to
>> share relevant sub-structures like state declarations. But it is a fair
>> point that the ability to split can be ignored and run as a plain-old
>> ParDo. It brings up the question of whether a runner that doesn't know SDF
>> should have to reject it or should be allowed to run poorly.
>>
>
> Being splittable means that the SDK could choose to return a continuation
> saying please process the rest of my element in X amount of time which
> would require the runner to inspect certain fields on responses. One
> example would be I don't have many more messages to read from this message
> stream at the moment and another example could be that I detected that this
> filesystem is throttling me or is down and I would like to resume
> processing later.
>
>
>> It isn't a huge deal. Three different top-level URNS versus three
>> different sub-URNs will achieve the same result in the end if we get this
>> "capability" thing in place.
>>
>> Kenn
>>
>>
>>>
>>> >> > I do think that splittable ParDo and stateful ParDo should have
>>> separate PTransform URNs since they are different paradigms than "vanilla"
>>> ParDo.
>>> >>
>>> >> Here I disagree. What about one that is both splittable and stateful?
>>> Would one have a fourth URN for that? If/when another flavor of DoFn comes
>>> out, would we then want 8 distinct URNs? (SplittableParDo in particular can
>>> be executed as a normal ParDo as long as the output is bounded.)
>>> >
>>> > I agree that you could have stateful and splittable dofns where the
>>> element is the key and you share state and timers across restrictions. No
>>> runner is capable of executing this efficiently.
>>> >
>>> >> >> > On the SDK requirements side: the constructing SDK owns the
>>> Environment proto completely, so it is in a position to ensure the involved
>>> docker images support the necessary features.
>>> >> >>
>>> >> >> Yes.
>>> >
>>> >
>>> > I believe capabilities do exist on a Pipeline and it informs runners
>>> about new types of fields to be aware of either within Components or on the
>>> Pipeline object itself but for this discussion it makes sense that an
>>> environment would store most "capabilities" related to execution.
>>> >
>>> >> [snip]
>>> >
>>> > As for the proto clean-ups, the scope is to cover almost all things
>>> needed for execution now and to follow-up with optional transforms,
>>> payloads, and coders later which would exclude job management APIs and
>>> artifact staging. A formal enumeration would be useful here. Also, we
>>> should provide formal guidance about adding new fields, adding new types of
>>> transforms, new types of proto messages, ... (best to describe this on a
>>> case by case basis as to how people are trying to modify the protos and
>>> evolve this guidance over time).
>>>
>>> What we need is the ability for (1) runners to reject future pipelines
>>> they cannot faithfully execute and (2) runners to be able to take
>>> advantage of advanced features/protocols when interacting with those
>>> SDKs that understand them while avoiding them for older (or newer)
>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>>> capabilities.
>>>
>>> Where possible, I think this is best expressed inherently in the set
>>> of transform (and possibly other component) URNs. For example, when an
>>> SDK uses a combine_per_key composite, that's a signal that it
>>> understands the various related combine_* transforms. Similarly, a
>>> pipeline with a test_stream URN would be rejected by pipelines not
>>> recognizing/supporting this primitive. However, this is not always
>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>> ParDo and for (2) we have features like large iterable and progress
>>> support.
>>>
>>> For (1) we have to enumerate now everywhere a runner must look as far
>>> into the future as we want to remain backwards compatible. This is why
>>> I suggested putting something on the pipeline itself, but we could
>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>> think that'd be useful now. (Note that a future pipeline-level
>>> requirement could be "inspect (previously non-existent) requirements
>>> field attached to objects of type X.")
>>>
>>> For (2) I think adding a capabilities field to the environment for now
>>> makes the most sense, and as it's optional to inspect them adding it
>>> elsewhere if needed is backwards compatible. (The motivation to do it
>>> now is that there are some capabilities that we'd like to enumerate
>>> now rather than make part of the minimal set of things an SDK must
>>> support.)
>>>
>>>
> Agree on the separation of requirements from capabilities where
> requirements is a set of MUST understand while capabilities are a set of
> MAY understand.
>
>
>> > All in all, I think "capabilities" is about informing a runner about
>>> what they should know about and what they are allowed to do. If we go with
>>> a list of "capabilities", we could always add a "parameterized
>>> capabilities" urn which would tell runners they need to also look at some
>>> other field.
>>>
>>> Good point. That lets us keep it as a list for now. (The risk is that
>>> it makes possible the bug of populating parameters without adding the
>>> required notification to the list.)
>>>
>>> > I also believe capabilities should NOT be "inherited". For example if
>>> we define capabilities on a ParDoPayload, and on a PTransform and on
>>> Environment, then ParDoPayload capabilities shouldn't be copied to
>>> PTransform and PTransform specific capabilities shouldn't be copied to the
>>> Environment. My reasoning about this is that some "capabilities" can only
>>> be scoped to a single ParDoPayload or a single PTransform and wouldn't
>>> apply generally everywhere. The best example I could think of is that
>>> Environment A supports progress reporting while Environment B doesn't so it
>>> wouldn't have made sense to say the "Pipeline" supports progress reporting.
>>> >
>>> > Are capabilities strictly different from "resources" (transform needs
>>> python package X) or "execution hints" (e.g. deploy on machines that have
>>> GPUs, some generic but mostly runner specific hints)? At first glance I
>>> would say yes.
>>>
>>> Agreed.
>>>
>>

Re: FnAPI proto backwards compatibility

Posted by Luke Cwik <lc...@google.com>.
On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <ke...@apache.org> wrote:

>
>
> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>> >
>> > We can always detect on the runner/SDK side whether there is an unknown
>> field[1] within a payload and fail to process it but this is painful in two
>> situations:
>> > 1) It doesn't provide for a good error message since you can't say what
>> the purpose of the field is. With a capability URN, the runner/SDK could
>> say which URN it doesn't understand.
>> > 2) It doesn't allow for the addition of fields which don't impact
>> semantics of execution. For example, if the display data feature was being
>> developed, a runner could ignore it and still execute the pipeline
>> correctly.
>>
>> Yeah, I don't think proto reflection is a flexible enough tool to do
>> this well either.
>>
>> > If we think this to be common enough, we can add capabilities list to
>> the PTransform so each PTransform can do this and has a natural way of
>> being extended for additions which are forwards compatible. The alternative
>> to having capabilities on PTransform (and other constructs) is that we
>> would have a new URN when the specification of the transform changes. For
>> forwards compatible changes, each SDK/runner would map older versions of
>> the URN onto the latest and internally treat it as the latest version but
>> always downgrade it to the version the other party expects when
>> communicating with it. Backwards incompatible changes would always require
>> a new URN which capabilities at the PTransform level would not help with.
>>
>> As you point out, stateful+splittable may not be a particularly useful
>> combination, but as another example, we have
>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>> it requires finalization, stable inputs, and now time sorting. I don't
>> think we should have a new URN for each combination.
>>
>
> Agree with this. I don't think stateful, splittable, and "plain" ParDo are
> comparable to these. Each is an entirely different computational paradigm:
> per-element independent processing, per-key-and-window linear processing,
> and per-element-and-restriction splittable processing. Most relevant IMO is
> the nature of the parallelism. If you added state to splittable processing,
> it would still be splittable processing. Just as Combine and ParDo can
> share the SideInput specification, it is easy to share relevant
> sub-structures like state declarations. But it is a fair point that the
> ability to split can be ignored and run as a plain-old ParDo. It brings up
> the question of whether a runner that doesn't know SDF should have to
> reject it or should be allowed to run poorly.
>

Being splittable means that the SDK could choose to return a continuation
saying please process the rest of my element in X amount of time which
would require the runner to inspect certain fields on responses. One
example would be I don't have many more messages to read from this message
stream at the moment and another example could be that I detected that this
filesystem is throttling me or is down and I would like to resume
processing later.
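
To sketch what that means for a runner (simplified; the attribute names
below are stand-ins for whatever the actual FnAPI bundle response carries,
not an exact rendering of the proto):

    def handle_bundle_response(response, scheduler):
        # A runner that claims to support splittable DoFns can't just mark
        # the bundle done; it has to notice any residual work the SDK
        # handed back and schedule it again after the requested delay.
        for residual in response.residual_roots:
            scheduler.schedule_later(
                work=residual.application,
                delay=residual.requested_time_delay)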


> It isn't a huge deal. Three different top-level URNS versus three
> different sub-URNs will achieve the same result in the end if we get this
> "capability" thing in place.
>
> Kenn
>
>
>>
>> >> > I do think that splittable ParDo and stateful ParDo should have
>> separate PTransform URNs since they are different paradigms than "vanilla"
>> ParDo.
>> >>
>> >> Here I disagree. What about one that is both splittable and stateful?
>> Would one have a fourth URN for that? If/when another flavor of DoFn comes
>> out, would we then want 8 distinct URNs? (SplittableParDo in particular can
>> be executed as a normal ParDo as long as the output is bounded.)
>> >
>> > I agree that you could have stateful and splittable dofns where the
>> element is the key and you share state and timers across restrictions. No
>> runner is capable of executing this efficiently.
>> >
>> >> >> > On the SDK requirements side: the constructing SDK owns the
>> Environment proto completely, so it is in a position to ensure the involved
>> docker images support the necessary features.
>> >> >>
>> >> >> Yes.
>> >
>> >
>> > I believe capabilities do exist on a Pipeline and it informs runners
>> about new types of fields to be aware of either within Components or on the
>> Pipeline object itself but for this discussion it makes sense that an
>> environment would store most "capabilities" related to execution.
>> >
>> >> [snip]
>> >
>> > As for the proto clean-ups, the scope is to cover almost all things
>> needed for execution now and to follow-up with optional transforms,
>> payloads, and coders later which would exclude job management APIs and
>> artifact staging. A formal enumeration would be useful here. Also, we
>> should provide formal guidance about adding new fields, adding new types of
>> transforms, new types of proto messages, ... (best to describe this on a
>> case by case basis as to how people are trying to modify the protos and
>> evolve this guidance over time).
>>
>> What we need is the ability for (1) runners to reject future pipelines
>> they cannot faithfully execute and (2) runners to be able to take
>> advantage of advanced features/protocols when interacting with those
>> SDKs that understand them while avoiding them for older (or newer)
>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>> capabilities.
>>
>> Where possible, I think this is best expressed inherently in the set
>> of transform (and possibly other component) URNs. For example, when an
>> SDK uses a combine_per_key composite, that's a signal that it
>> understands the various related combine_* transforms. Similarly, a
>> pipeline with a test_stream URN would be rejected by pipelines not
>> recognizing/supporting this primitive. However, this is not always
>> possible, e.g. for (1) we have the aforementioned boolean flags on
>> ParDo and for (2) we have features like large iterable and progress
>> support.
>>
>> For (1) we have to enumerate now everywhere a runner must look as far
>> into the future as we want to remain backwards compatible. This is why
>> I suggested putting something on the pipeline itself, but we could
>> (likely in addition) add it to Transform and/or ParDoPayload if we
>> think that'd be useful now. (Note that a future pipeline-level
>> requirement could be "inspect (previously non-existent) requirements
>> field attached to objects of type X.")
>>
>> For (2) I think adding a capabilities field to the environment for now
>> makes the most sense, and as it's optional to inspect them adding it
>> elsewhere if needed is backwards compatible. (The motivation to do it
>> now is that there are some capabilities that we'd like to enumerate
>> now rather than make part of the minimal set of things an SDK must
>> support.)
>>
>>
Agree on the separation of requirements from capabilities where
requirements is a set of MUST understand while capabilities are a set of
MAY understand.


> > All in all, I think "capabilities" is about informing a runner about
>> what they should know about and what they are allowed to do. If we go with
>> a list of "capabilities", we could always add a "parameterized
>> capabilities" urn which would tell runners they need to also look at some
>> other field.
>>
>> Good point. That lets us keep it as a list for now. (The risk is that
>> it makes possible the bug of populating parameters without adding the
>> required notification to the list.)
>>
>> > I also believe capabilities should NOT be "inherited". For example if
>> we define capabilities on a ParDoPayload, and on a PTransform and on
>> Environment, then ParDoPayload capabilities shouldn't be copied to
>> PTransform and PTransform specific capabilities shouldn't be copied to the
>> Environment. My reasoning about this is that some "capabilities" can only
>> be scoped to a single ParDoPayload or a single PTransform and wouldn't
>> apply generally everywhere. The best example I could think of is that
>> Environment A supports progress reporting while Environment B doesn't so it
>> wouldn't have made sense to say the "Pipeline" supports progress reporting.
>> >
>> > Are capabilities strictly different from "resources" (transform needs
>> python package X) or "execution hints" (e.g. deploy on machines that have
>> GPUs, some generic but mostly runner specific hints)? At first glance I
>> would say yes.
>>
>> Agreed.
>>
>

Re: FnAPI proto backwards compatibility

Posted by Kenneth Knowles <ke...@apache.org>.
On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <ro...@google.com>
wrote:

> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
> >
> > We can always detect on the runner/SDK side whether there is an unknown
> field[1] within a payload and fail to process it but this is painful in two
> situations:
> > 1) It doesn't provide for a good error message since you can't say what
> the purpose of the field is. With a capability URN, the runner/SDK could
> say which URN it doesn't understand.
> > 2) It doesn't allow for the addition of fields which don't impact
> semantics of execution. For example, if the display data feature was being
> developed, a runner could ignore it and still execute the pipeline
> correctly.
>
> Yeah, I don't think proto reflection is a flexible enough tool to do
> this well either.
>
> > If we think this to be common enough, we can add capabilities list to
> the PTransform so each PTransform can do this and has a natural way of
> being extended for additions which are forwards compatible. The alternative
> to having capabilities on PTransform (and other constructs) is that we
> would have a new URN when the specification of the transform changes. For
> forwards compatible changes, each SDK/runner would map older versions of
> the URN onto the latest and internally treat it as the latest version but
> always downgrade it to the version the other party expects when
> communicating with it. Backwards incompatible changes would always require
> a new URN which capabilities at the PTransform level would not help with.
>
> As you point out, stateful+splittable may not be a particularly useful
> combination, but as another example, we have
> (backwards-incompatible-when-introduced) markers on DoFn as to whether
> it requires finalization, stable inputs, and now time sorting. I don't
> think we should have a new URN for each combination.
>

Agree with this. I don't think stateful, splittable, and "plain" ParDo are
comparable to these. Each is an entirely different computational paradigm:
per-element independent processing, per-key-and-window linear processing,
and per-element-and-restriction splittable processing. Most relevant IMO is
the nature of the parallelism. If you added state to splittable processing,
it would still be splittable processing. Just as Combine and ParDo can
share the SideInput specification, it is easy to share relevant
sub-structures like state declarations. But it is a fair point that the
ability to split can be ignored and run as a plain-old ParDo. It brings up
the question of whether a runner that doesn't know SDF should have to
reject it or should be allowed to run poorly.

It isn't a huge deal. Three different top-level URNS versus three different
sub-URNs will achieve the same result in the end if we get this
"capability" thing in place.

Kenn


>
> >> > I do think that splittable ParDo and stateful ParDo should have
> separate PTransform URNs since they are different paradigms than "vanilla"
> ParDo.
> >>
> >> Here I disagree. What about one that is both splittable and stateful?
> Would one have a fourth URN for that? If/when another flavor of DoFn comes
> out, would we then want 8 distinct URNs? (SplittableParDo in particular can
> be executed as a normal ParDo as long as the output is bounded.)
> >
> > I agree that you could have stateful and splittable dofns where the
> element is the key and you share state and timers across restrictions. No
> runner is capable of executing this efficiently.
> >
> >> >> > On the SDK requirements side: the constructing SDK owns the
> Environment proto completely, so it is in a position to ensure the involved
> docker images support the necessary features.
> >> >>
> >> >> Yes.
> >
> >
> > I believe capabilities do exist on a Pipeline and it informs runners
> about new types of fields to be aware of either within Components or on the
> Pipeline object itself but for this discussion it makes sense that an
> environment would store most "capabilities" related to execution.
> >
> >> [snip]
> >
> > As for the proto clean-ups, the scope is to cover almost all things
> needed for execution now and to follow-up with optional transforms,
> payloads, and coders later which would exclude job management APIs and
> artifact staging. A formal enumeration would be useful here. Also, we
> should provide formal guidance about adding new fields, adding new types of
> transforms, new types of proto messages, ... (best to describe this on a
> case by case basis as to how people are trying to modify the protos and
> evolve this guidance over time).
>
> What we need is the ability for (1) runners to reject future pipelines
> they cannot faithfully execute and (2) runners to be able to take
> advantage of advanced features/protocols when interacting with those
> SDKs that understand them while avoiding them for older (or newer)
> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
> capabilities.
>
> Where possible, I think this is best expressed inherently in the set
> of transform (and possibly other component) URNs. For example, when an
> SDK uses a combine_per_key composite, that's a signal that it
> understands the various related combine_* transforms. Similarly, a
> pipeline with a test_stream URN would be rejected by pipelines not
> recognizing/supporting this primitive. However, this is not always
> possible, e.g. for (1) we have the aforementioned boolean flags on
> ParDo and for (2) we have features like large iterable and progress
> support.
>
> For (1) we have to enumerate now everywhere a runner must look as far
> into the future as we want to remain backwards compatible. This is why
> I suggested putting something on the pipeline itself, but we could
> (likely in addition) add it to Transform and/or ParDoPayload if we
> think that'd be useful now. (Note that a future pipeline-level
> requirement could be "inspect (previously non-existent) requirements
> field attached to objects of type X.")
>
> For (2) I think adding a capabilities field to the environment for now
> makes the most sense, and as it's optional to inspect them adding it
> elsewhere if needed is backwards compatible. (The motivation to do it
> now is that there are some capabilities that we'd like to enumerate
> now rather than make part of the minimal set of things an SDK must
> support.)
>
> > All in all, I think "capabilities" is about informing a runner about
> what they should know about and what they are allowed to do. If we go with
> a list of "capabilities", we could always add a "parameterized
> capabilities" urn which would tell runners they need to also look at some
> other field.
>
> Good point. That lets us keep it as a list for now. (The risk is that
> it makes possible the bug of populating parameters without adding the
> required notification to the list.)
>
> > I also believe capabilities should NOT be "inherited". For example if we
> define capabilities on a ParDoPayload, and on a PTransform and on
> Environment, then ParDoPayload capabilities shouldn't be copied to
> PTransform and PTransform specific capabilities shouldn't be copied to the
> Environment. My reasoning about this is that some "capabilities" can only
> be scoped to a single ParDoPayload or a single PTransform and wouldn't
> apply generally everywhere. The best example I could think of is that
> Environment A supports progress reporting while Environment B doesn't so it
> wouldn't have made sense to say the "Pipeline" supports progress reporting.
> >
> > Are capabilities strictly different from "resources" (transform needs
> python package X) or "execution hints" (e.g. deploy on machines that have
> GPUs, some generic but mostly runner specific hints)? At first glance I
> would say yes.
>
> Agreed.
>

Re: FnAPI proto backwards compatibility

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>
> We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it but this is painful in two situations:
> 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
> 2) It doesn't allow for the addition of fields which don't impact semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.

Yeah, I don't think proto reflection is a flexible enough tool to do
this well either.

> If we think this to be common enough, we can add capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN when the specification of the transform changes. For forwards compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version but always downgrade it to the version the other party expects when communicating with it. Backwards incompatible changes would always require a new URN which capabilities at the PTransform level would not help with.

As you point out, stateful+splittable may not be a particularly useful
combination, but as another example, we have
(backwards-incompatible-when-introduced) markers on DoFn as to whether
it requires finalization, stable inputs, and now time sorting. I don't
think we should have a new URN for each combination.
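
To illustrate the alternative (a hedged sketch; the urn strings below are
hypothetical, not existing proto constants): a single ParDo urn whose
payload carries an open-ended list of requirement urns grows linearly with
the number of markers instead of needing a transform urn per combination:

    # One transform urn plus orthogonal requirement urns, rather than a
    # distinct transform urn for every combination of DoFn markers.
    pardo_requirements = [
        'beam:requirement:pardo:requires_stable_input:v1',
        'beam:requirement:pardo:requires_time_sorted_input:v1',
        'beam:requirement:pardo:finalization:v1',
    ]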

>> > I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>
>> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>
> I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>
>> >> > On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>> >>
>> >> Yes.
>
>
> I believe capabilities do exist on a Pipeline and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>
>> [snip]
>
> As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow-up with optional transforms, payloads, and coders later which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case by case basis as to how people are trying to modify the protos and evolve this guidance over time).

What we need is the ability for (1) runners to reject future pipelines
they cannot faithfully execute and (2) runners to be able to take
advantage of advanced features/protocols when interacting with those
SDKs that understand them while avoiding them for older (or newer)
SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
capabilities.

Where possible, I think this is best expressed inherently in the set
of transform (and possibly other component) URNs. For example, when an
SDK uses a combine_per_key composite, that's a signal that it
understands the various related combine_* transforms. Similarly, a
pipeline with a test_stream URN would be rejected by pipelines not
recognizing/supporting this primitive. However, this is not always
possible, e.g. for (1) we have the aforementioned boolean flags on
ParDo and for (2) we have features like large iterable and progress
support.
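
For example (a minimal sketch, assuming the existing shape of the pipeline
proto with a transforms map, subtransforms, and a FunctionSpec urn; the set
of known urns is abbreviated):

    KNOWN_PRIMITIVE_URNS = {
        'beam:transform:pardo:v1',
        'beam:transform:group_by_key:v1',
        # ... every primitive this runner implements ...
    }

    def reject_unknown_primitives(pipeline_proto):
        # Composites with unknown urns can still be executed by expanding
        # their subtransforms; only unknown *leaf* transforms are fatal.
        for transform in pipeline_proto.components.transforms.values():
            if not transform.subtransforms and (
                    transform.spec.urn not in KNOWN_PRIMITIVE_URNS):
                raise ValueError(
                    'Unknown primitive transform: %s' % transform.spec.urn)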

For (1) we have to enumerate now everywhere a runner must look as far
into the future as we want to remain backwards compatible. This is why
I suggested putting something on the pipeline itself, but we could
(likely in addition) add it to Transform and/or ParDoPayload if we
think that'd be useful now. (Note that a future pipeline-level
requirement could be "inspect (previously non-existent) requirements
field attached to objects of type X.")

For (2) I think adding a capabilities field to the environment for now
makes the most sense, and as it's optional to inspect them adding it
elsewhere if needed is backwards compatible. (The motivation to do it
now is that there are some capabilities that we'd like to enumerate
now rather than make part of the minimal set of things an SDK must
support.)
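
Putting (1) and (2) together, the runner-side logic might look roughly like
this (a sketch only: the repeated-string requirements field on the pipeline
and capabilities field on the environment are the proposed additions being
discussed here, not existing fields, and the urn strings are illustrative):

    RUNNER_UNDERSTOOD_REQUIREMENTS = {
        'beam:requirement:pardo:requires_stable_input:v1',
        'beam:requirement:pardo:requires_time_sorted_input:v1',
    }

    def validate_requirements(pipeline_proto):
        # (1) Hard requirements: anything we don't recognize means we can't
        # faithfully execute the pipeline, so reject it up front.
        unknown = set(pipeline_proto.requirements) - RUNNER_UNDERSTOOD_REQUIREMENTS
        if unknown:
            raise ValueError(
                'Cannot execute pipeline; unknown requirements: %s'
                % sorted(unknown))

    def sdk_supports(environment_proto, capability_urn):
        # (2) Optional capabilities: opt into a better protocol only when
        # the SDK's environment advertises it; otherwise fall back.
        return capability_urn in environment_proto.capabilities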

> All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" urn which would tell runners they need to also look at some other field.

Good point. That lets us keep it as a list for now. (The risk is that
it makes possible the bug of populating parameters without adding the
required notification to the list.)

> I also believe capabilities should NOT be "inherited". For example if we define capabilities on a ParDoPayload, and on a PTransform and on Environment, then ParDoPayload capabilities shouldn't be copied to PTransform and PTransform specific capabilities shouldn't be copied to the Environment. My reasoning about this is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't so it wouldn't have made sense to say the "Pipeline" supports progress reporting.
>
> Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner specific hints)? At first glance I would say yes.

Agreed.

Re: FnAPI proto backwards compatibility

Posted by Luke Cwik <lc...@google.com>.
On Wed, Feb 12, 2020 at 7:57 AM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Feb 11, 2020 at 7:25 PM Kenneth Knowles <ke...@apache.org> wrote:
> >
> > On Tue, Feb 11, 2020 at 8:38 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Mon, Feb 10, 2020 at 7:35 PM Kenneth Knowles <ke...@apache.org>
> wrote:
> >> >
> >> > On the runner requirements side: if you have such a list at the
> pipeline level, it is an opportunity for the list to be inconsistent with
> the contents of the pipeline. For example, if a DoFn is marked "requires
> stable input" but not listed at the pipeline level, then the runner may run
> it without ensuring it requires stable input.
> >>
> >> Yes. Listing this feature at the top level, if used, would be part of
> >> the contract. The problem here that we're trying to solve is that the
> >> runner wouldn't know about the field used to mark a DoFn as "requires
> >> stable input." Another alternative would be to make this kind of ParDo
> >> a different URN, but that would result in a cross product of URNs for
> >> all supported features.
> >
> >
> >>
> >> Rather than attaching it to the pipeline object, we could attach it to
> >> the transform. (But if there are ever extensions that don't belong to
> >> transforms, we'd be out of luck. It'd be even worse to attach it to
> >> the ParDoPayload, as then we'd need one on CombinePayload, etc. just
> >> in case.) This is why I was leaning towards just putting it at the
> >> top.
> >>
> >> I agree about the potential for incompatibility. As much as possible
> >> I'd rather extend things in a way that would be intrinsically rejected
> >> by a non-comprehending runner. But I'm not sure how to do that when
> >> introducing new constraints for existing components like this. But I'm
> >> open to other suggestions.
> >
> >
> > I was waiting for Luke to mention something he suggested offline: that
> we make this set of fields a list of URNs and require a runner to fail if
> there are any that it does not understand. That should do it for
> DoFn-granularity features. It makes sense - proto is designed to
> ignore/propagate unknown bits. We want to fail on unknown bits.
>
> I agree this would be superior for bools like requires_time_sorted_input
> and requests_finalization. Would it be worth making this a map for those
> features that have attached data such that it could not be forgotten? (E.g.
> rather than state_specs being a top-level field, it would be a value for
> the requires-state URN.) Should we move to this pattern for existing
> requirements (like the aforementioned state) or just future ones? Was the
> parameters field an attempt in this direction?
>
> I still think we need something top-level lest we not be able to modify
> anything but ParDo, but putting it on ParDo as well could be natural.
>

We can always detect on the runner/SDK side whether there is an unknown
field[1] within a payload and fail to process it but this is painful in two
situations:
1) It doesn't provide for a good error message since you can't say what the
purpose of the field is. With a capability URN, the runner/SDK could say
which URN it doesn't understand.
2) It doesn't allow for the addition of fields which don't impact semantics
of execution. For example, if the display data feature was being developed,
a runner could ignore it and still execute the pipeline correctly.
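
For reference, this is roughly all that proto reflection gives you (a
minimal sketch; it assumes a protobuf runtime that exposes unknown fields,
as the recent Python bindings do via Message.UnknownFields()), which is why
it can't name the missing feature or distinguish ignorable additions:

    def reject_if_unknown_fields(msg):
        # Fails if the parsed message carried fields that this binary's
        # generated classes don't know about. Note it only inspects this
        # message; opaque payload bytes (e.g. a FunctionSpec payload) would
        # have to be parsed and checked the same way by whoever owns them.
        if msg.UnknownFields():
            raise ValueError(
                'Message of type %s contains unrecognized fields'
                % type(msg).__name__)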

If we think this to be common enough, we can add capabilities list to the
PTransform so each PTransform can do this and has a natural way of being
extended for additions which are forwards compatible. The alternative to
having capabilities on PTransform (and other constructs) is that we would
have a new URN when the specification of the transform changes. For
forwards compatible changes, each SDK/runner would map older versions of
the URN onto the latest and internally treat it as the latest version but
always downgrade it to the version the other party expects when
communicating with it. Backwards incompatible changes would always require
a new URN which capabilities at the PTransform level would not help with.
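
A hedged sketch of that mapping approach (the urns and the table are
invented for illustration):

    # Older spellings of a transform urn mapped to the newest equivalent.
    URN_UPGRADES = {
        'beam:transform:my_transform:v1': 'beam:transform:my_transform:v2',
    }

    def normalize(urn):
        # Internally, always work in terms of the latest version.
        return URN_UPGRADES.get(urn, urn)

    def downgrade_for_peer(urn, peer_understood_urns):
        # When talking to a party that only knows an older spelling, emit
        # that spelling instead of the latest one.
        for old, new in URN_UPGRADES.items():
            if (new == urn and urn not in peer_understood_urns
                    and old in peer_understood_urns):
                return old
        return urn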

> I do think that splittable ParDo and stateful ParDo should have separate
> PTransform URNs since they are different paradigms than "vanilla" ParDo.
>
> Here I disagree. What about one that is both splittable and stateful?
> Would one have a fourth URN for that? If/when another flavor of DoFn comes
> out, would we then want 8 distinct URNs? (SplittableParDo in particular can
> be executed as a normal ParDo as long as the output is bounded.)
>

I agree that you could have stateful and splittable dofns where the element
is the key and you share state and timers across restrictions. No runner is
capable of executing this efficiently.


> >> > On the SDK requirements side: the constructing SDK owns the
> Environment proto completely, so it is in a position to ensure the involved
> docker images support the necessary features.
> >>
> >> Yes.
>

I believe capabilities do exist on a Pipeline and it informs runners about
new types of fields to be aware of either within Components or on the
Pipeline object itself but for this discussion it makes sense that an
environment would store most "capabilities" related to execution.


> >>
> >> > Is it sufficient for each SDK involved in a cross-language expansion
> to validate that it understands the inputs? For example if Python sends a
> PCollection with a pickle coder to Java as input to an expansion then it
> will fail. And conversely if the returned subgraph outputs a PCollection
> with a Java custom coder.
> >>
> >> Yes. It's possible to imagine there could be some negotiation about
> >> inserting length prefix coders (e.g. a Count transform could act on
> >> any opaque data as long as it can delimit it), but that's still TBD.
>

I would suggest that, in situations where SDK A knows Count can operate on
opaque data, SDK A define Count over a well-known type such as "bytes" and
let any other SDK that uses it perform the transcoding. Not all languages
have the flexibility to pass around an opaque type.
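
To illustrate the idea (a rough Python sketch; the framing scheme and names
are invented, not a Beam coder): once elements are transcoded to a
delimitable bytes representation, a bytes-based Count can process them
without ever understanding them.

import pickle
import struct

def length_prefixed(elements):
    # Frame each opaque element with a length prefix so another SDK can
    # delimit (and therefore count, group, or shuffle) the elements
    # without decoding them.
    buf = b""
    for element in elements:
        payload = pickle.dumps(element)
        buf += struct.pack(">I", len(payload)) + payload
    return buf

def count_frames(buf):
    count, pos = 0, 0
    while pos < len(buf):
        (size,) = struct.unpack_from(">I", buf, pos)
        pos += 4 + size
        count += 1
    return count

print(count_frames(length_prefixed(["a", "b", "a"])))  # 3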


> >>
> >> > More complex use cases that I can imagine all seem futuristic and
> unlikely to come to pass (Python passes a pickled DoFn to the Java
> expansion service which inserts it into the graph in a way where a
> Java-based transform would have to invoke it on every element, etc)
> >>
> >> Some transforms are configured with UDFs of this form...but we'll
> >> cross that bridge when we get to it.
> >
> >
> > Now that I think harder, I know of a TimestampFn that governs the
> watermark. Does SDF solve this by allowing a composite IO where the parsing
> to be done in one language while the watermark is somehow governed by the
> other? And then there's writing a SQL UDF in your language of choice...
> Anyhow, probably a tangent...
>
> Yeah, it'd be good to support this, someday...
>

I think this will be a higher-level transform and not something as low
level as a ParDo: for example, a Kafka transform that is parameterized by a
parsing function and a watermark function. This is still very much up for
discussion, though.
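
Purely to show the shape such a transform could take (everything below is
hypothetical and does not exist in Beam today):

class ReadFromKafkaParsed(object):
    # Hypothetical composite: the raw read could be expanded by another
    # SDK; only parse_fn and watermark_fn need to run in the author's SDK.
    def __init__(self, topic, parse_fn, watermark_fn):
        self.topic = topic
        self.parse_fn = parse_fn
        self.watermark_fn = watermark_fn

    def expand(self, raw_records):
        # Stand-in for the actual expansion; here we only apply the parser.
        return [self.parse_fn(record) for record in raw_records]

read = ReadFromKafkaParsed("events", parse_fn=bytes.upper,
                           watermark_fn=lambda record: 0)
print(read.expand([b"a", b"b"]))  # [b'A', b'B']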


>
> >> > On Mon, Feb 10, 2020 at 5:03 PM Brian Hulette <bh...@google.com>
> wrote:
> >> >>
> >> >> I like the capabilities/requirements idea. Would these capabilities
> be at a level that it would make sense to document in the capabilities
> matrix? i.e. could the URNs be the values of "X" Pablo described here [1].
> >> >>
>

Yes and no. Yes, in that we would be able to formally enumerate this and
list what each SDK and each runner supports (at specific versions). No, in
that being able to do something isn't the same as being able to do it well,
so we would still want to have some way to say that a runner does something
well.


> >> >> Brian
> >> >>
> >> >> [1]
> https://lists.apache.org/thread.html/e93ac64d484551d61e559e1ba0cf4a15b760e69d74c5b1d0549ff74f%40%3Cdev.beam.apache.org%3E
> >> >>
> >> >> On Mon, Feb 10, 2020 at 3:55 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>>
> >> >>> With an eye towards cross-language (which includes cross-version)
> >> >>> pipelines and services (specifically looking at Dataflow) supporting
> >> >>> portable pipelines, there's been a desire to stabilize the
> portability
> >> >>> protos. There are currently many cleanups we'd like to do [1] (some
> >> >>> essential, others nice to have); are there others that people would
> >> >>> like to see?
> >> >>>
> >> >>> Of course we would like it to be possible for the FnAPI and Beam
> >> >>> itself to continue to evolve. Most of this can be handled by runners
> >> >>> understanding various transform URNs, but not all. (An example that
> >> >>> comes to mind is support for large iterables [2], or the requirement
> >> >>> to observe and respect new fields on a PTransform or its payloads
> >> >>> [3]). One proposal for this is to add capabilities and/or
> >> >>> requirements. An environment (corresponding generally to an SDK)
> could
> >> >>> adveritize various capabilities (as a list or map of URNs) which a
> >> >>> runner can take advantage of without requiring all SDKs to support
> all
> >> >>> features at the same time. For the other way around, we need a way
> of
> >> >>> marking something that a runner must reject if it does not
> understand
> >> >>> it. This could be a set of requirements (again, a list of map of
> URNs)
> >> >>> that designate capabilities required to at least be understood by
> the
> >> >>> runner to faithfully execute this pipeline. (These could be attached
> >> >>> to a transform or the pipeline itself.) Do these sound like
> reasonable
> >> >>> additions? Also, would they ever need to be parameterized (map), or
> >> >>> would a list suffice?
> >> >>>
> >> >>> [1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223,
> BEAM-3227,
> >> >>> BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374,
> >> >>> BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373,
> >> >>> BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
> >> >>> [2]
> https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
> >> >>> [3]
> https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E
>

As for the proto clean-ups, the scope is to cover almost everything needed
for execution now and to follow up with optional transforms, payloads, and
coders later; this would exclude the job management APIs and artifact
staging. A formal enumeration would be useful here. Also, we should provide
formal guidance about adding new fields, adding new types of transforms,
new types of proto messages, ... (best to describe this on a case-by-case
basis as people try to modify the protos, and evolve the guidance over
time).

All in all, I think "capabilities" is about informing a runner of what it
should know about and what it is allowed to do. If we go with a list of
"capabilities", we could always add a "parameterized capabilities" URN
which would tell runners they need to also look at some other field. I also
believe capabilities should NOT be "inherited". For example, if we define
capabilities on a ParDoPayload, on a PTransform, and on Environment, then
ParDoPayload capabilities shouldn't be copied to the PTransform, and
PTransform-specific capabilities shouldn't be copied to the Environment. My
reasoning is that some "capabilities" can only be scoped to a single
ParDoPayload or a single PTransform and wouldn't apply generally
everywhere. The best example I can think of is that Environment A supports
progress reporting while Environment B doesn't, so it wouldn't make sense
to say that the "Pipeline" supports progress reporting.
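
In other words (field names and URNs hypothetical again), a checker would
consult each level independently rather than unioning capabilities upward:

# Hypothetical sketch: capabilities are consulted at the level they are
# declared on and never merged up to the pipeline.
env_a = {"capabilities": ["beam:capability:progress_reporting:v1"]}
env_b = {"capabilities": []}
pipeline = {"environments": {"a": env_a, "b": env_b}}

def env_supports(pipeline, env_id, urn):
    # Scoped lookup: only the named environment's own list is consulted.
    return urn in pipeline["environments"][env_id]["capabilities"]

print(env_supports(pipeline, "a", "beam:capability:progress_reporting:v1"))
print(env_supports(pipeline, "b", "beam:capability:progress_reporting:v1"))
# There is deliberately no pipeline-level answer to "does the Pipeline
# support progress reporting"; the question only makes sense per
# environment.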

Are capabilities strictly different from "resources" (a transform needs
Python package X) or "execution hints" (e.g. deploy on machines that have
GPUs; some generic, but mostly runner-specific hints)? At first glance I
would say yes.

1:
https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/MessageOrBuilder.html#getUnknownFields--

Re: FnAPI proto backwards compatibility

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Feb 11, 2020 at 7:25 PM Kenneth Knowles <ke...@apache.org> wrote:
>
> On Tue, Feb 11, 2020 at 8:38 AM Robert Bradshaw <ro...@google.com>
wrote:
>>
>> On Mon, Feb 10, 2020 at 7:35 PM Kenneth Knowles <ke...@apache.org> wrote:
>> >
>> > On the runner requirements side: if you have such a list at the
pipeline level, it is an opportunity for the list to be inconsistent with
the contents of the pipeline. For example, if a DoFn is marked "requires
stable input" but not listed at the pipeline level, then the runner may run
it without ensuring it requires stable input.
>>
>> Yes. Listing this feature at the top level, if used, would be part of
>> the contract. The problem here that we're trying to solve is that the
>> runner wouldn't know about the field used to mark a DoFn as "requires
>> stable input." Another alternative would be to make this kind of ParDo
>> a different URN, but that would result in a cross product of URNs for
>> all supported features.
>
>
>>
>> Rather than attaching it to the pipeline object, we could attach it to
>> the transform. (But if there are ever extensions that don't belong to
>> transforms, we'd be out of luck. It'd be even worse to attach it to
>> the ParDoPayload, as then we'd need one on CombinePayload, etc. just
>> in case.) This is why I was leaning towards just putting it at the
>> top.
>>
>> I agree about the potential for incompatibility. As much as possible
>> I'd rather extend things in a way that would be intrinsically rejected
>> by a non-comprehending runner. But I'm not sure how to do that when
>> introducing new constraints for existing components like this. But I'm
>> open to other suggestions.
>
>
> I was waiting for Luke to mention something he suggested offline: that we
make this set of fields a list of URNs and require a runner to fail if
there are any that it does not understand. That should do it for
DoFn-granularity features. It makes sense - proto is designed to
ignore/propagate unknown bits. We want to fail on unknown bits.

I agree this would be superior for bools like requires_time_sorted_input
and requests_finalization. Would it be worth making this a map for those
features that have attached data such that it could not be forgotten? (E.g.
rather than state_specs being a top-level field, it would be a value for
the requires-state URN.) Should we move to this pattern for existing
requirements (like the aforementioned state) or just future ones? Was the
parameters field an attempt in this direction?

I still think we need something top-level, lest we be unable to modify
anything but ParDo, but putting it on ParDo as well could be natural.
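
For concreteness, a map-shaped version might look roughly like this (the
URNs, field names, and payloads are invented for illustration, not the
actual protos):

# Hypothetical sketch of requirements as a map from URN to an optional
# payload, so data attached to a requirement (like state declarations)
# cannot be silently dropped.
pardo_requirements = {
    "beam:requirement:requires_stable_input:v1": None,
    "beam:requirement:requires_time_sorted_input:v1": None,
    "beam:requirement:requires_state:v1": {
        "state_specs": {"counter": "beam:state:bag:v1"},
    },
}

def check_requirements(requirements, supported):
    unknown = set(requirements) - supported
    if unknown:
        raise ValueError("Unsupported requirements: %s" % sorted(unknown))
    # The payload travels with its requirement URN, so a runner that
    # accepts the URN necessarily sees the attached data.
    return requirements["beam:requirement:requires_state:v1"]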

> I do think that splittable ParDo and stateful ParDo should have separate
PTransform URNs since they are different paradigms than "vanilla" ParDo.

Here I disagree. What about one that is both splittable and stateful? Would
one have a fourth URN for that? If/when another flavor of DoFn comes out,
would we then want 8 distinct URNs? (SplittableParDo in particular can be
executed as a normal ParDo as long as the output is bounded.)

>> > On the SDK requirements side: the constructing SDK owns the
Environment proto completely, so it is in a position to ensure the involved
docker images support the necessary features.
>>
>> Yes.
>>
>> > Is it sufficient for each SDK involved in a cross-language expansion
to validate that it understands the inputs? For example if Python sends a
PCollection with a pickle coder to Java as input to an expansion then it
will fail. And conversely if the returned subgraph outputs a PCollection
with a Java custom coder.
>>
>> Yes. It's possible to imagine there could be some negotiation about
>> inserting length prefix coders (e.g. a Count transform could act on
>> any opaque data as long as it can delimit it), but that's still TBD.
>>
>> > More complex use cases that I can imagine all seem futuristic and
unlikely to come to pass (Python passes a pickled DoFn to the Java
expansion service which inserts it into the graph in a way where a
Java-based transform would have to invoke it on every element, etc)
>>
>> Some transforms are configured with UDFs of this form...but we'll
>> cross that bridge when we get to it.
>
>
> Now that I think harder, I know of a TimestampFn that governs the
watermark. Does SDF solve this by allowing a composite IO where the parsing
to be done in one language while the watermark is somehow governed by the
other? And then there's writing a SQL UDF in your language of choice...
Anyhow, probably a tangent...

Yeah, it'd be good to support this, someday...

>> > On Mon, Feb 10, 2020 at 5:03 PM Brian Hulette <bh...@google.com>
wrote:
>> >>
>> >> I like the capabilities/requirements idea. Would these capabilities
be at a level that it would make sense to document in the capabilities
matrix? i.e. could the URNs be the values of "X" Pablo described here [1].
>> >>
>> >> Brian
>> >>
>> >> [1]
https://lists.apache.org/thread.html/e93ac64d484551d61e559e1ba0cf4a15b760e69d74c5b1d0549ff74f%40%3Cdev.beam.apache.org%3E
>> >>
>> >> On Mon, Feb 10, 2020 at 3:55 PM Robert Bradshaw <ro...@google.com>
wrote:
>> >>>
>> >>> With an eye towards cross-language (which includes cross-version)
>> >>> pipelines and services (specifically looking at Dataflow) supporting
>> >>> portable pipelines, there's been a desire to stabilize the
portability
>> >>> protos. There are currently many cleanups we'd like to do [1] (some
>> >>> essential, others nice to have); are there others that people would
>> >>> like to see?
>> >>>
>> >>> Of course we would like it to be possible for the FnAPI and Beam
>> >>> itself to continue to evolve. Most of this can be handled by runners
>> >>> understanding various transform URNs, but not all. (An example that
>> >>> comes to mind is support for large iterables [2], or the requirement
>> >>> to observe and respect new fields on a PTransform or its payloads
>> >>> [3]). One proposal for this is to add capabilities and/or
>> >>> requirements. An environment (corresponding generally to an SDK)
could
>> >>> adveritize various capabilities (as a list or map of URNs) which a
>> >>> runner can take advantage of without requiring all SDKs to support
all
>> >>> features at the same time. For the other way around, we need a way of
>> >>> marking something that a runner must reject if it does not understand
>> >>> it. This could be a set of requirements (again, a list of map of
URNs)
>> >>> that designate capabilities required to at least be understood by the
>> >>> runner to faithfully execute this pipeline. (These could be attached
>> >>> to a transform or the pipeline itself.) Do these sound like
reasonable
>> >>> additions? Also, would they ever need to be parameterized (map), or
>> >>> would a list suffice?
>> >>>
>> >>> [1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223, BEAM-3227,
>> >>> BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374,
>> >>> BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373,
>> >>> BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
>> >>> [2]
https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
>> >>> [3]
https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E

Re: FnAPI proto backwards compatibility

Posted by Kenneth Knowles <ke...@apache.org>.
On Tue, Feb 11, 2020 at 8:38 AM Robert Bradshaw <ro...@google.com> wrote:

> On Mon, Feb 10, 2020 at 7:35 PM Kenneth Knowles <ke...@apache.org> wrote:
> >
> > On the runner requirements side: if you have such a list at the pipeline
> level, it is an opportunity for the list to be inconsistent with the
> contents of the pipeline. For example, if a DoFn is marked "requires stable
> input" but not listed at the pipeline level, then the runner may run it
> without ensuring it requires stable input.
>
> Yes. Listing this feature at the top level, if used, would be part of
> the contract. The problem here that we're trying to solve is that the
> runner wouldn't know about the field used to mark a DoFn as "requires
> stable input." Another alternative would be to make this kind of ParDo
> a different URN, but that would result in a cross product of URNs for
> all supported features.



> Rather than attaching it to the pipeline object, we could attach it to
> the transform. (But if there are ever extensions that don't belong to
> transforms, we'd be out of luck. It'd be even worse to attach it to
> the ParDoPayload, as then we'd need one on CombinePayload, etc. just
> in case.) This is why I was leaning towards just putting it at the
> top.
>
> I agree about the potential for incompatibility. As much as possible
> I'd rather extend things in a way that would be intrinsically rejected
> by a non-comprehending runner. But I'm not sure how to do that when
> introducing new constraints for existing components like this. But I'm
> open to other suggestions.
>

I was waiting for Luke to mention something he suggested offline: that we
make this set of fields a list of URNs and require a runner to fail if
there are any that it does not understand. That should do it for
DoFn-granularity features. It makes sense - proto is designed to
ignore/propagate unknown bits. We want to fail on unknown bits.

I do think that splittable ParDo and stateful ParDo should have separate
PTransform URNs since they are different paradigms than "vanilla" ParDo.

> On the SDK requirements side: the constructing SDK owns the Environment
> proto completely, so it is in a position to ensure the involved docker
> images support the necessary features.
>
> Yes.
>
> > Is it sufficient for each SDK involved in a cross-language expansion to
> validate that it understands the inputs? For example if Python sends a
> PCollection with a pickle coder to Java as input to an expansion then it
> will fail. And conversely if the returned subgraph outputs a PCollection
> with a Java custom coder.
>
> Yes. It's possible to imagine there could be some negotiation about
> inserting length prefix coders (e.g. a Count transform could act on
> any opaque data as long as it can delimit it), but that's still TBD.
>
> > More complex use cases that I can imagine all seem futuristic and
> unlikely to come to pass (Python passes a pickled DoFn to the Java
> expansion service which inserts it into the graph in a way where a
> Java-based transform would have to invoke it on every element, etc)
>
> Some transforms are configured with UDFs of this form...but we'll
> cross that bridge when we get to it.
>

Now that I think harder, I know of a TimestampFn that governs the
watermark. Does SDF solve this by allowing a composite IO where the parsing
to be done in one language while the watermark is somehow governed by the
other? And then there's writing a SQL UDF in your language of choice...
Anyhow, probably a tangent...

Kenn


> Kenn
> >
> > On Mon, Feb 10, 2020 at 5:03 PM Brian Hulette <bh...@google.com>
> wrote:
> >>
> >> I like the capabilities/requirements idea. Would these capabilities be
> at a level that it would make sense to document in the capabilities matrix?
> i.e. could the URNs be the values of "X" Pablo described here [1].
> >>
> >> Brian
> >>
> >> [1]
> https://lists.apache.org/thread.html/e93ac64d484551d61e559e1ba0cf4a15b760e69d74c5b1d0549ff74f%40%3Cdev.beam.apache.org%3E
> >>
> >> On Mon, Feb 10, 2020 at 3:55 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>>
> >>> With an eye towards cross-language (which includes cross-version)
> >>> pipelines and services (specifically looking at Dataflow) supporting
> >>> portable pipelines, there's been a desire to stabilize the portability
> >>> protos. There are currently many cleanups we'd like to do [1] (some
> >>> essential, others nice to have); are there others that people would
> >>> like to see?
> >>>
> >>> Of course we would like it to be possible for the FnAPI and Beam
> >>> itself to continue to evolve. Most of this can be handled by runners
> >>> understanding various transform URNs, but not all. (An example that
> >>> comes to mind is support for large iterables [2], or the requirement
> >>> to observe and respect new fields on a PTransform or its payloads
> >>> [3]). One proposal for this is to add capabilities and/or
> >>> requirements. An environment (corresponding generally to an SDK) could
> >>> adveritize various capabilities (as a list or map of URNs) which a
> >>> runner can take advantage of without requiring all SDKs to support all
> >>> features at the same time. For the other way around, we need a way of
> >>> marking something that a runner must reject if it does not understand
> >>> it. This could be a set of requirements (again, a list of map of URNs)
> >>> that designate capabilities required to at least be understood by the
> >>> runner to faithfully execute this pipeline. (These could be attached
> >>> to a transform or the pipeline itself.) Do these sound like reasonable
> >>> additions? Also, would they ever need to be parameterized (map), or
> >>> would a list suffice?
> >>>
> >>> [1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223, BEAM-3227,
> >>> BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374,
> >>> BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373,
> >>> BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
> >>> [2]
> https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
> >>> [3]
> https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E
>

Re: FnAPI proto backwards compatibility

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Feb 10, 2020 at 7:35 PM Kenneth Knowles <ke...@apache.org> wrote:
>
> On the runner requirements side: if you have such a list at the pipeline level, it is an opportunity for the list to be inconsistent with the contents of the pipeline. For example, if a DoFn is marked "requires stable input" but not listed at the pipeline level, then the runner may run it without ensuring it requires stable input.

Yes. Listing this feature at the top level, if used, would be part of
the contract. The problem here that we're trying to solve is that the
runner wouldn't know about the field used to mark a DoFn as "requires
stable input." Another alternative would be to make this kind of ParDo
a different URN, but that would result in a cross product of URNs for
all supported features.

Rather than attaching it to the pipeline object, we could attach it to
the transform. (But if there are ever extensions that don't belong to
transforms, we'd be out of luck. It'd be even worse to attach it to
the ParDoPayload, as then we'd need one on CombinePayload, etc. just
in case.) This is why I was leaning towards just putting it at the
top.

I agree about the potential for incompatibility. As much as possible
I'd rather extend things in a way that would be intrinsically rejected
by a non-comprehending runner. But I'm not sure how to do that when
introducing new constraints for existing components like this. But I'm
open to other suggestions.

> On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.

Yes.

> Is it sufficient for each SDK involved in a cross-language expansion to validate that it understands the inputs? For example if Python sends a PCollection with a pickle coder to Java as input to an expansion then it will fail. And conversely if the returned subgraph outputs a PCollection with a Java custom coder.

Yes. It's possible to imagine there could be some negotiation about
inserting length prefix coders (e.g. a Count transform could act on
any opaque data as long as it can delimit it), but that's still TBD.

> More complex use cases that I can imagine all seem futuristic and unlikely to come to pass (Python passes a pickled DoFn to the Java expansion service which inserts it into the graph in a way where a Java-based transform would have to invoke it on every element, etc)

Some transforms are configured with UDFs of this form...but we'll
cross that bridge when we get to it.

>
> Kenn
>
> On Mon, Feb 10, 2020 at 5:03 PM Brian Hulette <bh...@google.com> wrote:
>>
>> I like the capabilities/requirements idea. Would these capabilities be at a level that it would make sense to document in the capabilities matrix? i.e. could the URNs be the values of "X" Pablo described here [1].
>>
>> Brian
>>
>> [1] https://lists.apache.org/thread.html/e93ac64d484551d61e559e1ba0cf4a15b760e69d74c5b1d0549ff74f%40%3Cdev.beam.apache.org%3E
>>
>> On Mon, Feb 10, 2020 at 3:55 PM Robert Bradshaw <ro...@google.com> wrote:
>>>
>>> With an eye towards cross-language (which includes cross-version)
>>> pipelines and services (specifically looking at Dataflow) supporting
>>> portable pipelines, there's been a desire to stabilize the portability
>>> protos. There are currently many cleanups we'd like to do [1] (some
>>> essential, others nice to have); are there others that people would
>>> like to see?
>>>
>>> Of course we would like it to be possible for the FnAPI and Beam
>>> itself to continue to evolve. Most of this can be handled by runners
>>> understanding various transform URNs, but not all. (An example that
>>> comes to mind is support for large iterables [2], or the requirement
>>> to observe and respect new fields on a PTransform or its payloads
>>> [3]). One proposal for this is to add capabilities and/or
>>> requirements. An environment (corresponding generally to an SDK) could
>>> adveritize various capabilities (as a list or map of URNs) which a
>>> runner can take advantage of without requiring all SDKs to support all
>>> features at the same time. For the other way around, we need a way of
>>> marking something that a runner must reject if it does not understand
>>> it. This could be a set of requirements (again, a list of map of URNs)
>>> that designate capabilities required to at least be understood by the
>>> runner to faithfully execute this pipeline. (These could be attached
>>> to a transform or the pipeline itself.) Do these sound like reasonable
>>> additions? Also, would they ever need to be parameterized (map), or
>>> would a list suffice?
>>>
>>> [1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223, BEAM-3227,
>>> BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374,
>>> BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373,
>>> BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
>>> [2] https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
>>> [3] https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E

Re: FnAPI proto backwards compatibility

Posted by Kenneth Knowles <ke...@apache.org>.
On the runner requirements side: if you have such a list at the pipeline
level, it is an opportunity for the list to be inconsistent with the
contents of the pipeline. For example, if a DoFn is marked "requires stable
input" but not listed at the pipeline level, then the runner may run it
without ensuring it requires stable input.

On the SDK requirements side: the constructing SDK owns the Environment
proto completely, so it is in a position to ensure the involved docker
images support the necessary features. Is it sufficient for each SDK
involved in a cross-language expansion to validate that it understands the
inputs? For example if Python sends a PCollection with a pickle coder to
Java as input to an expansion then it will fail. And conversely if the
returned subgraph outputs a PCollection with a Java custom coder. More
complex use cases that I can imagine all seem futuristic and unlikely to
come to pass (Python passes a pickled DoFn to the Java expansion service
which inserts it into the graph in a way where a Java-based transform would
have to invoke it on every element, etc)

Kenn

On Mon, Feb 10, 2020 at 5:03 PM Brian Hulette <bh...@google.com> wrote:

> I like the capabilities/requirements idea. Would these capabilities be at
> a level that it would make sense to document in the capabilities matrix?
> i.e. could the URNs be the values of "X" Pablo described here [1].
>
> Brian
>
> [1]
> https://lists.apache.org/thread.html/e93ac64d484551d61e559e1ba0cf4a15b760e69d74c5b1d0549ff74f%40%3Cdev.beam.apache.org%3E
>
> On Mon, Feb 10, 2020 at 3:55 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> With an eye towards cross-language (which includes cross-version)
>> pipelines and services (specifically looking at Dataflow) supporting
>> portable pipelines, there's been a desire to stabilize the portability
>> protos. There are currently many cleanups we'd like to do [1] (some
>> essential, others nice to have); are there others that people would
>> like to see?
>>
>> Of course we would like it to be possible for the FnAPI and Beam
>> itself to continue to evolve. Most of this can be handled by runners
>> understanding various transform URNs, but not all. (An example that
>> comes to mind is support for large iterables [2], or the requirement
>> to observe and respect new fields on a PTransform or its payloads
>> [3]). One proposal for this is to add capabilities and/or
>> requirements. An environment (corresponding generally to an SDK) could
>> adveritize various capabilities (as a list or map of URNs) which a
>> runner can take advantage of without requiring all SDKs to support all
>> features at the same time. For the other way around, we need a way of
>> marking something that a runner must reject if it does not understand
>> it. This could be a set of requirements (again, a list of map of URNs)
>> that designate capabilities required to at least be understood by the
>> runner to faithfully execute this pipeline. (These could be attached
>> to a transform or the pipeline itself.) Do these sound like reasonable
>> additions? Also, would they ever need to be parameterized (map), or
>> would a list suffice?
>>
>> [1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223, BEAM-3227,
>> BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374,
>> BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373,
>> BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
>> [2]
>> https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
>> [3]
>> https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E
>>
>

Re: FnAPI proto backwards compatibility

Posted by Brian Hulette <bh...@google.com>.
I like the capabilities/requirements idea. Would these capabilities be at a
level that it would make sense to document in the capabilities matrix? i.e.
could the URNs be the values of "X" Pablo described here [1].

Brian

[1]
https://lists.apache.org/thread.html/e93ac64d484551d61e559e1ba0cf4a15b760e69d74c5b1d0549ff74f%40%3Cdev.beam.apache.org%3E

On Mon, Feb 10, 2020 at 3:55 PM Robert Bradshaw <ro...@google.com> wrote:

> With an eye towards cross-language (which includes cross-version)
> pipelines and services (specifically looking at Dataflow) supporting
> portable pipelines, there's been a desire to stabilize the portability
> protos. There are currently many cleanups we'd like to do [1] (some
> essential, others nice to have); are there others that people would
> like to see?
>
> Of course we would like it to be possible for the FnAPI and Beam
> itself to continue to evolve. Most of this can be handled by runners
> understanding various transform URNs, but not all. (An example that
> comes to mind is support for large iterables [2], or the requirement
> to observe and respect new fields on a PTransform or its payloads
> [3]). One proposal for this is to add capabilities and/or
> requirements. An environment (corresponding generally to an SDK) could
> adveritize various capabilities (as a list or map of URNs) which a
> runner can take advantage of without requiring all SDKs to support all
> features at the same time. For the other way around, we need a way of
> marking something that a runner must reject if it does not understand
> it. This could be a set of requirements (again, a list of map of URNs)
> that designate capabilities required to at least be understood by the
> runner to faithfully execute this pipeline. (These could be attached
> to a transform or the pipeline itself.) Do these sound like reasonable
> additions? Also, would they ever need to be parameterized (map), or
> would a list suffice?
>
> [1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223, BEAM-3227,
> BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374,
> BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373,
> BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
> [2]
> https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
> [3]
> https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E
>