Posted to dev@beam.apache.org by Jeff Klukas <jk...@mozilla.com> on 2018/09/14 17:42:13 UTC

[Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

Hello all, I'm a data engineer at Mozilla working on a first project using
Beam. I've been impressed with the usability of the API as there are good
built-in solutions for handling many simple transformation cases with
minimal code, and wanted to discuss one bit of ergonomics that seems to be
missing.

It appears that none of the existing PTransform factories are generic
enough to take in or output a PCollectionTuple, but we've found many use
cases where it's convenient to apply a few transforms on a PCollectionTuple
in a lambda expression.

For example, we've defined several PTransforms that return their main and
error outputs bundled in a PCollectionTuple. We defined a
CompositeTransform interface so that we could handle the error output in a
lambda expression like:

pipeline
    .apply("attempt to deserialize messages", new MyDeserializationTransform())
    .apply("write deserialization errors",
        CompositeTransform.of((PCollectionTuple input) -> {
            input.get(errorTag).apply(new MyErrorOutputTransform());
            return input.get(mainTag);
        }))
    .apply("more processing on the deserialized messages", new MyOtherTransform());

I'd be interested in contributing a patch to add this functionality,
perhaps as a static method PTransform.compose(). Would that patch be
welcome? Are there other thoughts on naming?

The full code of the CompositeTransform interface we're currently using is
included below.


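import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;

/** A functional interface whose single method supplies the body of expand(). */
public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
  OutputT expand(InputT input);

  /**
   * The public factory method that serves as the entry point for users to
   * create a composite PTransform.
   */
  static <InputT extends PInput, OutputT extends POutput>
      PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
    // Wrap the lambda in an anonymous PTransform that delegates expand() to it.
    return new PTransform<InputT, OutputT>() {
      @Override
      public OutputT expand(InputT input) {
        return transform.expand(input);
      }
    };
  }
}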
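
To show the shape of the pattern without a Beam dependency, here is a
minimal sketch. Every type in it is a hypothetical stand-in introduced for
illustration only: Transform plays the role of PTransform, Composite the
role of CompositeTransform, and Outputs the role of a PCollectionTuple with
a main and an error output. It is not the Beam API, just the same
lambda-wrapping idea applied to plain Java lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Beam-free sketch: every type here is a hypothetical stand-in, not a Beam class.
class ComposeSketch {

  /** Stand-in for PTransform: an abstract class, so a lambda cannot implement it directly. */
  abstract static class Transform<InputT, OutputT> {
    abstract OutputT expand(InputT input);
  }

  /** Stand-in for CompositeTransform: a functional interface plus a wrapping factory. */
  @FunctionalInterface
  interface Composite<InputT, OutputT> {
    OutputT expand(InputT input);

    static <InputT, OutputT> Transform<InputT, OutputT> of(Composite<InputT, OutputT> fn) {
      // The anonymous subclass delegates expand() to the supplied lambda.
      return new Transform<InputT, OutputT>() {
        @Override
        OutputT expand(InputT input) {
          return fn.expand(input);
        }
      };
    }
  }

  /** Stand-in for a PCollectionTuple holding a main output and an error output. */
  static final class Outputs {
    final List<Integer> main = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
  }

  /** Stand-in for a deserialization step: parses integers, routing failures to errors. */
  static Outputs deserialize(List<String> raw) {
    Outputs out = new Outputs();
    for (String s : raw) {
      try {
        out.main.add(Integer.parseInt(s));
      } catch (NumberFormatException e) {
        out.errors.add(s);
      }
    }
    return out;
  }

  /** Chains two lambda-built transforms; errors go to errorSink, main output is returned. */
  static List<Integer> process(List<String> raw, List<String> errorSink) {
    Transform<List<String>, Outputs> parse =
        Composite.of((List<String> in) -> deserialize(in));
    Transform<Outputs, List<Integer>> handleErrors =
        Composite.of((Outputs out) -> {
          errorSink.addAll(out.errors); // side branch, like writing the error output
          return out.main;              // only the main output continues downstream
        });
    return handleErrors.expand(parse.expand(raw));
  }

  public static void main(String[] args) {
    List<String> errorSink = new ArrayList<>();
    List<Integer> parsed = process(Arrays.asList("1", "x", "3"), errorSink);
    System.out.println(parsed + " " + errorSink); // prints "[1, 3] [x]"
  }
}
```

The factory is needed because the transform base type is an abstract class
rather than a functional interface, so a lambda cannot stand in for it
directly; the small interface gives the lambda a target type and the
factory adapts it.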

Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

Posted by Lukasz Cwik <lc...@google.com>.
If we don't hear much from users, I would be for merging the change as long
as it is marked @Experimental until we get further feedback on its usage.

On Wed, Sep 19, 2018 at 2:19 PM Jeff Klukas <jk...@mozilla.com> wrote:

> Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I
> was hoping to get context on, since I don't yet have extensive experience
> with beam.
>
> I have not yet run into issues where the output coder was not able to be
> inferred. I expect this may be a non-issue, as the individual transforms
> used within a user-provided lambda expression would presumably expose the
> ability to specify a coder.
>
> I don't have enough context yet to comment on whether display data might
> be an issue, so I do hope the user list can provide input there.
>
> On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Thanks for the proposal and it does seem to make the API cleaner to build
>> anonymous composite transforms.
>>
>> In your experience have you had issues where the API doesn't work out
>> well because the PTransform:
>> * is not able to override how the output coder is inferred?
>> * can't supply display data?
>>
>> +user@beam.apache.org <us...@beam.apache.org>, do users think that the
>> provided API would be useful enough for it to be added to the core SDK or
>> would the addition of the method provide noise/detract from the existing
>> API?
>>
>> On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on
>>> this suggestion and make it more concrete:
>>>
>>> https://issues.apache.org/jira/browse/BEAM-5413
>>> https://github.com/apache/beam/pull/6414
>>>
>>> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas <jk...@mozilla.com> wrote:
>>>
>>>> Hello all, I'm a data engineer at Mozilla working on a first project
>>>> using Beam. I've been impressed with the usability of the API as there are
>>>> good built-in solutions for handling many simple transformation cases with
>>>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>>>> missing.
>>>>
>>>> It appears that none of the existing PTransform factories are generic
>>>> enough to take in or output a PCollectionTuple, but we've found many use
>>>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>>>> in a lambda expression.
>>>>
>>>> For example, we've defined several PTransforms that return main and
>>>> error output stream bundled in a PCollectionTuple. We defined a
>>>> CompositeTransform interface so that we could handle the error output in a
>>>> lambda expression like:
>>>>
>>>> pipeline
>>>>     .apply("attempt to deserialize messages", new
>>>> MyDeserializationTransform())
>>>>     .apply("write deserialization errors",
>>>>         CompositeTransform.of((PCollectionTuple input) -> {
>>>>             input.get(errorTag).apply(new MyErrorOutputTransform())
>>>>             return input.get(mainTag);
>>>>         })
>>>>     .apply("more processing on the deserialized messages", new
>>>> MyOtherTransform())
>>>>
>>>> I'd be interested in contributing a patch to add this functionality,
>>>> perhaps as a static method PTransform.compose(). Would that patch be
>>>> welcome? Are there other thoughts on naming?
>>>>
>>>> The full code of the CompositeTransform interface we're currently using
>>>> is included below.
>>>>
>>>>
>>>> public interface CompositeTransform<InputT extends PInput, OutputT
>>>> extends POutput> {
>>>>   OutputT expand(InputT input);
>>>>
>>>>   /**
>>>>    * The public factory method that serves as the entrypoint for users
>>>> to create a composite PTransform.
>>>>    */
>>>>   static <InputT extends PInput, OutputT extends POutput>
>>>>         PTransform<InputT, OutputT> of(CompositeTransform<InputT,
>>>> OutputT> transform) {
>>>>     return new PTransform<InputT, OutputT>() {
>>>>       @Override
>>>>       public OutputT expand(InputT input) {
>>>>         return transform.expand(input);
>>>>       }
>>>>     };
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>>

Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

Posted by Jeff Klukas <jk...@mozilla.com>.
Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I
was hoping to get context on, since I don't yet have extensive experience
with Beam.

I have not yet run into issues where the output coder was not able to be
inferred. I expect this may be a non-issue, as the individual transforms
used within a user-provided lambda expression would presumably expose the
ability to specify a coder.

I don't have enough context yet to comment on whether display data might be
an issue, so I do hope the user list can provide input there.

On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik <lc...@google.com> wrote:

> Thanks for the proposal and it does seem to make the API cleaner to build
> anonymous composite transforms.
>
> In your experience have you had issues where the API doesn't work out well
> because the PTransform:
> * is not able to override how the output coder is inferred?
> * can't supply display data?
>
> +user@beam.apache.org <us...@beam.apache.org>, do users think that the
> provided API would be useful enough for it to be added to the core SDK or
> would the addition of the method provide noise/detract from the existing
> API?
>
> On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas <jk...@mozilla.com> wrote:
>
>> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
>> suggestion and make it more concrete:
>>
>> https://issues.apache.org/jira/browse/BEAM-5413
>> https://github.com/apache/beam/pull/6414
>>
>> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> Hello all, I'm a data engineer at Mozilla working on a first project
>>> using Beam. I've been impressed with the usability of the API as there are
>>> good built-in solutions for handling many simple transformation cases with
>>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>>> missing.
>>>
>>> It appears that none of the existing PTransform factories are generic
>>> enough to take in or output a PCollectionTuple, but we've found many use
>>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>>> in a lambda expression.
>>>
>>> For example, we've defined several PTransforms that return main and
>>> error output stream bundled in a PCollectionTuple. We defined a
>>> CompositeTransform interface so that we could handle the error output in a
>>> lambda expression like:
>>>
>>> pipeline
>>>     .apply("attempt to deserialize messages", new
>>> MyDeserializationTransform())
>>>     .apply("write deserialization errors",
>>>         CompositeTransform.of((PCollectionTuple input) -> {
>>>             input.get(errorTag).apply(new MyErrorOutputTransform())
>>>             return input.get(mainTag);
>>>         })
>>>     .apply("more processing on the deserialized messages", new
>>> MyOtherTransform())
>>>
>>> I'd be interested in contributing a patch to add this functionality,
>>> perhaps as a static method PTransform.compose(). Would that patch be
>>> welcome? Are there other thoughts on naming?
>>>
>>> The full code of the CompositeTransform interface we're currently using
>>> is included below.
>>>
>>>
>>> public interface CompositeTransform<InputT extends PInput, OutputT
>>> extends POutput> {
>>>   OutputT expand(InputT input);
>>>
>>>   /**
>>>    * The public factory method that serves as the entrypoint for users
>>> to create a composite PTransform.
>>>    */
>>>   static <InputT extends PInput, OutputT extends POutput>
>>>         PTransform<InputT, OutputT> of(CompositeTransform<InputT,
>>> OutputT> transform) {
>>>     return new PTransform<InputT, OutputT>() {
>>>       @Override
>>>       public OutputT expand(InputT input) {
>>>         return transform.expand(input);
>>>       }
>>>     };
>>>   }
>>> }
>>>
>>>
>>>
>>>

Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

Posted by Lukasz Cwik <lc...@google.com>.
Thanks for the proposal; it does seem to make building anonymous composite
transforms cleaner.

In your experience have you had issues where the API doesn't work out well
because the PTransform:
* is not able to override how the output coder is inferred?
* can't supply display data?

+user@beam.apache.org <us...@beam.apache.org>, do users think the proposed
API would be useful enough to add to the core SDK, or would the new method
add noise to, or detract from, the existing API?

On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas <jk...@mozilla.com> wrote:

> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
> suggestion and make it more concrete:
>
> https://issues.apache.org/jira/browse/BEAM-5413
> https://github.com/apache/beam/pull/6414
>
> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas <jk...@mozilla.com> wrote:
>
>> Hello all, I'm a data engineer at Mozilla working on a first project
>> using Beam. I've been impressed with the usability of the API as there are
>> good built-in solutions for handling many simple transformation cases with
>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>> missing.
>>
>> It appears that none of the existing PTransform factories are generic
>> enough to take in or output a PCollectionTuple, but we've found many use
>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>> in a lambda expression.
>>
>> For example, we've defined several PTransforms that return main and error
>> output stream bundled in a PCollectionTuple. We defined a
>> CompositeTransform interface so that we could handle the error output in a
>> lambda expression like:
>>
>> pipeline
>>     .apply("attempt to deserialize messages", new
>> MyDeserializationTransform())
>>     .apply("write deserialization errors",
>>         CompositeTransform.of((PCollectionTuple input) -> {
>>             input.get(errorTag).apply(new MyErrorOutputTransform())
>>             return input.get(mainTag);
>>         })
>>     .apply("more processing on the deserialized messages", new
>> MyOtherTransform())
>>
>> I'd be interested in contributing a patch to add this functionality,
>> perhaps as a static method PTransform.compose(). Would that patch be
>> welcome? Are there other thoughts on naming?
>>
>> The full code of the CompositeTransform interface we're currently using
>> is included below.
>>
>>
>> public interface CompositeTransform<InputT extends PInput, OutputT
>> extends POutput> {
>>   OutputT expand(InputT input);
>>
>>   /**
>>    * The public factory method that serves as the entrypoint for users to
>> create a composite PTransform.
>>    */
>>   static <InputT extends PInput, OutputT extends POutput>
>>         PTransform<InputT, OutputT> of(CompositeTransform<InputT,
>> OutputT> transform) {
>>     return new PTransform<InputT, OutputT>() {
>>       @Override
>>       public OutputT expand(InputT input) {
>>         return transform.expand(input);
>>       }
>>     };
>>   }
>> }
>>
>>
>>
>>

Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

Posted by Jeff Klukas <jk...@mozilla.com>.
I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
suggestion and make it more concrete:

https://issues.apache.org/jira/browse/BEAM-5413
https://github.com/apache/beam/pull/6414

On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas <jk...@mozilla.com> wrote:

> Hello all, I'm a data engineer at Mozilla working on a first project using
> Beam. I've been impressed with the usability of the API as there are good
> built-in solutions for handling many simple transformation cases with
> minimal code, and wanted to discuss one bit of ergonomics that seems to be
> missing.
>
> It appears that none of the existing PTransform factories are generic
> enough to take in or output a PCollectionTuple, but we've found many use
> cases where it's convenient to apply a few transforms on a PCollectionTuple
> in a lambda expression.
>
> For example, we've defined several PTransforms that return main and error
> output stream bundled in a PCollectionTuple. We defined a
> CompositeTransform interface so that we could handle the error output in a
> lambda expression like:
>
> pipeline
>     .apply("attempt to deserialize messages", new
> MyDeserializationTransform())
>     .apply("write deserialization errors",
>         CompositeTransform.of((PCollectionTuple input) -> {
>             input.get(errorTag).apply(new MyErrorOutputTransform())
>             return input.get(mainTag);
>         })
>     .apply("more processing on the deserialized messages", new
> MyOtherTransform())
>
> I'd be interested in contributing a patch to add this functionality,
> perhaps as a static method PTransform.compose(). Would that patch be
> welcome? Are there other thoughts on naming?
>
> The full code of the CompositeTransform interface we're currently using is
> included below.
>
>
> public interface CompositeTransform<InputT extends PInput, OutputT extends
> POutput> {
>   OutputT expand(InputT input);
>
>   /**
>    * The public factory method that serves as the entrypoint for users to
> create a composite PTransform.
>    */
>   static <InputT extends PInput, OutputT extends POutput>
>         PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT>
> transform) {
>     return new PTransform<InputT, OutputT>() {
>       @Override
>       public OutputT expand(InputT input) {
>         return transform.expand(input);
>       }
>     };
>   }
> }
>
>
>
>