Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2018/09/07 17:15:48 UTC

PTransforms and Fusion

A primitive transform is a PTransform that has been chosen to have no
default implementation in terms of other PTransforms. A primitive transform
therefore must be implemented directly by a pipeline runner in terms of
pipeline-runner-specific concepts. An initial list of primitive PTransforms
was defined in [2] and has since been updated in [3].

As part of the portability effort, libraries that are intended to be shared
across multiple runners are being developed to support their migration to a
portable execution model. One of these libraries is responsible for fusing
multiple primitive PTransforms together into a pipeline-runner-specific
concept. This library made the choice that a primitive PTransform is a
PTransform that doesn't contain any other PTransforms.

Unfortunately, while Ryan was attempting to enable the ValidatesRunner
tests for Flink using the new portability libraries, he ran into an issue:
the Apache Beam Java SDK allows a person to construct a PTransform that has
zero sub PTransforms and also isn't one of the defined Apache Beam
primitives. In this case the PTransform was trivial: it applied no
additional transforms to its input PCollection and just returned it. This
caused an issue within the portability libraries since they couldn't handle
this structure.
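
A minimal sketch of the failure mode, assuming a toy Transform class in
place of the real pipeline proto. The class, the helper name and the
"example:read:v1" URN are made up for illustration and are not Beam API. It
shows how a "no sub PTransforms means primitive" rule also matches a
pass-through composite:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Transform:
    """Toy stand-in for a PTransform node in a pipeline graph (not Beam API)."""
    name: str
    urn: str = ""  # empty string: the transform carries no well-known spec
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    subtransforms: List[str] = field(default_factory=list)


def is_primitive_by_structure(t: Transform) -> bool:
    # The structural rule: any transform without children counts as primitive.
    return not t.subtransforms


# A leaf the runner knows how to execute (hypothetical URN).
read = Transform("Read", urn="example:read:v1", outputs=["pc0"])
# A composite whose expansion applied nothing and returned its input.
passthrough = Transform("MaybeFilter", inputs=["pc0"], outputs=["pc0"])

misclassified = [t.name for t in (read, passthrough) if is_primitive_by_structure(t)]
print(misclassified)  # ['Read', 'MaybeFilter']
```

Both transforms pass the structural test, but only the first one is
something a runner can actually execute.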

To solve this issue, I proposed that we modify the portability library that
does fusion to use a whitelist of primitives, which prevents the issue from
happening. This solved the problem but caused an issue for Thomas, who was
relying on PTransforms with zero sub transforms being treated as
primitives. Thomas has a use case where he wants to expose the internal
Flink Kafka and Kinesis connectors and build Apache Beam pipelines that
use the Flink native sources/sinks. I'll call these "native" PTransforms,
since they aren't part of the Apache Beam model and are runner specific.

This brings up two topics:
A) What should we do with these "empty" PTransforms?
B) What should we do with "native" PTransforms?

The typical flow of a pipeline representation for a portable pipeline is:
language specific representation -> proto representation -> job service ->
shared libraries that simplify/replace the proto representation with a
simplified version (e.g. fusion) -> runner specific conversion to native
runner concepts (e.g. GBK -> runner implementation of GBK)

------------------

A) What should we do with these "empty" PTransforms?

To give a little more detail, these transforms typically arise when people
have conditional logic, such as loops, that would perform an expansion but
does nothing if the condition is immediately unsatisfied. So allowing
PTransforms that are empty is useful when building a pipeline.

What should we do:
A1) Stick with the whitelist of primitive PTransforms.
A2) When converting the pipeline from language specific representation into
the proto representation, drop any "empty" PTransforms. This means that the
pipeline representation that is sent to the runner doesn't contain the
offending type of PTransform and the shared libraries wouldn't have to
change.
A3) Handle the "empty" PTransform case within all of the shared libraries.

I like doing both A1 and A2: A1 because it simplifies the shared libraries,
since we know the whole list of primitives we need to understand, and A2
because it removes noise from the representation of the pipeline shape.
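
A rough sketch of how A1 and A2 could fit together, assuming a toy dict of
transforms rather than the real proto. The Transform class and the
drop_empty helper are hypothetical, and the URN strings are illustrative
(the authoritative list is the one in [3]):

```python
from dataclasses import dataclass, field, replace
from typing import Dict, List, Set

# A1: a fixed whitelist of primitive URNs (illustrative subset only).
PRIMITIVE_URNS: Set[str] = {
    "beam:transform:pardo:v1",
    "beam:transform:group_by_key:v1",
    "beam:transform:flatten:v1",
}


@dataclass
class Transform:
    """Toy pipeline node; a stand-in for the proto message, not Beam API."""
    urn: str = ""
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    subtransforms: List[str] = field(default_factory=list)


def is_primitive(t: Transform) -> bool:
    # A1: whitelist membership decides this, not "has no children".
    return t.urn in PRIMITIVE_URNS


def drop_empty(transforms: Dict[str, Transform]) -> Dict[str, Transform]:
    """A2: remove childless non-primitives before the runner sees the pipeline."""
    kept = dict(transforms)
    changed = True
    while changed:  # a parent can become empty once its only child is dropped
        empty = {tid for tid, t in kept.items()
                 if not t.subtransforms and not is_primitive(t)}
        changed = bool(empty)
        kept = {
            tid: replace(t, subtransforms=[s for s in t.subtransforms if s not in empty])
            for tid, t in kept.items() if tid not in empty
        }
    return kept


pipeline = {
    "root": Transform(subtransforms=["parse", "maybe_filter"]),
    "parse": Transform(urn="beam:transform:pardo:v1", inputs=["pc0"], outputs=["pc1"]),
    "maybe_filter": Transform(inputs=["pc1"], outputs=["pc1"]),  # expanded to nothing
}
print(sorted(drop_empty(pipeline)))  # ['parse', 'root']
```

Note that with a fixed whitelist this sketch would also drop a childless
"native" transform, which is the tension that topic B is about.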

------------------

B) What should we do with "native" PTransforms?

Some approaches that we could take as a community:

B1) Prevent the usage of "native" PTransforms within Apache Beam since they
hurt portability of pipelines across runners. This can be done by
specifically using whitelists of allowed primitive PTransforms in the
shared libraries and explicitly not allowing for shared libraries to have
extension points customizing this.

B2) We embrace that closed systems internal to companies will want to use
their own extensions and enable support for "native" PTransforms but
actively discourage "native" PTransforms in the open ecosystem.

B3) We embrace and allow for "native" PTransforms in the open ecosystem.

"native" PTransforms are useful in closed systems since they allow
companies to solve certain scenarios which would not be practical to expose
to the Apache Beam community. It does take more work for the community to
support these types of extensions. To my knowledge, Google is likely to
want to do something similar to what Thomas is trying to do in order to
handle internal use cases.

I'm for either B1 or B2, since embracing and allowing for "native"
PTransforms in the open ecosystem risks fragmenting the project and runs
counter to what portability is really about.

1: https://github.com/apache/beam/pull/6328
2: https://docs.google.com/document/d/1bao-5B6uBuf-kwH1meenAuXXS0c9cBQ1B2J59I3FiyI/edit#heading=h.tt55lhd3k6by
3: https://github.com/apache/beam/blob/6df2ef3ec9c835097e79b4441ce47ff09a458894/model/pipeline/src/main/proto/beam_runner_api.proto#L180

Re: PTransforms and Fusion

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Sep 11, 2018 at 7:01 PM Henning Rohde <he...@google.com> wrote:

>
> Empty pipelines have neither subtransforms nor a spec, which is what I
> don't think is useful
>

There's nothing preventing them from having a spec, display data, etc.
They're useful because they (more) faithfully represent the user's intent,
the same as all other composites the runner is not aware of.

> -- especially given the only usecase (which is really "nop") would create
> non-timer loops in the representations. I'd rather have a well-known nop
> primitive instead. Even now, for the A example, I don't think it's
> unreasonable to add a (well-known) identity transform inside a normal
> composite to retain the extrema at either end. It could be ignored at
> runtime at no cost.
>

To me this feels like designing a containers library, but disallowing empty
containers. Why would anyone want to use a container that's empty? Well, it
turns out that's very useful and disallowing it means adding all kinds of
exceptions and guards (or runtime errors) in any code using it. Having a
"put this no-op element in if you want your container to be semantically
empty" rule isn't a good solution either. Maybe it's just the mathematician
in me, but disallowing this well-defined, if degenerate, case from the spec
seems bad (especially if it's because a particular library neither handles
nor drops it yet).


On Fri, Sep 14, 2018 at 10:28 PM Lukasz Cwik <lc...@google.com> wrote:

> Robert, in my mind making the shared libraries extensible for internal
> usage would be B2. Exposing those native PTransforms within the community
> would be B3.
>
> It seems as though we can achieve both A1 and B2 by making the shared
> library extensible. I have this commit[1] which starts to allow the shared
> libraries to be extensible and support native transforms.
>
Yes, this looks like a good pragmatic solution.

Re: PTransforms and Fusion

Posted by Lukasz Cwik <lc...@google.com>.
Robert, in my mind making the shared libraries extensible for internal
usage would be B2. Exposing those native PTransforms within the community
would be B3.

It seems as though we can achieve both A1 and B2 by making the shared
library extensible. I have this commit[1] which starts to allow the shared
libraries to be extensible and support native transforms. I believe this
will unblock Ryan's PR from going in and also enable Thomas to meet his
objective. Ryan, I opened this PR[2] to amend your PR based upon the
discussions within this thread.
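
To illustrate the idea only (this is not the code in the commit above), an
extensible whitelist could look roughly like the sketch below. The
KnownTransforms class, its method name and the "example:flink:kafka_read:v1"
URN are all hypothetical, and the default URNs are an illustrative subset:

```python
from typing import Iterable, Set

# Illustrative defaults; the real list lives in beam_runner_api.proto.
DEFAULT_PRIMITIVE_URNS: Set[str] = {
    "beam:transform:pardo:v1",
    "beam:transform:group_by_key:v1",
}


class KnownTransforms:
    """Hypothetical extension point: a whitelist that a runner can extend."""

    def __init__(self, native_urns: Iterable[str] = ()) -> None:
        # Start from the portable primitives, then add runner-specific URNs.
        self._urns = DEFAULT_PRIMITIVE_URNS | set(native_urns)

    def is_leaf_executable(self, urn: str) -> bool:
        # Fusion would treat only these URNs as leaves the runner can execute.
        return urn in self._urns


# A default deployment only knows the portable primitives.
portable_only = KnownTransforms()
# An internal deployment registers its runner-specific connector (made-up URN).
with_native = KnownTransforms(native_urns=["example:flink:kafka_read:v1"])

print(portable_only.is_leaf_executable("example:flink:kafka_read:v1"))  # False
print(with_native.is_leaf_executable("example:flink:kafka_read:v1"))    # True
```

The whitelist stays closed by default (A1), while a runner that opts in can
still pass its own transforms through the shared code (B2).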

1: https://github.com/lukecwik/incubator-beam/commit/f0a0e93494c55256f775c24aaf4e3ee57e8725d8
2: https://github.com/ryan-williams/beam/pull/2


On Tue, Sep 11, 2018 at 10:01 AM Henning Rohde <he...@google.com> wrote:

>
> Empty pipelines have neither subtransforms nor a spec, which is what I
> don't think is useful -- especially given the only usecase (which is really
> "nop") would create non-timer loops in the representations. I'd rather have
> a well-known nop primitive instead. Even now, for the A example, I don't
> think it's unreasonable to add a (well-known) identity transform inside a
> normal composite to retain the extrema at either end. It could be ignored
> at runtime at no cost.
>
> To clarify my support for A1, native transforms would have a spec and
> would be passed through in the shared code even though they're not
> primitives.
>
>
> On Tue, Sep 11, 2018 at 12:56 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> For (A), it really boils down to the question of what is a legal
>> pipeline. A1 takes the position that all empty transforms must be on a
>> whitelist (which implies B1, unless we make the whitelist extensible, which
>> starts to sound a lot like B3). Presumably if we want to support B2, we
>> cannot remove all empty unknown transforms, just those whose outputs are a
>> subset of the inputs.
>>
>> The reason I strongly support A3 is that empty PTransforms are not just
>> noise, they are expressions of user intent, and the pipeline graph should
>> reflect that as faithfully as possible. This is the whole point of
>> composite transforms--one should not be required to expose what is inside
>> (even whether it's empty). Consider, for example, an A, B -> C transform
>> that mixes A and B in proportions to produce C. In the degenerate case
>> where we want 100% from A or 100% from B, it's reasonable to implement this
>> by just returning A or B directly. But when, say, visualizing the pipeline
>> graph, I don't think it's desirable to have the discontinuity of the
>> composite transform suddenly disappearing when the mixing parameter is at
>> either extreme.
>>
>> If a runner cannot handle these empty pipelines (as is the case for those
>> relying on the current Java libraries) it is an easy matter for it to drop
>> them, but that doesn't mean we should withhold this information (by making
>> it illegal and dropping it in every SDK) from a runner (or any other tool)
>> that would want to see this information.
>>
>> - Robert
>>
>>
>> On Tue, Sep 11, 2018 at 4:20 AM Henning Rohde <he...@google.com> wrote:
>>
>>> For A, I am in favor of A1 and A2 as well. It is then up to each SDK to
>>> not generate "empty" transforms in the proto representation as we avoid
>>> noise as mentioned. The shared Java libraries are also optional and we
>>> should not assume every runner will use them. I'm not convinced empty
>>> transforms would have value for pipeline structure over what can be
>>> accomplished with normal composites. I suspect empty transforms, such as A,
>>> B -> B, B, will just be confusion generators.
>>>
>>> For B, I favor B2 for the reasons Thomas mentions. I also agree with the
>>> -1 for B1.
>>>
>>> On Mon, Sep 10, 2018 at 2:51 PM Thomas Weise <th...@apache.org> wrote:
>>>
>>>> For B, note the prior discussion [1].
>>>>
>>>> B1 and B2 cannot be supported at the same time.
>>>>
>>>> Native transforms will almost always be customizations. Users do not
>>>> create customizations without reason. They would start with what is there
>>>> and dig deeper only when needed. Right now there are no streaming
>>>> connectors in the Python SDK - should the user not use the SDK? Or is it
>>>> better (now and in general) to have the option of a custom connector, even
>>>> when it is not portable?
>>>>
>>>> Regarding portability, IMO it should be up to the user to decide how
>>>> much of it is necessary/important. The IO requirements are normally
>>>> dictated by the infrastructure. If it has Kafka, Kinesis or any other
>>>> source (including those that Beam might never have a connector for), the
>>>> user needs to be able to integrate it.
>>>>
>>>> Overall extensibility is important and will help users adopt Beam. This
>>>> has come up in a few other areas (think Docker environments). I think we
>>>> need to provide the flexibility and enable, not prevent alternatives and
>>>> therefore -1 for B1 (unsurprisingly :).
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/9813ee10cb1cd9bf64e1c4f04c02b606c7b12d733f4505fb62f4a954@%3Cdev.beam.apache.org%3E
>>>>
>>>>
>>>> On Mon, Sep 10, 2018 at 10:14 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> A) I think it's a bug to not handle empty PTransforms (which are
>>>>> useful at pipeline construction, and may still have meaning in terms of
>>>>> pipeline structure, e.g. for visualization). Note that such transforms, if
>>>>> truly composite, can't output any PCollections that do not appear in their
>>>>> inputs (which is how we distinguish them from primitives in Python). Thus
>>>>> I'm in favor of A3, and as a stopgap we can drop these transforms as part
>>>>> of/just before decoding in the Java libraries (rather than in the SDKs
>>>>> during encoding as in A2).
>>>>>
>>>>> B) I'm also for B1 or B2.
>>>>>
>>>>>
>>>>> On Mon, Sep 10, 2018 at 3:58 PM Maximilian Michels <mx...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> > A) What should we do with these "empty" PTransforms?
>>>>>>
>>>>>> We can't translate them, so dropping them seems the most reasonable
>>>>>> choice. Should we throw an error/warning to make the user aware of
>>>>>> this? Otherwise it might be unexpected for the user.
>>>>>>
>>>>>> > A3) Handle the "empty" PTransform case within all of the shared
>>>>>> > libraries.
>>>>>>
>>>>>> What can we do at this point other than dropping them?
>>>>>>
>>>>>> > B) What should we do with "native" PTransforms?
>>>>>>
>>>>>> I support B1 and B2 as well. Non-portable PTransforms should be
>>>>>> discouraged in the long run. However, the available PTransforms are
>>>>>> not even consistent across the different SDKs yet (e.g. no streaming
>>>>>> connectors in Python), so we should continue to provide a way to
>>>>>> utilize the "native" transforms of a Runner.
>>>>>>
>>>>>> -Max
>>>>>>

Re: PTransforms and Fusion

Posted by Henning Rohde <he...@google.com>.
Empty pipelines have neither subtransforms nor a spec, which is what I don't
think is useful -- especially given the only usecase (which is really
"nop") would create non-timer loops in the representations. I'd rather have
a well-known nop primitive instead. Even now, for the A example, I don't
think it's unreasonable to add a (well-known) identity transform inside a
normal composite to retain the extrema at either end. It could be ignored
at runtime at no cost.

To clarify my support for A1, native transforms would have a spec and would
be passed through in the shared code even though they're not primitives.


On Tue, Sep 11, 2018 at 12:56 AM Robert Bradshaw <ro...@google.com>
wrote:

> For (A), it really boils down to the question of what is a legal pipeline.
> A1 takes the position that all empty transforms must be on a whitelist
> (which implies B1, unless we make the whitelist extensible, which starts to
> sound a lot like B3). Presumably if we want to support B2, we cannot remove
> all empty unknown transforms, just those whose outputs are a subset of the
> inputs.
>
> The reason I strongly support A3 is that empty PTransforms are not just
> noise, they are expressions of user intent, and the pipeline graph should
> reflect that as faithfully as possible. This is the whole point of
> composite transforms--one should not be required to expose what is inside
> (even whether it's empty). Consider, for example, an A, B -> C transform
> that mixes A and B in proportions to produce C. In the degenerate case
> where we want 100% for A or 100% from B, it's reasonable to implement this
> by just returning A or B directly. But when, say, visualizing the pipeline
> graph, I don't think it's desirable to have the discontinuity of the
> composite transform suddenly disappearing when the mixing parameter is at
> either extreme.
>
> If a runner cannot handle these empty pipelines (as is the case for those
> relying on the current Java libraries) it is an easy matter for it to drop
> them, but that doesn't mean we should withhold this information (by making
> it illegal and dropping it in every SDK) from a runner (or any other tool)
> that would want to see this information.
>
> - Robert
>
>
> On Tue, Sep 11, 2018 at 4:20 AM Henning Rohde <he...@google.com> wrote:
>
>> For A, I am in favor of A1 and A2 as well. It is then up to each SDK to
>> not generate "empty" transforms in the proto representation as we avoid
>> noise as mentioned. The shared Java libraries are also optional and we
>> should not assume every runner will use them. I'm not convinced empty
>> transforms would have value for pipeline structure over what can be
>> accomplished with normal composites. I suspect empty transforms, such as A,
>> B -> B, B, will just be confusion generators.
>>
>> For B, I favor B2 for the reasons Thomas mentions. I also agree with the
>> -1 for B1.
>>
>> On Mon, Sep 10, 2018 at 2:51 PM Thomas Weise <th...@apache.org> wrote:
>>
>>> For B, note the prior discussion [1].
>>>
>>> B1 and B2 cannot be supported at the same time.
>>>
>>> Native transforms will almost always be customizations. Users do not
>>> create customizations without reason. They would start with what is there
>>> and dig deeper only when needed. Right now there are no streaming
>>> connectors in the Python SDK - should the user not use the SDK? Or is it
>>> better (now and in general) to have the option of a custom connector, even
>>> when it is not portable?
>>>
>>> Regarding portability, IMO it should be up to the user to decide how
>>> much of it is necessary/important. The IO requirements are normally
>>> dictated by the infrastructure. If it has Kafka, Kinesis or any other
>>> source (including those that Beam might never have a connector for), the
>>> user needs to be able to integrate it.
>>>
>>> Overall extensibility is important and will help users adopt Beam. This
>>> has come up in a few other areas (think Docker environments). I think we
>>> need to provide the flexibility and enable, not prevent alternatives and
>>> therefore -1 for B1 (unsurprisingly :).
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/9813ee10cb1cd9bf64e1c4f04c02b606c7b12d733f4505fb62f4a954@%3Cdev.beam.apache.org%3E
>>>
>>>
>>> On Mon, Sep 10, 2018 at 10:14 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> A) I think it's a bug to not handle empty PTransforms (which are useful
>>>> at pipeline construction, and may still have meaning in terms of pipeline
>>>> structure, e.g. for visualization). Note that such transforms, if truly
>>>> composite, can't output any PCollections that do not appear in their inputs
>>>> (which is how we distinguish them from primitives in Python). Thus I'm in
>>>> favor of A3, and as a stopgap we can drop these transforms as part of/just
>>>> before decoding in the Java libraries (rather than in the SDKs during
>>>> encoding as in A2).
>>>>
>>>> B) I'm also for B1 or B2.
>>>>
>>>>
>>>> On Mon, Sep 10, 2018 at 3:58 PM Maximilian Michels <mx...@apache.org>
>>>> wrote:
>>>>
>>>>> > A) What should we do with these "empty" PTransforms?
>>>>>
>>>>> We can't translate them, so dropping them seems the most reasonable
>>>>> choice. Should we throw an error/warning to make the user aware of
>>>>> this?
>>>>> Otherwise might be unexpected for the user.
>>>>>
>>>>> >> A3) Handle the "empty" PTransform case within all of the shared
>>>>> libraries.
>>>>>
>>>>> What can we do at this point other than dropping them?
>>>>>
>>>>> > B) What should we do with "native" PTransforms?
>>>>>
>>>>> I support B1 and B2 as well. Non-portable PTransforms should be
>>>>> discouraged in the long run. However, the available PTransforms are
>>>>> not
>>>>> even consistent across the different SDKs yet (e.g. no streaming
>>>>> connectors in Python), so we should continue to provide a way to
>>>>> utilize
>>>>> the "native" transforms of a Runner.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 07.09.18 19:15, Lukasz Cwik wrote:
>>>>> > A primitive transform is a PTransform that has been chosen to have
>>>>> no
>>>>> > default implementation in terms of other PTransforms. A primitive
>>>>> > transform therefore must be implemented directly by a pipeline
>>>>> runner in
>>>>> > terms of pipeline-runner-specific concepts. An initial list of
>>>>> primitive
>>>>> > PTransforms were defined in [2] and has since been updated in [3].
>>>>> >
>>>>> > As part of the portability effort, libraries that are intended to be
>>>>> > shared across multiple runners are being developed to support their
>>>>> > migration to a portable execution model. One of these is responsible
>>>>> for
>>>>> > fusing multiple primitive PTransforms together into a pipeline
>>>>> runner
>>>>> > specific concept. This library made the choice that a primitive
>>>>> > PTransform is a PTransform that doesn't contain any other
>>>>> PTransforms.
>>>>> >
>>>>> > Unfortunately, while Ryan was attempting to enable testing of
>>>>> validates
>>>>> > runner tests for Flink using the new portability libraries, he ran
>>>>> into
>>>>> > an issue where the Apache Beam Java SDK allows for a person to
>>>>> construct
>>>>> > a PTransform that has zero sub PTransforms and also isn't one of the
>>>>> > defined Apache Beam primitives. In this case the PTransform was
>>>>> trivial
>>>>> > as it was not applying any additional transforms to input
>>>>> PCollection
>>>>> > and just returning it. This caused an issue within the portability
>>>>> > libraries since they couldn't handle this structure.
>>>>> >
>>>>> > To solve this issue, I had proposed that we modify the portability
>>>>> > library that does fusion to use a whitelist of primitives preventing
>>>>> the
>>>>> > issue from happening. This solved the problem but caused an issue
>>>>> for
>>>>> > Thomas as he was relying on this behaviour of PTransforms with zero
>>>>> sub
>>>>> > transforms being primitives. Thomas has a use-case where he wants to
>>>>> > expose the internal Flink Kafka and Kinesis connectors and to build
>>>>> > Apache Beam pipelines that use the Flink native sources/sinks. I'll
>>>>> call
>>>>> > these "native" PTransforms, since they aren't part of the Apache
>>>>> Beam
>>>>> > model and are runner specific.
>>>>> >
>>>>> > This brings up two topics:
>>>>> > A) What should we do with these "empty" PTransforms?
>>>>> > B) What should we do with "native" PTransforms?
>>>>> >
>>>>> > The typical flow of a pipeline representation for a portable
>>>>> pipeline is:
>>>>> > language specific representation -> proto representation -> job
>>>>> service
>>>>> > -> shared libraries that simplify/replace the proto representation
>>>>> with
>>>>> > a simplified version (e.g. fusion) -> runner specific conversion to
>>>>> > native runner concepts (e.g. GBK -> runner implementation of GBK)
>>>>> >
>>>>> > ------------------
>>>>> >
>>>>> > A) What should we do with these "empty" PTransforms?
>>>>> >
>>>>> > To give a little more detail, these transforms typically can happen
>>>>> if
>>>>> > people have conditional logic such as loops that would perform an
>>>>> > expansion but do nothing if the condition is immediately
>>>>> unsatisfied. So
>>>>> > allowing for PTransforms that are empty is useful when building a
>>>>> pipeline.
>>>>> >
>>>>> > What should we do:
>>>>> > A1) Stick with the whitelist of primitive PTransforms.
>>>>> > A2) When converting the pipeline from language specific
>>>>> representation
>>>>> > into the proto representation, drop any "empty" PTransforms. This
>>>>> means
>>>>> > that the pipeline representation that is sent to the runner doesn't
>>>>> > contain the offending type of PTransform and the shared libraries
>>>>> > wouldn't have to change.
>>>>> > A3) Handle the "empty" PTransform case within all of the shared
>>>>> libraries.
>>>>> >
>>>>> > I like doing both A1 and A2. A1 since it helps simplify the shared
>>>>> > libraries since we know the whole list of primitives we need to
>>>>> > understand and A2 because it removes noise within the pipeline shape
>>>>> > from its representation.
>>>>> >
>>>>> > ------------------
>>>>> >
>>>>> > B) What should we do with "native" PTransforms?
>>>>> >
>>>>> > Some approaches that we could take as a community:
>>>>> >
>>>>> > B1) Prevent the usage of "native" PTransforms within Apache Beam
>>>>> since
>>>>> > they hurt portability of pipelines across runners. This can be done
>>>>> by
>>>>> > specifically using whitelists of allowed primitive PTransforms in
>>>>> the
>>>>> > shared libraries and explicitly not allowing for shared libraries to
>>>>> > have extension points customizing this.
>>>>> >
>>>>> > B2) We embrace that closed systems internal to companies will want
>>>>> to
>>>>> > use their own extensions and enable support for "native" PTransforms
>>>>> but
>>>>> > actively discourage "native" PTransforms in the open ecosystem.
>>>>> >
>>>>> > B3) We embrace and allow for "native" PTransforms in the open
>>>>> ecosystem.
>>>>> >
>>>>> > "native" PTransforms are useful in closed systems since they allow
>>>>> > companies to solve certain scenarios which would not be practical to
>>>>> > expose the Apache Beam community. It does take more work for the
>>>>> > community to support these types of extensions. To my knowledge,
>>>>> Google
>>>>> > is likely to want to do something similar to handle internal use
>>>>> cases
>>>>> > similar to what Thomas is trying to do.
>>>>> >
>>>>> > I'm for either B1 or B2 since the risk of embracing and allowing for
>>>>> > "native" PTransforms in the open ecosystem is likely to fragment the
>>>>> > project and also is counter to what portability is really about.
>>>>> >
>>>>> > 1: https://github.com/apache/beam/pull/6328
>>>>> > 2: https://docs.google.com/document/d/1bao-5B6uBuf-kwH1meenAuXXS0c9cBQ1B2J59I3FiyI/edit#heading=h.tt55lhd3k6by
>>>>> > 3: https://github.com/apache/beam/blob/6df2ef3ec9c835097e79b4441ce47ff09a458894/model/pipeline/src/main/proto/beam_runner_api.proto#L180
>>>>>
>>>>

Re: PTransforms and Fusion

Posted by Robert Bradshaw <ro...@google.com>.
For (A), it really boils down to the question of what is a legal pipeline.
A1 takes the position that all empty transforms must be on a whitelist
(which implies B1, unless we make the whitelist extensible, which starts to
sound a lot like B3). Presumably if we want to support B2, we cannot remove
all empty unknown transforms, just those whose outputs are a subset of the
inputs.
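
To make the distinction concrete, here is a minimal sketch of how a leaf
transform could be classified under these options. Everything in it is an
assumption for illustration: the Transform class is a toy stand-in rather than
the Beam proto API, and the URN strings are made-up placeholders, not entries
from beam_runner_api.proto.

```python
from dataclasses import dataclass

# Placeholder URNs, invented for this sketch (not the real Beam primitives).
KNOWN_PRIMITIVES = frozenset({"example:pardo", "example:group_by_key"})


@dataclass
class Transform:
    """Toy stand-in for one transform node in the pipeline representation."""
    urn: str
    inputs: frozenset = frozenset()
    outputs: frozenset = frozenset()
    subtransforms: tuple = ()


def classify(t, whitelist=KNOWN_PRIMITIVES):
    """Label a transform as composite, primitive, empty, or native."""
    if t.subtransforms:
        return "composite"
    if t.urn in whitelist:
        return "primitive"
    # No children and no new PCollections: safe to remove without rewiring.
    if t.outputs <= t.inputs:
        return "empty"
    # Unknown leaf that produces new outputs: a runner-specific transform.
    return "native"
```

Under A1 with a fixed whitelist, both "empty" and "native" results would be
rejected. Passing a larger whitelist models the extensible variant, and under
B2 only the "empty" results are candidates for removal.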

The reason I strongly support A3 is that empty PTransforms are not just
noise, they are expressions of user intent, and the pipeline graph should
reflect that as faithfully as possible. This is the whole point of
composite transforms--one should not be required to expose what is inside
(even whether it's empty). Consider, for example, an A, B -> C transform
that mixes A and B in proportions to produce C. In the degenerate case
where we want 100% from A or 100% from B, it's reasonable to implement this
by just returning A or B directly. But when, say, visualizing the pipeline
graph, I don't think it's desirable to have the discontinuity of the
composite transform suddenly disappearing when the mixing parameter is at
either extreme.
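
A rough sketch of that mixing transform, using a toy graph recorder instead of
a real SDK. The names Node, mix, fraction_a and "SampleAndFlatten" are all
hypothetical and chosen only for this illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Toy record of one applied transform (not a Beam class)."""
    name: str
    inputs: tuple
    outputs: tuple
    children: list = field(default_factory=list)


def mix(a, b, fraction_a, graph):
    """Composite A, B -> C; fraction_a is the share of C drawn from A."""
    node = Node("Mix", inputs=(a, b), outputs=())
    if fraction_a == 1.0:
        out = a  # degenerate case: return A unchanged, no sub-transforms
    elif fraction_a == 0.0:
        out = b  # degenerate case: return B unchanged, no sub-transforms
    else:
        out = f"mixed({a},{b})"
        node.children.append(Node("SampleAndFlatten", (a, b), (out,)))
    node.outputs = (out,)
    graph.append(node)  # the composite is recorded even when it is empty
    return out
```

With fraction_a set to 1.0 or 0.0 the recorded "Mix" node has no children and
its output is one of its inputs, which is exactly the "empty" shape under
discussion. The node is still present in the graph, so a visualization stays
continuous as the parameter changes.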

If a runner cannot handle these empty transforms (as is the case for those
relying on the current Java libraries), it is an easy matter for it to drop
them, but that doesn't mean we should withhold this information (by making
it illegal and dropping it in every SDK) from a runner (or any other tool)
that would want to see this information.
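
As a sketch of why dropping is easy on the runner side: an empty transform's
outputs are already among its inputs, so no PCollection needs rewiring. The
dict layout below is an assumed toy structure loosely modelled on the pipeline
representation, and known_urns is a hypothetical whitelist parameter.

```python
def drop_empty_transforms(transforms, known_urns):
    """Return a copy of `transforms` with empty leaf transforms removed.

    `transforms` maps an id to a dict with the keys "urn", "inputs",
    "outputs" and "subtransforms" (a toy layout, not the real proto).
    """
    def is_empty(t):
        return (not t["subtransforms"]
                and t["urn"] not in known_urns
                and set(t["outputs"]) <= set(t["inputs"]))

    result = {tid: dict(t) for tid, t in transforms.items()}
    # Repeat until stable: a composite may become empty once its only
    # children have been dropped.
    while True:
        dropped = {tid for tid, t in result.items() if is_empty(t)}
        if not dropped:
            return result
        result = {
            tid: {**t, "subtransforms": [s for s in t["subtransforms"]
                                         if s not in dropped]}
            for tid, t in result.items()
            if tid not in dropped
        }
```

Unknown leaves that produce new outputs are left alone, so a runner can still
decide separately how to treat "native" transforms. The input mapping is not
modified.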

- Robert


On Tue, Sep 11, 2018 at 4:20 AM Henning Rohde <he...@google.com> wrote:

> For A, I am in favor of A1 and A2 as well. It is then up to each SDK to
> not generate "empty" transforms in the proto representation as we avoid
> noise as mentioned. The shared Java libraries are also optional and we
> should not assume every runner will use them. I'm not convinced empty
> transforms would have value for pipeline structure over what can be
> accomplished with normal composites. I suspect empty transforms, such as A,
> B -> B, B, will just be confusion generators.
>
> For B, I favor B2 for the reasons Thomas mentions. I also agree with the
> -1 for B1.
>
> On Mon, Sep 10, 2018 at 2:51 PM Thomas Weise <th...@apache.org> wrote:
>
>> For B, note the prior discussion [1].
>>
>> B1 and B2 cannot be supported at the same time.
>>
>> Native transforms will almost always be customizations. Users do not
>> create customizations without reason. They would start with what is there
>> and dig deeper only when needed. Right now there are no streaming
>> connectors in the Python SDK - should the user not use the SDK? Or is it
>> better (now and in general) to have the option of a custom connector, even
>> when it is not portable?
>>
>> Regarding portability, IMO it should be up to the user to decide how much
>> of it is necessary/important. The IO requirements are normally dictated by
>> the infrastructure. If it has Kafka, Kinesis or any other source (including
>> those that Beam might never have a connector for), the user needs to be
>> able to integrate it.
>>
>> Overall extensibility is important and will help users adopt Beam. This
>> has come up in a few other areas (think Docker environments). I think we
>> need to provide the flexibility and enable, not prevent alternatives and
>> therefore -1 for B1 (unsurprisingly :).
>>
>> [1]
>> https://lists.apache.org/thread.html/9813ee10cb1cd9bf64e1c4f04c02b606c7b12d733f4505fb62f4a954@%3Cdev.beam.apache.org%3E
>>
>>
>> On Mon, Sep 10, 2018 at 10:14 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> A) I think it's a bug to not handle empty PTransforms (which are useful
>>> at pipeline construction, and may still have meaning in terms of pipeline
>>> structure, e.g. for visualization). Note that such transforms, if truly
>>> composite, can't output any PCollections that do not appear in their inputs
>>> (which is how we distinguish them from primitives in Python). Thus I'm in
>>> favor of A3, and as a stopgap we can drop these transforms as part of/just
>>> before decoding in the Java libraries (rather than in the SDKs during
>>> encoding as in A2).
>>>
>>> B) I'm also for B1 or B2.
>>>
>>>
>>> On Mon, Sep 10, 2018 at 3:58 PM Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>
>>>> > A) What should we do with these "empty" PTransforms?
>>>>
>>>> We can't translate them, so dropping them seems the most reasonable
>>>> choice. Should we throw an error/warning to make the user aware of
>>>> this? Otherwise might be unexpected for the user.
>>>>
>>>> >> A3) Handle the "empty" PTransform case within all of the shared
>>>> >> libraries.
>>>>
>>>> What can we do at this point other than dropping them?
>>>>
>>>> > B) What should we do with "native" PTransforms?
>>>>
>>>> I support B1 and B2 as well. Non-portable PTransforms should be
>>>> discouraged in the long run. However, the available PTransforms are not
>>>> even consistent across the different SDKs yet (e.g. no streaming
>>>> connectors in Python), so we should continue to provide a way to
>>>> utilize the "native" transforms of a Runner.
>>>>
>>>> -Max

Re: PTransforms and Fusion

Posted by Henning Rohde <he...@google.com>.
For A, I am in favor of A1 and A2 as well. It is then up to each SDK not to
generate "empty" transforms in the proto representation, which avoids the
noise already mentioned. The shared Java libraries are also optional, and we
should not assume every runner will use them. I'm not convinced empty
transforms would have value for pipeline structure over what can be
accomplished with normal composites. I suspect empty transforms, such as
A, B -> B, B, will just be confusion generators.

For B, I favor B2 for the reasons Thomas mentions. I also agree with the -1
for B1.

On Mon, Sep 10, 2018 at 2:51 PM Thomas Weise <th...@apache.org> wrote:

> For B, note the prior discussion [1].
>
> B1 and B2 cannot be supported at the same time.
>
> Native transforms will almost always be customizations. Users do not
> create customizations without reason. They would start with what is there
> and dig deeper only when needed. Right now there are no streaming
> connectors in the Python SDK - should the user not use the SDK? Or is it
> better (now and in general) to have the option of a custom connector, even
> when it is not portable?
>
> Regarding portability, IMO it should be up to the user to decide how much
> of it is necessary/important. The IO requirements are normally dictated by
> the infrastructure. If it has Kafka, Kinesis or any other source (including
> those that Beam might never have a connector for), the user needs to be
> able to integrate it.
>
> Overall extensibility is important and will help users adopt Beam. This
> has come up in a few other areas (think Docker environments). I think we
> need to provide the flexibility and enable, not prevent alternatives and
> therefore -1 for B1 (unsurprisingly :).
>
> [1]
> https://lists.apache.org/thread.html/9813ee10cb1cd9bf64e1c4f04c02b606c7b12d733f4505fb62f4a954@%3Cdev.beam.apache.org%3E
>
>
> On Mon, Sep 10, 2018 at 10:14 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> A) I think it's a bug to not handle empty PTransforms (which are useful
>> at pipeline construction, and may still have meaning in terms of pipeline
>> structure, e.g. for visualization). Note that such transforms, if truly
>> composite, can't output any PCollections that do not appear in their inputs
>> (which is how we distinguish them from primitives in Python). Thus I'm in
>> favor of A3, and as a stopgap we can drop these transforms as part of/just
>> before decoding in the Java libraries (rather than in the SDKs during
>> encoding as in A2).
>>
>> B) I'm also for B1 or B2.
>>
>>
>> On Mon, Sep 10, 2018 at 3:58 PM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> > A) What should we do with these "empty" PTransforms?
>>>
>>> We can't translate them, so dropping them seems the most reasonable
>>> choice. Should we throw an error/warning to make the user aware of this?
>>> Otherwise might be unexpected for the user.
>>>
>>> >> A3) Handle the "empty" PTransform case within all of the shared
>>> >> libraries.
>>>
>>> What can we do at this point other than dropping them?
>>>
>>> > B) What should we do with "native" PTransforms?
>>>
>>> I support B1 and B2 as well. Non-portable PTransforms should be
>>> discouraged in the long run. However, the available PTransforms are not
>>> even consistent across the different SDKs yet (e.g. no streaming
>>> connectors in Python), so we should continue to provide a way to utilize
>>> the "native" transforms of a Runner.
>>>
>>> -Max

Re: PTransforms and Fusion

Posted by Thomas Weise <th...@apache.org>.
For B, note the prior discussion [1].

B1 and B2 cannot be supported at the same time.

Native transforms will almost always be customizations. Users do not create
customizations without reason. They would start with what is there and dig
deeper only when needed. Right now there are no streaming connectors in the
Python SDK - should the user not use the SDK? Or is it better (now and in
general) to have the option of a custom connector, even when it is not
portable?

Regarding portability, IMO it should be up to the user to decide how much
of it is necessary/important. The IO requirements are normally dictated by
the infrastructure. If it has Kafka, Kinesis or any other source (including
those that Beam might never have a connector for), the user needs to be
able to integrate it.

Overall extensibility is important and will help users adopt Beam. This has
come up in a few other areas (think Docker environments). I think we need
to provide the flexibility and enable, not prevent, alternatives; I am
therefore -1 for B1 (unsurprisingly :).

[1]
https://lists.apache.org/thread.html/9813ee10cb1cd9bf64e1c4f04c02b606c7b12d733f4505fb62f4a954@%3Cdev.beam.apache.org%3E


On Mon, Sep 10, 2018 at 10:14 AM Robert Bradshaw <ro...@google.com>
wrote:

> A) I think it's a bug to not handle empty PTransforms (which are useful at
> pipeline construction, and may still have meaning in terms of pipeline
> structure, e.g. for visualization). Note that such transforms, if truly
> composite, can't output any PCollections that do not appear in their inputs
> (which is how we distinguish them from primitives in Python). Thus I'm in
> favor of A3, and as a stopgap we can drop these transforms as part of/just
> before decoding in the Java libraries (rather than in the SDKs during
> encoding as in A2).
>
> B) I'm also for B1 or B2.
>
>
> On Mon, Sep 10, 2018 at 3:58 PM Maximilian Michels <mx...@apache.org> wrote:
>
>> > A) What should we do with these "empty" PTransforms?
>>
>> We can't translate them, so dropping them seems the most reasonable
>> choice. Should we throw an error/warning to make the user aware of this?
>> Otherwise might be unexpected for the user.
>>
>> >> A3) Handle the "empty" PTransform case within all of the shared
>> >> libraries.
>>
>> What can we do at this point other than dropping them?
>>
>> > B) What should we do with "native" PTransforms?
>>
>> I support B1 and B2 as well. Non-portable PTransforms should be
>> discouraged in the long run. However, the available PTransforms are not
>> even consistent across the different SDKs yet (e.g. no streaming
>> connectors in Python), so we should continue to provide a way to utilize
>> the "native" transforms of a Runner.
>>
>> -Max

Re: PTransforms and Fusion

Posted by Robert Bradshaw <ro...@google.com>.
A) I think it's a bug to not handle empty PTransforms (which are useful at
pipeline construction, and may still have meaning in terms of pipeline
structure, e.g. for visualization). Note that such transforms, if truly
composite, can't output any PCollections that do not appear in their inputs
(which is how we distinguish them from primitives in Python). Thus I'm in
favor of A3, and as a stopgap we can drop these transforms as part of/just
before decoding in the Java libraries (rather than in the SDKs during
encoding as in A2).

B) I'm also for B1 or B2.


On Mon, Sep 10, 2018 at 3:58 PM Maximilian Michels <mx...@apache.org> wrote:

> > A) What should we do with these "empty" PTransforms?
>
> We can't translate them, so dropping them seems the most reasonable
> choice. Should we throw an error/warning to make the user aware of this?
> Otherwise might be unexpected for the user.
>
> >> A3) Handle the "empty" PTransform case within all of the shared
> >> libraries.
>
> What can we do at this point other than dropping them?
>
> > B) What should we do with "native" PTransforms?
>
> I support B1 and B2 as well. Non-portable PTransforms should be
> discouraged in the long run. However, the available PTransforms are not
> even consistent across the different SDKs yet (e.g. no streaming
> connectors in Python), so we should continue to provide a way to utilize
> the "native" transforms of a Runner.
>
> -Max
>

Re: PTransforms and Fusion

Posted by Maximilian Michels <mx...@apache.org>.
> A) What should we do with these "empty" PTransforms?

We can't translate them, so dropping them seems the most reasonable
choice. Should we raise an error or log a warning so the user is aware
of this? Otherwise the drop might be unexpected for the user.

>> A3) Handle the "empty" PTransform case within all of the shared libraries.

What can we do at this point other than dropping them?
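
As a rough illustration of "drop, but warn", here is a minimal sketch. It is
not the Beam implementation: the pipeline is modelled as a plain dict of
hypothetical `Transform` records instead of the Runner API protos, the URN
set is an abbreviated, illustrative stand-in for the primitive list in [3],
and "empty" is assumed to mean "no sub-transforms, not a known primitive,
and no outputs other than its own inputs". That last condition is an
assumption added so that a leaf "native" transform that produces a new
PCollection is not dropped by the same rule.

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger("empty_ptransform_sketch")

# Illustrative subset only; the authoritative list is the one in [3].
PRIMITIVE_URNS = frozenset({
    "beam:transform:pardo:v1",
    "beam:transform:group_by_key:v1",
    "beam:transform:flatten:v1",
    "beam:transform:impulse:v1",
})


@dataclass
class Transform:
    """Hypothetical, simplified stand-in for a PTransform proto."""
    urn: str = ""                                   # "" for plain composites
    subtransforms: list = field(default_factory=list)  # child transform ids
    inputs: list = field(default_factory=list)         # PCollection ids
    outputs: list = field(default_factory=list)        # PCollection ids


def is_empty(t):
    """True for a non-primitive leaf that only hands its inputs back."""
    return (not t.subtransforms
            and t.urn not in PRIMITIVE_URNS
            and set(t.outputs) <= set(t.inputs))


def drop_empty_transforms(transforms):
    """Return a new dict without empty transforms, warning for each drop.

    Repeats until nothing changes, because a composite whose children were
    all empty becomes empty itself once they are removed.
    """
    kept = dict(transforms)
    while True:
        empty = {tid for tid, t in kept.items() if is_empty(t)}
        if not empty:
            return kept
        for tid in sorted(empty):
            logger.warning("Dropping empty PTransform %r", tid)
        kept = {
            tid: Transform(
                urn=t.urn,
                subtransforms=[s for s in t.subtransforms if s not in empty],
                inputs=list(t.inputs),
                outputs=list(t.outputs),
            )
            for tid, t in kept.items()
            if tid not in empty
        }


# Example: "maybe_loop" models a loop whose condition was never satisfied.
example = {
    "root": Transform(subtransforms=["read", "maybe_loop", "write"]),
    "read": Transform(urn="beam:transform:impulse:v1", outputs=["pc0"]),
    "maybe_loop": Transform(inputs=["pc0"], outputs=["pc0"]),
    "write": Transform(urn="beam:transform:pardo:v1",
                       inputs=["pc0"], outputs=["pc1"]),
}
cleaned = drop_empty_transforms(example)
```

No PCollection rewiring is needed in this sketch because an empty transform,
as defined above, never introduces a new PCollection id. Whether the message
should be a warning or a hard error is exactly the open question above.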

> B) What should we do with "native" PTransforms?

I support B1 and B2 as well. Non-portable PTransforms should be 
discouraged in the long run. However, the available PTransforms are not 
even consistent across the different SDKs yet (e.g. no streaming 
connectors in Python), so we should continue to provide a way to utilize 
the "native" transforms of a Runner.
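
To make the B2 idea concrete, a minimal sketch of a primitive whitelist with
an explicit opt-in hook for runner-specific transforms might look like the
following. Everything here is an assumption for illustration:
`PrimitiveRegistry`, `register_native`, and the `example:flink:...` URN are
hypothetical names, not part of Beam or Flink, and the portable URN set is
an abbreviated stand-in for the list in [3].

```python
# Illustrative subset only; the authoritative list is the one in [3].
PORTABLE_PRIMITIVES = frozenset({
    "beam:transform:pardo:v1",
    "beam:transform:group_by_key:v1",
    "beam:transform:flatten:v1",
    "beam:transform:impulse:v1",
})

# Hypothetical URN for a runner-specific source; not a real identifier.
HYPOTHETICAL_KAFKA_URN = "example:flink:transform:kafka_read:v1"


class PrimitiveRegistry:
    """Whitelist of primitive URNs plus an opt-in list of native ones."""

    def __init__(self, portable=PORTABLE_PRIMITIVES):
        self._portable = frozenset(portable)
        self._native = set()

    def register_native(self, urn):
        """Let a runner declare a non-portable URN it can execute itself."""
        if urn in self._portable:
            raise ValueError(f"{urn!r} is already a portable primitive")
        self._native.add(urn)

    def classify(self, urn):
        """Return "portable", "native", or "unknown" for a leaf URN."""
        if urn in self._portable:
            return "portable"
        if urn in self._native:
            return "native"
        return "unknown"

    def is_primitive(self, urn):
        """Leaf transforms are accepted only if someone vouched for them."""
        return self.classify(urn) != "unknown"


# A runner that wants native connectors opts in explicitly.
flink_registry = PrimitiveRegistry()
flink_registry.register_native(HYPOTHETICAL_KAFKA_URN)
```

With this shape, a shared library would treat an "unknown" leaf as an error
(or as an empty transform to drop), while a runner that registered its own
URNs keeps working. Under B1 the `register_native` hook would simply not
exist.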

-Max
