Posted to user@beam.apache.org by Thomas Weise <th...@apache.org> on 2019/11/04 03:15:46 UTC

Re: The state of external transforms in Beam

This thread was very helpful to find more detail in
https://jira.apache.org/jira/browse/BEAM-7870

It would be great to have cross-language current state mentioned as top
level entry on https://beam.apache.org/roadmap/


On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Thanks for the nice write up Chad.
>
> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Thanks for bringing this up again. My thoughts on the open questions
>> below.
>>
>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <ch...@gmail.com> wrote:
>> > That commit solves 2 problems:
>> >
>> > Adds the pubsub Java deps so that they’re available in our portable
>> pipeline
>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>> available as a standard coder. This is required because both PubsubIO.Read
>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>> to hack it to make PubsubMessage appear as a standard coder.
>> >
>> > More details:
>> >
>> > There’s a similar magic commit required for Kafka external transforms
>> > The Jira issue for this problem is here:
>> https://jira.apache.org/jira/browse/BEAM-7870
>> > For problem #2 above there seems to be some consensus forming around
>> using Avro or schema/row coders to send compound types in a portable way.
>> Here’s the PR for making row coders portable
>> > https://github.com/apache/beam/pull/9188
>>
>> +1. Note that this doesn't mean that the IO itself must produce rows;
>> part of the Schema work in Java is to make it easy to automatically
>> convert from various Java classes to schemas transparently, so this
>> same logic that would allow one to apply an SQL filter directly to a
>> Kafka/PubSub read would allow cross-language. Even if that doesn't
>> work, we need not uglify the Java API; we can have an
>> option/alternative transform that appends the convert-to-Row DoFn for
>> easier use by external (though the goal of the former work is to make
>> this step unnecessary).
>>
>
> Updating all IO connectors / transforms to have a version that
> produces/consumes a PCollection<Row> is infeasible so I agree that we need
> an automatic conversion to/from PCollection<Row> possibly by injecting
>> PTransforms during ExternalTransform expansion.
>
>>
>> > I don’t really have any ideas for problem #1
>>
>> The crux of the issue here is that the jobs API was not designed with
>> cross-language in mind, and so the artifact API ties artifacts to jobs
>> rather than to environments. To solve this we need to augment the
>> notion of environment to allow the specification of additional
>> dependencies (e.g. jar files in this specific case, or better as
>> maven/pypi/... dependencies (with version ranges) such that
>> environment merging and dependency resolution can be sanely done), and
>> a way for the expansion service to provide such dependencies.
>>
>> Max wrote up a summary of the prior discussions at
>>
>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>>
>> In the short term, one can build a custom docker image that has all
>> the requisite dependencies installed.
>>
>> This touches on a related but separable issue that one may want to run
>> some of these transforms "natively" in the same process as the runner
>> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
>> (Similarly with subprocess.) Exactly how that works with environment
>> specifications is also a bit TBD, but my proposal has been that these
>> are best viewed as runner-specific substitutions of standard
>> environments.
>>
>
> We need a permanent solution for this but for now we have a temporary
> solution where additional jar files can be specified through an experiment
> when running a Python pipeline:
> https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
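>
> Roughly, from the Python side it amounts to passing an experiment flag in the
> pipeline options. A sketch, assuming the experiment is spelled 'jar_packages'
> as in that test (the jar path, runner, and endpoint below are placeholders):
>
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     options = PipelineOptions([
>         '--runner=PortableRunner',
>         '--job_endpoint=localhost:8099',
>         # Temporary hook: stage extra jars needed by the external (Java)
>         # transforms along with the pipeline.
>         '--experiments=jar_packages=/path/to/extra-io-deps.jar',
>     ])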
>
> Thanks,
> Cham
>
>
>>
>> > So the portability expansion system works, and now it’s time to sand
>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>> resolve some of these remaining issues.
>>
>> +1
>>
>>
>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <ch...@gmail.com> wrote:
>> >
>> > Hi all,
>> > There was some interest in this topic at the Beam Summit this week
>> (btw, great job to everyone involved!), so I thought I’d try to summarize
>> the current state of things.
>> > First, let me explain the idea behind an external transform for the
>> uninitiated.
>> >
>> > Problem:
>> >
>> > there’s a transform that you want to use, but it’s not available in
>> your desired language. IO connectors are a good example: there are many
>> available in the Java SDK, but not so much in Python or Go.
>> >
>> > Solution:
>> >
>> > Create a stub transform in your desired language (e.g. Python) whose
>> primary role is to serialize the parameters passed to that transform
>> > When you run your portable pipeline, just prior to it being sent to the
>> Job Service for execution, your stub transform’s payload is first sent to
>> the “Expansion Service” that’s running in the native language (Java), where
>> the payload is used to construct an instance of the native transform, which
>> is then expanded and converted to a protobuf and sent back to the calling
>> process (Python).
>> > The protobuf representation of the expanded transform gets integrated
>> back into the pipeline that you’re submitting
>> > Steps 2-3 are repeated for each external transform in your pipeline
>> > Then the whole pipeline gets sent to the Job Service to be invoked on
>> Flink/Spark/etc
>> >
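>> > To make that flow concrete, here's a minimal sketch of applying an external
>> > transform from Python. The URN, payload, and service address are
>> > illustrative, and it assumes a Java expansion service is already running at
>> > that address:
>> >
>> >     import apache_beam as beam
>> >     from apache_beam.transforms.external import (
>> >         ExternalTransform, ImplicitSchemaPayloadBuilder)
>> >
>> >     with beam.Pipeline() as p:
>> >         _ = (
>> >             p
>> >             | beam.Create(['a', 'b', 'c'])
>> >             # The stub only serializes its parameters; the real transform
>> >             # is constructed, expanded, and returned by the expansion
>> >             # service.
>> >             | ExternalTransform(
>> >                 'beam:external:fakeurn:v1',
>> >                 ImplicitSchemaPayloadBuilder({'config_value': 42}),
>> >                 expansion_service='localhost:8097'))
>> >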
>> > ________________________________
>> >
>> > Now on to my journey to get PubsubIO working in python on Flink.
>> >
>> > The first issue I encountered was that there was a lot of boilerplate
>> involved in serializing the stub python transform’s parameters so they can
>> be sent to the expansion service.
>> >
>> > I created a PR to make this simpler, which has just been merged to
>> master: https://github.com/apache/beam/pull/9098
>> >
>> > With this feature in place, if you’re using python 3.7 you can use a
>> dataclass and the typing module to create your transform and describe your
>> schema in one go. For example:
>> >
>> >     @dataclasses.dataclass
>> >     class MyAwesomeTransform(beam.ExternalTransform):
>> >       URN = 'beam:external:fakeurn:v1'
>> >
>> >       integer_example: int
>> >       string_example: str
>> >       list_of_strings: List[str]
>> >       optional_kv: Optional[Tuple[str, float]] = None
>> >       optional_integer: Optional[int] = None
>> >       expansion_service: dataclasses.InitVar[Optional[str]] = None
>> >
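>> > Constructing and applying it then looks like any other transform. A rough
>> > sketch of the intended usage (the argument values and service address are
>> > made up):
>> >
>> >     with beam.Pipeline() as p:
>> >         _ = (
>> >             p
>> >             | MyAwesomeTransform(
>> >                 integer_example=1,
>> >                 string_example='hello',
>> >                 list_of_strings=['a', 'b'],
>> >                 expansion_service='localhost:8097'))
>> >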
>> > For earlier versions of python, you can use typing.NamedTuple to
>> declare your schema.
>> >
>> >     MyAwesomeSchema = typing.NamedTuple(
>> >         'MyAwesomeSchema',
>> >         [
>> >             ('integer_example', int),
>> >             ('string_example', unicode),
>> >             ('list_of_strings', List[unicode]),
>> >             ('optional_kv', Optional[Tuple[unicode, float]]),
>> >             ('optional_integer', Optional[int]),
>> >         ]
>> >     )
>> >
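>> > Either way, the stub's only real job is to wrap those values in a payload
>> > for the expansion service. Roughly (class names as in
>> > apache_beam.transforms.external; the address is illustrative):
>> >
>> >     from apache_beam.transforms.external import (
>> >         ExternalTransform, NamedTupleBasedPayloadBuilder)
>> >
>> >     payload = NamedTupleBasedPayloadBuilder(
>> >         MyAwesomeSchema(
>> >             integer_example=1,
>> >             string_example=u'hello',
>> >             list_of_strings=[u'a', u'b'],
>> >             optional_kv=None,
>> >             optional_integer=None))
>> >
>> >     xform = ExternalTransform(
>> >         'beam:external:fakeurn:v1', payload,
>> >         expansion_service='localhost:8097')
>> >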
>> > There’s also an option to generate the schema implicitly based on the
>> value(s) you wish to serialize.
>> >
>> > There was a slight tangent in implementing this feature in that
>> requesting a coder for typing.List resulted in pickle coder instead of
>> IterableCoder. That’s bad because only standard/portable coders can be used
>> for expansion in Java (for obvious reasons), so as a convenience that was
>> solved here: https://github.com/apache/beam/pull/9344
>> >
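>> > As a quick sanity check, after that change asking the coder registry for a
>> > typing.List coder should yield an iterable coder rather than the
>> > pickle-based fallback (sketch; the exact repr may differ):
>> >
>> >     import typing
>> >     from apache_beam import coders
>> >
>> >     # Expect something like IterableCoder[BytesCoder] here, not a
>> >     # pickle/FastPrimitivesCoder fallback.
>> >     print(coders.registry.get_coder(typing.List[bytes]))
>> >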
>> > The next issue that I encountered was that python did not track the
>> boundedness of PCollections, which made it impossible to use the expansion
>> service to create unbounded writes. That’s been solved and merged here:
>> https://github.com/apache/beam/pull/9426
>> >
>> > So that brings us to the actual PR for adding external transform
>> support for PubsubIO: https://github.com/apache/beam/pull/9268
>> >
>> > The PR works, but with one big caveat: in order to use it you must
>> build your Java containers with this special commit:
>> https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>> >
>> > That commit solves 2 problems:
>> >
>> > Adds the pubsub Java deps so that they’re available in our portable
>> pipeline
>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>> available as a standard coder. This is required because both PubsubIO.Read
>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>> to hack it to make PubsubMessage appear as a standard coder.
>> >
>> > More details:
>> >
>> > There’s a similar magic commit required for Kafka external transforms
>> > The Jira issue for this problem is here:
>> https://jira.apache.org/jira/browse/BEAM-7870
>> > For problem #2 above there seems to be some consensus forming around
>> using Avro or schema/row coders to send compound types in a portable way.
>> Here’s the PR for making row coders portable
>> > https://github.com/apache/beam/pull/9188
>> > I don’t really have any ideas for problem #1
>> >
>> > So the portability expansion system works, and now it’s time to sand
>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>> resolve some of these remaining issues.
>> >
>> > -chad
>>
>

Re: The state of external transforms in Beam

Posted by Chamikara Jayalath <ch...@google.com>.
Sent https://github.com/apache/beam/pull/10054 to update the roadmap.

Thanks,
Cham

On Mon, Nov 4, 2019 at 10:24 AM Chamikara Jayalath <ch...@google.com>
wrote:

> Makes sense.
>
> I can look into expanding on what we have at the following location and adding
> links to some of the existing work as a first step.
> https://beam.apache.org/roadmap/connectors-multi-sdk/
>
> Created https://issues.apache.org/jira/browse/BEAM-8553
>
> We also need more detailed documentation for cross-language transforms but
> that can be separate (and hopefully with help from tech writers who have
> been helping with Beam documentation in general).
>
> Thanks,
> Cham
>
>
> On Sun, Nov 3, 2019 at 7:16 PM Thomas Weise <th...@apache.org> wrote:
>
>> This thread was very helpful to find more detail in
>> https://jira.apache.org/jira/browse/BEAM-7870
>>
>> It would be great to have cross-language current state mentioned as top
>> level entry on https://beam.apache.org/roadmap/
>>
>>
>> On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Thanks for the nice write up Chad.
>>>
>>> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> Thanks for bringing this up again. My thoughts on the open questions
>>>> below.
>>>>
>>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <ch...@gmail.com>
>>>> wrote:
>>>> > That commit solves 2 problems:
>>>> >
>>>> > Adds the pubsub Java deps so that they’re available in our portable
>>>> pipeline
>>>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>>>> available as a standard coder. This is required because both PubsubIO.Read
>>>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>>>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>>>> to hack it to make PubsubMessage appear as a standard coder.
>>>> >
>>>> > More details:
>>>> >
>>>> > There’s a similar magic commit required for Kafka external transforms
>>>> > The Jira issue for this problem is here:
>>>> https://jira.apache.org/jira/browse/BEAM-7870
>>>> > For problem #2 above there seems to be some consensus forming around
>>>> using Avro or schema/row coders to send compound types in a portable way.
>>>> Here’s the PR for making row coders portable
>>>> > https://github.com/apache/beam/pull/9188
>>>>
>>>> +1. Note that this doesn't mean that the IO itself must produce rows;
>>>> part of the Schema work in Java is to make it easy to automatically
>>>> convert from various Java classes to schemas transparently, so this
>>>> same logic that would allow one to apply an SQL filter directly to a
>>>> Kafka/PubSub read would allow cross-language. Even if that doesn't
>>>> work, we need not uglify the Java API; we can have an
>>>> option/alternative transform that appends the convert-to-Row DoFn for
>>>> easier use by external (though the goal of the former work is to make
>>>> this step unnecessary).
>>>>
>>>
>>> Updating all IO connectors / transforms to have a version that
>>> produces/consumes a PCollection<Row> is infeasible so I agree that we need
>>> an automatic conversion to/from PCollection<Row> possibly by injecting
>>> PTransforms during ExternalTransform expansion.
>>>
>>>>
>>>> > I don’t really have any ideas for problem #1
>>>>
>>>> The crux of the issue here is that the jobs API was not designed with
>>>> cross-language in mind, and so the artifact API ties artifacts to jobs
>>>> rather than to environments. To solve this we need to augment the
>>>> notion of environment to allow the specification of additional
>>>> dependencies (e.g. jar files in this specific case, or better as
>>>> maven/pypi/... dependencies (with version ranges) such that
>>>> environment merging and dependency resolution can be sanely done), and
>>>> a way for the expansion service to provide such dependencies.
>>>>
>>>> Max wrote up a summary of the prior discussions at
>>>>
>>>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>>>>
>>>> In the short term, one can build a custom docker image that has all
>>>> the requisite dependencies installed.
>>>>
>>>> This touches on a related but separable issue that one may want to run
>>>> some of these transforms "natively" in the same process as the runner
>>>> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
>>>> (Similarly with subprocess.) Exactly how that works with environment
>>>> specifications is also a bit TBD, but my proposal has been that these
>>>> are best viewed as runner-specific substitutions of standard
>>>> environments.
>>>>
>>>
>>> We need a permanent solution for this but for now we have a temporary
>>> solution where additional jar files can be specified through an experiment
>>> when running a Python pipeline:
>>> https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>> > So the portability expansion system works, and now it’s time to sand
>>>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>>>> resolve some of these remaining issues.
>>>>
>>>> +1
>>>>
>>>>
>>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <ch...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi all,
>>>> > There was some interest in this topic at the Beam Summit this week
>>>> (btw, great job to everyone involved!), so I thought I’d try to summarize
>>>> the current state of things.
>>>> > First, let me explain the idea behind an external transform for the
>>>> uninitiated.
>>>> >
>>>> > Problem:
>>>> >
>>>> > there’s a transform that you want to use, but it’s not available in
>>>> your desired language. IO connectors are a good example: there are many
>>>> available in the Java SDK, but not so much in Python or Go.
>>>> >
>>>> > Solution:
>>>> >
>>>> > Create a stub transform in your desired language (e.g. Python) whose
>>>> primary role is to serialize the parameters passed to that transform
>>>> > When you run your portable pipeline, just prior to it being sent to
>>>> the Job Service for execution, your stub transform’s payload is first sent
>>>> to the “Expansion Service” that’s running in the native language (Java),
>>>> where the payload is used to construct an instance of the native transform,
>>>> which is then expanded and converted to a protobuf and sent back to the
>>>> calling process (Python).
>>>> > The protobuf representation of the expanded transform gets integrated
>>>> back into the pipeline that you’re submitting
>>>> > Steps 2-3 are repeated for each external transform in your pipeline
>>>> > Then the whole pipeline gets sent to the Job Service to be invoked on
>>>> Flink/Spark/etc
>>>> >
>>>> > ________________________________
>>>> >
>>>> > Now on to my journey to get PubsubIO working in python on Flink.
>>>> >
>>>> > The first issue I encountered was that there was a lot of boilerplate
>>>> involved in serializing the stub python transform’s parameters so they can
>>>> be sent to the expansion service.
>>>> >
>>>> > I created a PR to make this simpler, which has just been merged to
>>>> master: https://github.com/apache/beam/pull/9098
>>>> >
>>>> > With this feature in place, if you’re using python 3.7 you can use a
>>>> dataclass and the typing module to create your transform and describe your
>>>> schema in one go. For example:
>>>> >
>>>> >     @dataclasses.dataclass
>>>> >     class MyAwesomeTransform(beam.ExternalTransform):
>>>> >       URN = 'beam:external:fakeurn:v1'
>>>> >
>>>> >       integer_example: int
>>>> >       string_example: str
>>>> >       list_of_strings: List[str]
>>>> >       optional_kv: Optional[Tuple[str, float]] = None
>>>> >       optional_integer: Optional[int] = None
>>>> >       expansion_service: dataclasses.InitVar[Optional[str]] = None
>>>> >
>>>> > For earlier versions of python, you can use typing.NamedTuple to
>>>> declare your schema.
>>>> >
>>>> >     MyAwesomeSchema = typing.NamedTuple(
>>>> >         'MyAwesomeSchema',
>>>> >         [
>>>> >             ('integer_example', int),
>>>> >             ('string_example', unicode),
>>>> >             ('list_of_strings', List[unicode]),
>>>> >             ('optional_kv', Optional[Tuple[unicode, float]]),
>>>> >             ('optional_integer', Optional[int]),
>>>> >         ]
>>>> >     )
>>>> >
>>>> > There’s also an option to generate the schema implicitly based on the
>>>> value(s) you wish to serialize.
>>>> >
>>>> > There was a slight tangent in implementing this feature in that
>>>> requesting a coder for typing.List resulted in pickle coder instead of
>>>> IterableCoder. That’s bad because only standard/portable coders can be used
>>>> for expansion in Java (for obvious reasons), so as a convenience that was
>>>> solved here: https://github.com/apache/beam/pull/9344
>>>> >
>>>> > The next issue that I encountered was that python did not track the
>>>> boundedness of PCollections, which made it impossible to use the expansion
>>>> service to create unbounded writes. That’s been solved and merged here:
>>>> https://github.com/apache/beam/pull/9426
>>>> >
>>>> > So that brings us to the actual PR for adding external transform
>>>> support for PubsubIO: https://github.com/apache/beam/pull/9268
>>>> >
>>>> > The PR works, but with one big caveat: in order to use it you must
>>>> build your Java containers with this special commit:
>>>> https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>>>> >
>>>> > That commit solves 2 problems:
>>>> >
>>>> > Adds the pubsub Java deps so that they’re available in our portable
>>>> pipeline
>>>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>>>> available as a standard coder. This is required because both PubsubIO.Read
>>>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>>>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>>>> to hack it to make PubsubMessage appear as a standard coder.
>>>> >
>>>> > More details:
>>>> >
>>>> > There’s a similar magic commit required for Kafka external transforms
>>>> > The Jira issue for this problem is here:
>>>> https://jira.apache.org/jira/browse/BEAM-7870
>>>> > For problem #2 above there seems to be some consensus forming around
>>>> using Avro or schema/row coders to send compound types in a portable way.
>>>> Here’s the PR for making row coders portable
>>>> > https://github.com/apache/beam/pull/9188
>>>> > I don’t really have any ideas for problem #1
>>>> >
>>>> > So the portability expansion system works, and now it’s time to sand
>>>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>>>> resolve some of these remaining issues.
>>>> >
>>>> > -chad
>>>>
>>>

Re: The state of external transforms in Beam

Posted by Chamikara Jayalath <ch...@google.com>.
Makes sense.

I can look into expanding on what we have at the following location and adding
links to some of the existing work as a first step.
https://beam.apache.org/roadmap/connectors-multi-sdk/

Created https://issues.apache.org/jira/browse/BEAM-8553

We also need more detailed documentation for cross-language transforms but
that can be separate (and hopefully with help from tech writers who have
been helping with Beam documentation in general).

Thanks,
Cham


On Sun, Nov 3, 2019 at 7:16 PM Thomas Weise <th...@apache.org> wrote:

> This thread was very helpful to find more detail in
> https://jira.apache.org/jira/browse/BEAM-7870
>
> It would be great to have cross-language current state mentioned as top
> level entry on https://beam.apache.org/roadmap/
>
>
> On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Thanks for the nice write up Chad.
>>
>> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> Thanks for bringing this up again. My thoughts on the open questions
>>> below.
>>>
>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <ch...@gmail.com>
>>> wrote:
>>> > That commit solves 2 problems:
>>> >
>>> > Adds the pubsub Java deps so that they’re available in our portable
>>> pipeline
>>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>>> available as a standard coder. This is required because both PubsubIO.Read
>>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>>> to hack it to make PubsubMessage appear as a standard coder.
>>> >
>>> > More details:
>>> >
>>> > There’s a similar magic commit required for Kafka external transforms
>>> > The Jira issue for this problem is here:
>>> https://jira.apache.org/jira/browse/BEAM-7870
>>> > For problem #2 above there seems to be some consensus forming around
>>> using Avro or schema/row coders to send compound types in a portable way.
>>> Here’s the PR for making row coders portable
>>> > https://github.com/apache/beam/pull/9188
>>>
>>> +1. Note that this doesn't mean that the IO itself must produce rows;
>>> part of the Schema work in Java is to make it easy to automatically
>>> convert from various Java classes to schemas transparently, so this
>>> same logic that would allow one to apply an SQL filter directly to a
>>> Kafka/PubSub read would allow cross-language. Even if that doesn't
>>> work, we need not uglify the Java API; we can have an
>>> option/alternative transform that appends the convert-to-Row DoFn for
>>> easier use by external (though the goal of the former work is to make
>>> this step unnecessary).
>>>
>>
>> Updating all IO connectors / transforms to have a version that
>> produces/consumes a PCollection<Row> is infeasible so I agree that we need
>> an automatic conversion to/from PCollection<Row> possibly by injecting
>> PTransforms during ExternalTransform expansion.
>>
>>>
>>> > I don’t really have any ideas for problem #1
>>>
>>> The crux of the issue here is that the jobs API was not designed with
>>> cross-language in mind, and so the artifact API ties artifacts to jobs
>>> rather than to environments. To solve this we need to augment the
>>> notion of environment to allow the specification of additional
>>> dependencies (e.g. jar files in this specific case, or better as
>>> maven/pypi/... dependencies (with version ranges) such that
>>> environment merging and dependency resolution can be sanely done), and
>>> a way for the expansion service to provide such dependencies.
>>>
>>> Max wrote up a summary of the prior discussions at
>>>
>>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>>>
>>> In the short term, one can build a custom docker image that has all
>>> the requisite dependencies installed.
>>>
>>> This touches on a related but separable issue that one may want to run
>>> some of these transforms "natively" in the same process as the runner
>>> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
>>> (Similarly with subprocess.) Exactly how that works with environment
>>> specifications is also a bit TBD, but my proposal has been that these
>>> are best viewed as runner-specific substitutions of standard
>>> environments.
>>>
>>
>> We need a permanent solution for this but for now we have a temporary
>> solution where additional jar files can be specified through an experiment
>> when running a Python pipeline:
>> https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>> > So the portability expansion system works, and now it’s time to sand
>>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>>> resolve some of these remaining issues.
>>>
>>> +1
>>>
>>>
>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <ch...@gmail.com>
>>> wrote:
>>> >
>>> > Hi all,
>>> > There was some interest in this topic at the Beam Summit this week
>>> (btw, great job to everyone involved!), so I thought I’d try to summarize
>>> the current state of things.
>>> > First, let me explain the idea behind an external transform for the
>>> uninitiated.
>>> >
>>> > Problem:
>>> >
>>> > there’s a transform that you want to use, but it’s not available in
>>> your desired language. IO connectors are a good example: there are many
>>> available in the Java SDK, but not so much in Python or Go.
>>> >
>>> > Solution:
>>> >
>>> > Create a stub transform in your desired language (e.g. Python) whose
>>> primary role is to serialize the parameters passed to that transform
>>> > When you run your portable pipeline, just prior to it being sent to
>>> the Job Service for execution, your stub transform’s payload is first sent
>>> to the “Expansion Service” that’s running in the native language (Java),
>>> where the payload is used to construct an instance of the native transform,
>>> which is then expanded and converted to a protobuf and sent back to the
>>> calling process (Python).
>>> > The protobuf representation of the expanded transform gets integrated
>>> back into the pipeline that you’re submitting
>>> > Steps 2-3 are repeated for each external transform in your pipeline
>>> > Then the whole pipeline gets sent to the Job Service to be invoked on
>>> Flink/Spark/etc
>>> >
>>> > ________________________________
>>> >
>>> > Now on to my journey to get PubsubIO working in python on Flink.
>>> >
>>> > The first issue I encountered was that there was a lot of boilerplate
>>> involved in serializing the stub python transform’s parameters so they can
>>> be sent to the expansion service.
>>> >
>>> > I created a PR to make this simpler, which has just been merged to
>>> master: https://github.com/apache/beam/pull/9098
>>> >
>>> > With this feature in place, if you’re using python 3.7 you can use a
>>> dataclass and the typing module to create your transform and describe your
>>> schema in one go. For example:
>>> >
>>> >     @dataclasses.dataclass
>>> >     class MyAwesomeTransform(beam.ExternalTransform):
>>> >       URN = 'beam:external:fakeurn:v1'
>>> >
>>> >       integer_example: int
>>> >       string_example: str
>>> >       list_of_strings: List[str]
>>> >       optional_kv: Optional[Tuple[str, float]] = None
>>> >       optional_integer: Optional[int] = None
>>> >       expansion_service: dataclasses.InitVar[Optional[str]] = None
>>> >
>>> > For earlier versions of python, you can use typing.NamedTuple to
>>> declare your schema.
>>> >
>>> >     MyAwesomeSchema = typing.NamedTuple(
>>> >         'MyAwesomeSchema',
>>> >         [
>>> >             ('integer_example', int),
>>> >             ('string_example', unicode),
>>> >             ('list_of_strings', List[unicode]),
>>> >             ('optional_kv', Optional[Tuple[unicode, float]]),
>>> >             ('optional_integer', Optional[int]),
>>> >         ]
>>> >     )
>>> >
>>> > There’s also an option to generate the schema implicitly based on the
>>> value(s) you wish to serialize.
>>> >
>>> > There was a slight tangent in implementing this feature in that
>>> requesting a coder for typing.List resulted in pickle coder instead of
>>> IterableCoder. That’s bad because only standard/portable coders can be used
>>> for expansion in Java (for obvious reasons), so as a convenience that was
>>> solved here: https://github.com/apache/beam/pull/9344
>>> >
>>> > The next issue that I encountered was that python did not track the
>>> boundedness of PCollections, which made it impossible to use the expansion
>>> service to create unbounded writes. That’s been solved and merged here:
>>> https://github.com/apache/beam/pull/9426
>>> >
>>> > So that brings us to the actual PR for adding external transform
>>> support for PubsubIO: https://github.com/apache/beam/pull/9268
>>> >
>>> > The PR works, but with one big caveat: in order to use it you must
>>> build your Java containers with this special commit:
>>> https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>>> >
>>> > That commit solves 2 problems:
>>> >
>>> > Adds the pubsub Java deps so that they’re available in our portable
>>> pipeline
>>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>>> available as a standard coder. This is required because both PubsubIO.Read
>>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>>> to hack it to make PubsubMessage appear as a standard coder.
>>> >
>>> > More details:
>>> >
>>> > There’s a similar magic commit required for Kafka external transforms
>>> > The Jira issue for this problem is here:
>>> https://jira.apache.org/jira/browse/BEAM-7870
>>> > For problem #2 above there seems to be some consensus forming around
>>> using Avro or schema/row coders to send compound types in a portable way.
>>> Here’s the PR for making row coders portable
>>> > https://github.com/apache/beam/pull/9188
>>> > I don’t really have any ideas for problem #1
>>> >
>>> > So the portability expansion system works, and now it’s time to sand
>>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>>> resolve some of these remaining issues.
>>> >
>>> > -chad
>>>
>>
