Posted to dev@beam.apache.org by Kyle Weaver <kc...@google.com> on 2021/02/03 20:34:21 UTC

Environment options for external transforms

Hi Beamers,

Recently we’ve had some requests on user@ and Slack for instructions on how
to use custom-built containers in cross-language pipelines (typically
calling Java transforms from a predominantly Python pipeline). Currently,
it seems like there is no way to change the container used by a
cross-language transform except by modifying and rebuilding the expansion
service. The SDK does not pass pipeline options to the expansion service
(BEAM-9449 [1]). Fixing BEAM-9449 does not solve everything, however. Even
if pipeline options are passed, the existing set of pipeline options still
limits the amount of control we have over environments. Here are the
existing pipeline options that I’m aware of:

Python [2] and Go [3] have these:

   - environment_type (DOCKER, PROCESS, LOOPBACK)

   - environment_config (This one is confusingly overloaded. It’s a string
     that means different things depending on environment_type. For DOCKER,
     it is the Docker image URL. For PROCESS, it is a JSON blob. For
     EXTERNAL, it is the external service address.)


Whereas Java [4] has defaultEnvironmentType and defaultEnvironmentConfig,
which are named differently but otherwise act the same as the above.
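
To make the overloading concrete, here is a minimal sketch using the
Python SDK (the image name and boot command below are illustrative, not
recommendations):

    from apache_beam.options.pipeline_options import PipelineOptions

    # DOCKER mode: environment_config names a container image.
    docker_opts = PipelineOptions([
        '--environment_type=DOCKER',
        '--environment_config=apache/beam_python3.7_sdk:2.27.0',
    ])

    # PROCESS mode: the very same flag now holds a JSON blob describing
    # how to launch the SDK harness process.
    process_opts = PipelineOptions([
        '--environment_type=PROCESS',
        '--environment_config={"command": "/opt/apache/beam/boot"}',
    ])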

I was unsatisfied with environment_config for a number of reasons. First,
having a single overloaded option that can mean entirely different things
depending on context is poor design. Second, in PROCESS mode, requiring the
user to type in a JSON blob for environment_config is not especially
human-friendly (though it has also been argued that JSON makes complex
arguments like this easier to parse). Finally, we must overload this string
further to introduce new environment-specific options, such as a mounted
Docker volume (BEAM-5440 [5]).

To address these problems, I added a new option called
“environment_options” (BEAM-10671 [6]). (This option has been implemented
in the Python SDK, but not in the other SDKs yet.) Like the “experiments”
option, environment_options takes a list of strings, for example
“--environment_option=docker_container_image=my_beam_sdk:latest”. It could
be argued that we should have made “docker_container_image” etc. top-level
options instead, but this “catch-all” design makes what I am about to
propose a lot easier.
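
The catch-all pattern amounts to splitting “key=value” strings, as in
this hand-rolled sketch (the SDK’s actual parsing may differ):

    def parse_environment_options(raw_options):
        # Split each "key=value" string into a dict entry, the same
        # way the "experiments" option is conventionally interpreted.
        parsed = {}
        for opt in raw_options:
            key, _, value = opt.partition('=')
            parsed[key] = value
        return parsed

    # parse_environment_options(
    #     ['docker_container_image=my_beam_sdk:latest'])
    # -> {'docker_container_image': 'my_beam_sdk:latest'}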

The solution proposed in PR #11638 [7] sets a flag to include unrecognized
pipeline options during serialization, since otherwise unrecognized options
are dropped. In a Python pipeline, this will allow us to set
environment_config and default_environment_config to separate values, for
Python and Java containers, respectively. However, this still limits us to
one container image for all Python and Go transforms, and one container
image for all Java transforms. As more cross-language transforms are
implemented, sooner or later someone will want to have different Java SDK
containers for different external transforms.
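
For example, assuming the retain-unknown-options mechanism from that PR
(option values here are illustrative), a Python pipeline could carry the
Java-only option through serialization:

    from apache_beam.options.pipeline_options import PipelineOptions

    opts = PipelineOptions([
        # Recognized by the Python SDK:
        '--environment_config=my_python_sdk:latest',
        # Unknown to Python, but meaningful to the Java expansion
        # service:
        '--default_environment_config=my_java_sdk:latest',
    ])

    # Without retaining unknown options, the Java-only flag would be
    # silently dropped here.
    all_options = opts.get_all_options(retain_unknown_options=True)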

(I should also mention the sdk_harness_container_image_overrides pipeline
option [8], which is currently only supported by the Dataflow runner. It
basically lets us perform a find/replace on container image strings. This
is not significantly more flexible than having a single option per SDK,
since the default container images for all external transforms in each SDK
are expected to be the same.)
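
For illustration, the option takes “regex,replacement” pairs (the image
names below are made up):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Any container image matching the regex is swapped for the
    # replacement image. Dataflow-only today.
    opts = PipelineOptions([
        '--sdk_harness_container_image_overrides='
        '.*java.*,gcr.io/my-project/my-java-sdk:latest',
    ])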

Environments logically belong with transforms, and that’s how it works in
the Runner API [9]. The problem now is that from the user’s perspective,
the environment is bound to the expansion service. After addressing
BEAM-9449, the problem will be that one or two environments at most are
bound to the pipeline. Ideally, though, users should have fully granular
control over environments at the transform level.

All this context leads to a very simple proposal: we should have all
ExternalTransform subclasses take optional environment_type and
environment_options fields in their constructors. As with their
corresponding pipeline options, these options would default to DOCKER and
none, respectively. Then we could overwrite the environment_type and
environment_options in the pipeline options passed to the expansion service
with these values. (Alternatively, we could pass environment_type and
environment_options to the expansion service individually to avoid having
to overwrite their original values, but their original values should be
irrelevant to the expansion service anyway.)
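
A minimal sketch of the proposed constructor shape (the transform, URN,
and payload here are hypothetical; this is not current API):

    from apache_beam.transforms.external import ExternalTransform

    class MyJavaTransform(ExternalTransform):
        # Made-up URN for illustration.
        URN = 'beam:transform:my_java_transform:v1'

        def __init__(self, payload, expansion_service,
                     environment_type='DOCKER',
                     environment_options=None):
            super().__init__(self.URN, payload, expansion_service)
            # Under the proposal, these values would overwrite the
            # environment settings in the pipeline options sent along
            # with the ExpansionRequest.
            self._environment_type = environment_type
            self._environment_options = environment_options or []

A user could then pin a single transform to a custom image, e.g.
MyJavaTransform(payload, service,
environment_options=['docker_container_image=my_java_sdk:latest']).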

What do you think?

[1] https://issues.apache.org/jira/browse/BEAM-9449

[2]
https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L1097-L1115

[3]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/go/pkg/beam/options/jobopts/options.go#L41-L53

[4]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L53-L71

[5] https://issues.apache.org/jira/browse/BEAM-5440

[6] https://issues.apache.org/jira/browse/BEAM-10671

[7] https://github.com/apache/beam/pull/11638

[8]
https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L840-L850

[9]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/model/pipeline/src/main/proto/beam_runner_api.proto#L194

Re: Environment options for external transforms

Posted by Chad Dombrova <ch...@gmail.com>.
Hi,

First of all, this is an area that could use a lot of help, so thank you
Kyle for digging through the trove of tickets to understand all of the user
stories.


>>>>> I should have led with this. Someone wanted to mount credentials into
>>>>> the SDK harness [1]. So in this particular case the user just wants to
>>>>> mount files into their SDK harness, which is a pretty common use case, so
>>>>> resource hints are probably a more appropriate solution.
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/thread.html/r690094f1c9ebc4e1d20f029a21ba8bc846672a65baafd57c4f52cb94%40%3Cuser.beam.apache.org%3E
>>>>>
>>>>
>>>> Ah, that clarifies things. Would it be possible/preferable to pass the
>>>> credentials as parameters to the transform itself?
>>>>
>>>
> This is very useful for testing as well. For example, to test containers
> generated for release candidates.
>

We have been using custom containers since day 1 to address a number of
issues. Speaking in the abstract, a UDF might rely on system libraries or
executables (which in turn rely on a specific Linux OS) that do not exist
on the standard container. A more esoteric example is the need to run a
different Python interpreter than the one provided in the container. In
our specific case, we needed to use Python interpreters that come bundled
with the applications we use to process data, many of which are only
supported on CentOS. We also had to modify Beam to allow passing Docker
options for mounting data volumes.

-chad

Re: Environment options for external transforms

Posted by Chamikara Jayalath <ch...@google.com>.
On Thu, Feb 4, 2021 at 5:36 PM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Feb 4, 2021 at 4:16 PM Kyle Weaver <kc...@google.com> wrote:
>
>> I do think it can be useful to specify a custom "top-level" environment.
>>> We should probably make it easy to use customized expansion services.
>>
>>
>> I'm fine with adding startup argument(s) in the expansion service for
>> configuring the "top-level" environment. Since which expansion service to
>> use is already configurable in external transforms, it solves the problem
>> just as well as my original proposal. And if a particular expansion service
>> wants to do something more complicated, it can have its own logic to handle
>> that.
>>
>
> That sounds like a good plan.
>

Sounds good to me as well. Thanks Kyle.


>
>
>>
>>
>>> Ah, that clarifies things. Would it be possible/preferable to pass the
>>> credentials as parameters to the transform itself?
>>
>>
>> Maybe. But it's generally useful to be able to stage files to SDK
>> containers, so it's something we should consider making into a general
>> feature, perhaps based on the artifact API.
>>
>
> +1
>
>
>>
>> On Thu, Feb 4, 2021 at 3:52 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Thu, Feb 4, 2021 at 3:33 PM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>>  This gets into the distinction of customizing what kind of environment
>>>>> one wants to have (which could be generally applicable) vs. an absolute
>>>>> designation of a particular environment (e.g. a docker image).
>>>>
>>>>
>>>> For common environment modifications, resource hints are a great idea,
>>>> since it's much easier to set an annotation than to build and set a custom
>>>> container. The limitation of this approach is we can't handle every
>>>> possible modification a user might want to make to their environment.
>>>> Custom containers give the user ultimate control over the environment, so
>>>> we forfeit a lot of flexibility if we don't provide enough options to use
>>>> them.
>>>>
>>>> Note that what we're running into in part is that "pipeline options"
>>>>> are the wrong level of granularity for specifying characteristics of an
>>>>> environment, as there is not a single environment to parameterize (or,
>>>>> possibly, even one per language).
>>>>
>>>>
>>>> Yes, this is the crux of the problem. We already expose an
>>>> environment_config as a pipeline option, so we basically have three choices:
>>>> 1. Deprecate pipeline-level environment options altogether.
>>>> 2. Find a way to generalize environment options.
>>>> 3. Keep and document the status quo (i.e. users can use custom
>>>> containers, but at most only one per language).
>>>>
>>>
>>> I do think it can be useful to specify a custom "top-level" environment.
>>> We should probably make it easy to use customized expansion services.
>>>
>>>
>>>> The caller should not need any visibility into the environment(s) that
>>>>> an expansion service uses, which is an implementation detail that the
>>>>> expansion service is free to change at any time. (In fact, whether it is
>>>>> (partially or fully) implemented as an external transform is an
>>>>> implementation detail that the end user should not need to care about or
>>>>> depend on.)
>>>>
>>>>
>>>> I personally think pattern matching and substitution by runners (maybe
>>>>> more sophisticated than regexp on container names) is a reasonable way to
>>>>> approach customization of environments.
>>>>
>>>>
>>>> Aren't these ideas contradictory? Pattern matching requires knowledge
>>>> in advance of which patterns to match. We'd need to know at least some
>>>> information about the environment the expansion service is expected to use
>>>> in order to replace it.
>>>>
>>>
>>> The pattern matching is not such that I want to replace the environment
>>> for this particular transform, but that /if/ I see a Java environment of a
>>> certain type /then/ I want to run it in this way.
>>>
>>>
>>>> For example, suppose I construct a pipeline that uses both Python and
>>>>> Java transforms. (I could do this from Go, Java, or Python). If I want to
>>>>> run this locally (e.g. on the Python FnAPI runner), I would prefer that the
>>>>> python bits be run in-process but would have to shell out (maybe via
>>>>> docker, maybe something cheaper) for the java bits. On the other hand, if I
>>>>> want to run this same pipeline (ideally, the same model proto, such that we
>>>>> don't have runner-dependent construction) on Flink, I might want the java
>>>>> bits to be inlined and the Python bits to be in a separate process. On
>>>>> Dataflow, both would live in containers. To do this, the Python runner
>>>>> would say "hey, I know that Python environment" and just swap it out for
>>>>> in-process, and vice versa. (For isolation/other reasons, one may want the
>>>>> option to force everything to be docker, but that's more of a "don't make
>>>>> substitutions" option than manually providing environment configs.)
>>>>
>>>>
>>>> In this example, wouldn't you normally just rebuild the pipeline? I'm
>>>> not sure what the advantage of re-using the same model proto is.
>>>>
>>>
>>> Yes, you'd re-build the pipeline. But if all you change is the --runner
>>> flag the model proto produced should not change. (And, sometimes, you may
>>> want to stash the proto itself, or pass it to one-of-N runners depending on
>>> some other condition, etc.)
>>>
>>>
>>>>  It would be helpful for me to have concrete use cases of why a user
>>>>> wants to customize the container used by some transform they did not write,
>>>>> which could possibly inform the best course(s) of action here.
>>>>
>>>>
>>>> I should have led with this. Someone wanted to mount credentials into
>>>> the SDK harness [1]. So in this particular case the user just wants to
>>>> mount files into their SDK harness, which is a pretty common use case, so
>>>> resource hints are probably a more appropriate solution.
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/r690094f1c9ebc4e1d20f029a21ba8bc846672a65baafd57c4f52cb94%40%3Cuser.beam.apache.org%3E
>>>>
>>>
>>> Ah, that clarifies things. Would it be possible/preferable to pass the
>>> credentials as parameters to the transform itself?
>>>
>>
This is very useful for testing as well. For example, to test containers
generated for release candidates.


>
>>>
>>>>
>>>>
>>>> On Thu, Feb 4, 2021 at 1:51 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Thu, Feb 4, 2021 at 12:38 PM Kyle Weaver <kc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> So, an external transform is uniquely identified by its URN. An
>>>>>>> external transform identified by a URN may refer to an arbitrary composite
>>>>>>> which may have sub-transforms that refer to different environments. I think
>>>>>>> with the above proposal we'll lose this flexibility.
>>>>>>> What we need is a way to override environments (or properties of
>>>>>>> environments) that results in the final pipeline proto. Once we modify such
>>>>>>> environments in the proto it will be reflected to all transforms that
>>>>>>> utilize such environments.
>>>>>>
>>>>>>
>>>>>> As far as I can tell we currently only register a single environment
>>>>>> for the entire transform (and it's always the default). Am I missing
>>>>>> something?
>>>>>> https://github.com/apache/beam/blob/0cfa80fd919d141a2061393ec5c12521c7d7af0b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L447-L449
>>>>>>
>>>>>> Anyway, I don't see how sub-transforms require overrides. We should
>>>>>> be able to propagate environment options to sub-transforms to achieve the
>>>>>> same purpose.
>>>>>>
>>>>>
>>>>> The discussion of resource hints at
>>>>> https://lists.apache.org/thread.html/ra40286b66a03a1d9f4086c9e1ecdeb9f299836d2d0361c3e3fe7c382%40%3Cdev.beam.apache.org%3E
>>>>> actually may tie into this as well. I would assume a localised request for,
>>>>> say, high memory should be propagated down to cross-language pipelines. It
>>>>> is possible that other customizations (such as making sure specific
>>>>> dependencies are available, or filesystems mounted) would fit here too.
>>>>>
>>>>> This gets into the distinction of customizing what kind of environment
>>>>> one wants to have (which could be generally applicable) vs. an absolute
>>>>> designation of a particular environment (e.g. a docker image).
>>>>>
>>>>> Note that what we're running into in part is that "pipeline options"
>>>>> are the wrong level of granularity for specifying characteristics of an
>>>>> environment, as there is not a single environment to parameterize (or,
>>>>> possibly, even one per language). If I call
>>>>> ExpansionRequest(MyFancyTransform, environment_config=docker_path)
>>>>> and MyFancyTransform is composed of two environments, to which
>>>>> does docker_path apply? What about PTransforms that use ExternalTransforms
>>>>> under the hood (e.g does some pre-processing and then calls SQL, or calls
>>>>> Kafka followed by some Python-level post-processing)?
>>>>>
>>>>>
>>>>> 'sdk_harness_container_image_overrides' is such a property (which
>>>>>>> unfortunately only works for Dataflow today). Also this only works for
>>>>>>> Docker URLs. Maybe we can extend this property to all runners or introduce
>>>>>>> a new property that works for all types of environments ?
>>>>>>
>>>>>>
>>>>>> In my original email, I wrote that
>>>>>> sdk_harness_container_image_overrides is no more flexible than having a
>>>>>> single option per SDK, since the default container images for all external
>>>>>> transforms in each SDK are expected to be the same. For example, in the
>>>>>> case of a pipeline with two external transforms that both use the same
>>>>>> default container image, sdk_harness_container_image_overrides does not let
>>>>>> the user give those two transforms different containers.
>>>>>>
>>>>>> From a design standpoint, I feel find-replace is hacky and backwards.
>>>>>> It's cleaner to specify what kind of environment we want directly in
>>>>>> the ExpansionRequest. That way all of the environment creation logic
>>>>>> belongs inside the expansion service.
>>>>>>
>>>>>
>>>>> While Environments logically belong with Transforms, it is the
>>>>> expansion service's job to attach the right environments to the transforms
>>>>> that it vends. The caller should not need any visibility into the
>>>>> environment(s) that an expansion service uses, which is an implementation
>>>>> detail that the expansion service is free to change at any time. (In fact,
>>>>> whether it is (partially or fully) implemented as an external transform is
>>>>> an implementation detail that the end user should not need to care about or
>>>>> depend on.)
>>>>>
>>>>> I personally think pattern matching and substitution by runners (maybe
>>>>> more sophisticated than regexp on container names) is a reasonable way to
>>>>> approach customization of environments. For example, suppose I construct a
>>>>> pipeline that uses both Python and Java transforms. (I could do this from
>>>>> Go, Java, or Python). If I want to run this locally (e.g. on the Python
>>>>> FnAPI runner), I would prefer that the python bits be run in-process but
>>>>> would have to shell out (maybe via docker, maybe something cheaper) for the
>>>>> java bits. On the other hand, if I want to run this same pipeline (ideally,
>>>>> the same model proto, such that we don't have
>>>>> runner-dependent construction) on Flink, I might want the java bits to be
>>>>> inlined and the Python bits to be in a separate process. On Dataflow, both
>>>>> would live in containers. To do this, the Python runner would say "hey, I
>>>>> know that Python environment" and just swap it out for in-process, and vice
>>>>> versa. (For isolation/other reasons, one may want the option to force
>>>>> everything to be docker, but that's more of a "don't make substitutions"
>>>>> option than manually providing environment configs.)
>>>>>
>>>>> On the other hand, as we go the route of custom containers, especially
>>>>> expansion services that might vend custom containers, I think we need a way
>>>>> to push down *properties* of environments (such as resource hints) through
>>>>> the expansion service that may influence the environments that get attached
>>>>> and returned.
>>>>>
>>>>> It would be helpful for me to have concrete use cases of why a user
>>>>> wants to customize the container used by some transform they did not write,
>>>>> which could possibly inform the best course(s) of action here.

Re: Environment options for external transforms

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Feb 4, 2021 at 4:16 PM Kyle Weaver <kc...@google.com> wrote:

> I do think it can be useful to specify a custom "top-level" environment.
>> We should probably make it easy to use customized expansion services.
>
>
> I'm fine with adding startup argument(s) in the expansion service for
> configuring the "top-level" environment. Since which expansion service to
> use is already configurable in external transforms, it solves the problem
> just as well as my original proposal. And if a particular expansion service
> wants to do something more complicated, it can have its own logic to handle
> that.
>

That sounds like a good plan.


>
>
>> Ah, that clarifies things. Would it be possible/preferable to pass the
>> credentials as parameters to the transform itself?
>
>
> Maybe. But it's generally useful to be able to stage files to SDK
> containers, so it's something we should consider making into a general
> feature, perhaps based on the artifact API.
>

+1



Re: Environment options for external transforms

Posted by Kyle Weaver <kc...@google.com>.
>
> I do think it can be useful to specify a custom "top-level" environment.
> We should probably make it easy to use customized expansion services.


I'm fine with adding startup argument(s) to the expansion service for
configuring the "top-level" environment. Since the expansion service to use
is already configurable on external transforms, this solves the problem
just as well as my original proposal. And if a particular expansion service
wants to do something more complicated, it can implement its own logic to
handle that.
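
For illustration, here is a minimal Python sketch of pointing a single
external transform at a customized expansion service (the Kafka transform,
address, and port here are placeholders):

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as p:
        _ = (p
             | ReadFromKafka(
                 consumer_config={'bootstrap.servers': 'localhost:9092'},
                 topics=['my_topic'],
                 # Address of a custom expansion service, e.g. one started
                 # with a custom-built container as its default environment.
                 expansion_service='localhost:8097'))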


> Ah, that clarifies things. Would it be possible/preferable to pass the
> credentials as parameters to the transform itself?


Maybe. But it's generally useful to be able to stage files to SDK
containers, so it's something we should consider making into a general
feature, perhaps based on the artifact API.


Re: Environment options for external transforms

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Feb 4, 2021 at 3:33 PM Kyle Weaver <kc...@google.com> wrote:

>> This gets into the distinction of customizing what kind of environment
>> one wants to have (which could be generally applicable) vs. an absolute
>> designation of a particular environment (e.g. a docker image).
>
>
> For common environment modifications, resource hints are a great idea,
> since it's much easier to set an annotation than to build and set a custom
> container. The limitation of this approach is we can't handle every
> possible modification a user might want to make to their environment.
> Custom containers give the user ultimate control over the environment, so
> we forfeit a lot of flexibility if we don't provide enough options to use
> them.
>
>> Note that what we're running into in part is that "pipeline options" are
>> the wrong level of granularity for specifying characteristics of an
>> environment, as there is not a single environment to parameterize (or,
>> possibly, even one per language).
>
>
> Yes, this is the crux of the problem. We already expose an
> environment_config as a pipeline option, so we basically have three choices:
> 1. Deprecate pipeline-level environment options altogether.
> 2. Find a way to generalize environment options.
> 3. Keep and document the status quo (i.e., users can use custom containers,
> but at most only one per language).
>

I do think it can be useful to specify a custom "top-level" environment. We
should probably make it easy to use customized expansion services.


>> The caller should not need any visibility into the environment(s) that an
>> expansion service uses, which is an implementation detail that the
>> expansion service is free to change at any time. (In fact, whether it is
>> (partially or fully) implemented as an external transform is an
>> implementation detail that the end user should not need to care about or
>> depend on.)
>
>
>> I personally think pattern matching and substitution by runners (maybe
>> more sophisticated than regexp on container names) is a reasonable way to
>> approach customization of environments.
>
>
> Aren't these ideas contradictory? Pattern matching requires knowledge in
> advance of which patterns to match. We'd need to know at least some
> information about the environment the expansion service is expected to use
> in order to replace it.
>

The pattern matching is not such that I want to replace the environment for
this particular transform, but that /if/ I see a Java environment of a
certain type /then/ I want to run it in this way.
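
As a rough Python sketch of that if/then rule, operating directly on the
pipeline proto (the image-name heuristic and the harness command below are
hypothetical):

    from apache_beam.portability import common_urns
    from apache_beam.portability.api import beam_runner_api_pb2

    def inline_java_docker_envs(pipeline_proto):
        # If an environment is a Java SDK Docker environment, swap it for
        # a local process environment instead.
        for _, env in pipeline_proto.components.environments.items():
            if env.urn != common_urns.environments.DOCKER.urn:
                continue
            docker = beam_runner_api_pb2.DockerPayload.FromString(env.payload)
            if 'beam_java' in docker.container_image:  # hypothetical heuristic
                env.urn = common_urns.environments.PROCESS.urn
                env.payload = beam_runner_api_pb2.ProcessPayload(
                    command='/opt/java_harness/boot').SerializeToString()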


>> For example, suppose I construct a pipeline that uses both Python and Java
>> transforms. (I could do this from Go, Java, or Python). If I want to run
>> this locally (e.g. on the Python FnAPI runner), I would prefer that the
>> python bits be run in-process but would have to shell out (maybe via
>> docker, maybe something cheaper) for the java bits. On the other hand, if I
>> want to run this same pipeline (ideally, the same model proto, such that we
>> don't have runner-dependent construction) on Flink, I might want the java
>> bits to be inlined and the Python bits to be in a separate process. On
>> Dataflow, both would live in containers. To do this, the Python runner
>> would say "hey, I know that Python environment" and just swap it out for
>> in-process, and vice versa. (For isolation/other reasons, one may want the
>> option to force everything to be docker, but that's more of a "don't make
>> substitutions" option than manually providing environment configs.)
>
>
> In this example, wouldn't you normally just rebuild the pipeline? I'm not
> sure what the advantage of re-using the same model proto is.
>

Yes, you'd re-build the pipeline. But if all you change is the --runner
flag the model proto produced should not change. (And, sometimes, you may
want to stash the proto itself, or pass it to one-of-N runners depending on
some other condition, etc.)
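
A minimal Python sketch of stashing that runner-independent proto:

    import apache_beam as beam

    p = beam.Pipeline()
    # ... construct the pipeline here ...
    proto = p.to_runner_api()  # the runner-independent model proto
    with open('pipeline.pb', 'wb') as f:
        f.write(proto.SerializeToString())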


>> It would be helpful for me to have concrete use cases of why a user wants
>> to customize the container used by some transform they did not write, which
>> could possibly inform the best course(s) of action here.
>
>
> I should have led with this. Someone wanted to mount credentials into the
> SDK harness [1]. So in this particular case the user just wants to mount
> files into their SDK harness, which is a pretty common use case, so
> resource hints are probably a more appropriate solution.
>
> [1]
> https://lists.apache.org/thread.html/r690094f1c9ebc4e1d20f029a21ba8bc846672a65baafd57c4f52cb94%40%3Cuser.beam.apache.org%3E
>

Ah, that clarifies things. Would it be possible/preferable to pass the
credentials as parameters to the transform itself?



Re: Environment options for external transforms

Posted by Kyle Weaver <kc...@google.com>.
>
>  This gets into the distinction of customizing what kind of environment
> one wants to have (which could be generally applicable) vs. an absolute
> designation of a particular environment (e.g. a docker image).


For common environment modifications, resource hints are a great idea,
since it's much easier to set an annotation than to build and set a custom
container. The limitation of this approach is we can't handle every
possible modification a user might want to make to their environment.
Custom containers give the user ultimate control over the environment, so
we forfeit a lot of flexibility if we don't provide enough options to use
them.
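
As a sketch, an annotation along these lines might look like the following
in Python (hypothetical syntax; resource hints are only a proposal at this
point):

    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (p
             | beam.Create([1, 2, 3])
             # Hypothetical resource-hint annotation on a single transform.
             | beam.Map(lambda x: x * 2).with_resource_hints(min_ram='8GB'))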

> Note that what we're running into in part is that "pipeline options" are
> the wrong level of granularity for specifying characteristics of an
> environment, as there is not a single environment to parameterize (or,
> possibly, even one per language).


Yes, this is the crux of the problem. We already expose an
environment_config as a pipeline option, so we basically have three choices:
1. Deprecate pipeline-level environment options altogether.
2. Find a way to generalize environment options.
3. Keep and document the status quo (i.e., users can use custom containers,
but at most only one per language).

> The caller should not need any visibility into the environment(s) that an
> expansion service uses, which is an implementation detail that the
> expansion service is free to change at any time. (In fact, whether it is
> (partially or fully) implemented as an external transform is an
> implementation detail that the end user should not need to care about or
> depend on.)


> I personally think pattern matching and substitution by runners (maybe more
> sophisticated than regexp on container names) is a reasonable way to
> approach customization of environments.


Aren't these ideas contradictory? Pattern matching requires knowledge in
advance of which patterns to match. We'd need to know at least some
information about the environment the expansion service is expected to use
in order to replace it.

> For example, suppose I construct a pipeline that uses both Python and Java
> transforms. (I could do this from Go, Java, or Python). If I want to run
> this locally (e.g. on the Python FnAPI runner), I would prefer that the
> python bits be run in-process but would have to shell out (maybe via
> docker, maybe something cheaper) for the java bits. On the other hand, if I
> want to run this same pipeline (ideally, the same model proto, such that we
> don't have runner-dependent construction) on Flink, I might want the java
> bits to be inlined and the Python bits to be in a separate process. On
> Dataflow, both would live in containers. To do this, the Python runner
> would say "hey, I know that Python environment" and just swap it out for
> in-process, and vice versa. (For isolation/other reasons, one may want the
> option to force everything to be docker, but that's more of a "don't make
> substitutions" option than manually providing environment configs.)


In this example, wouldn't you normally just rebuild the pipeline? I'm not
sure what the advantage of re-using the same model proto is.

> It would be helpful for me to have concrete use cases of why a user wants
> to customize the container used by some transform they did not write, which
> could possibly inform the best course(s) of action here.


I should have led with this. Someone wanted to mount credentials into the
SDK harness [1]. So in this particular case the user just wants to mount
files into their SDK harness, which is a pretty common use case, so
resource hints are probably a more appropriate solution.

[1]
https://lists.apache.org/thread.html/r690094f1c9ebc4e1d20f029a21ba8bc846672a65baafd57c4f52cb94%40%3Cuser.beam.apache.org%3E
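
For what it's worth, a sketch of how that might look with the
environment_options mechanism (the docker_mount key is hypothetical and not
implemented; docker_container_image is the existing option):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--environment_type=DOCKER',
        '--environment_option=docker_container_image=my_beam_sdk:latest',
        # Hypothetical option for mounting credentials into the harness:
        '--environment_option=docker_mount=/home/user/creds:/creds',
    ])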



Re: Environment options for external transforms

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Feb 4, 2021 at 12:38 PM Kyle Weaver <kc...@google.com> wrote:

>> So, an external transform is uniquely identified by its URN. An external
>> transform identified by a URN may refer to an arbitrary composite which may
>> have sub-transforms that refer to different environments. I think with the
>> above proposal we'll lose this flexibility.
>> What we need is a way to override environments (or properties of
>> environments) in the final pipeline proto. Once we modify such environments
>> in the proto, the change will be reflected in all transforms that utilize
>> those environments.
>
>
> As far as I can tell we currently only register a single environment for
> the entire transform (and it's always the default). Am I missing something?
> https://github.com/apache/beam/blob/0cfa80fd919d141a2061393ec5c12521c7d7af0b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L447-L449
>
> Anyway, I don't see how sub-transforms require overrides. We should be
> able to propagate environment options to sub-transforms to achieve the same
> purpose.
>

The discussion of resource hints at
https://lists.apache.org/thread.html/ra40286b66a03a1d9f4086c9e1ecdeb9f299836d2d0361c3e3fe7c382%40%3Cdev.beam.apache.org%3E
actually may tie into this as well. I would assume a localised request for,
say, high memory should be propagated down to cross-language pipelines. It
is possible that other customizations (such as making sure specific
dependencies are available, or filesystems mounted) would fit here too.

This gets into the distinction of customizing what kind of environment one
wants to have (which could be generally applicable) vs. an absolute
designation of a particular environment (e.g. a docker image).

Note that what we're running into in part is that "pipeline options" are
the wrong level of granularity for specifying characteristics of an
environment, as there is not a single environment to parameterize (or,
possibly, even one per language). If I call
ExpansionRequest(MyFancyTransform, environment_config=docker_path)
and MyFancyTransform is composed of two environments, to which
does docker_path apply? What about PTransforms that use ExternalTransforms
under the hood (e.g. does some pre-processing and then calls SQL, or calls
Kafka followed by some Python-level post-processing)?


>> 'sdk_harness_container_image_overrides' is such a property (which
>> unfortunately only works for Dataflow today). Also, this only works for
>> Docker URLs. Maybe we can extend this property to all runners or introduce
>> a new property that works for all types of environments?
>
>
> In my original email, I wrote that sdk_harness_container_image_overrides
> is no more flexible than having a single option per SDK, since the default
> container images for all external transforms in each SDK are expected to be
> the same. For example, in the case of a pipeline with two external
> transforms that both use the same default container image,
> sdk_harness_container_image_overrides does not let the user give those two
> transforms different containers.
>
> From a design standpoint, I feel find-replace is hacky and backwards. It's
> cleaner to specify what kind of environment we want directly in
> the ExpansionRequest. That way all of the environment creation logic
> belongs inside the expansion service.
>

While Environments logically belong with Transforms, it is the expansion
service's job to attach the right environments to the transforms that it
vends. The caller should not need any visibility into the environment(s)
that an expansion service uses, which is an implementation detail that the
expansion service is free to change at any time. (In fact, whether it is
(partially or fully) implemented as an external transform is an
implementation detail that the end user should not need to care about or
depend on.)

I personally think pattern matching and substitution by runners (maybe more
sophisticated than regexp on container names) is a reasonable way to
approach customization of environments. For example, suppose I construct a
pipeline that uses both Python and Java transforms. (I could do this from
Go, Java, or Python). If I want to run this locally (e.g. on the Python
FnAPI runner), I would prefer that the python bits be run in-process but
would have to shell out (maybe via docker, maybe something cheaper) for the
java bits. On the other hand, if I want to run this same pipeline (ideally,
the same model proto, such that we don't have
runner-dependent construction) on Flink, I might want the java bits to be
inlined and the Python bits to be in a separate process. On Dataflow, both
would live in containers. To do this, the Python runner would say "hey, I
know that Python environment" and just swap it out for in-process, and vice
versa. (For isolation/other reasons, one may want the option to force
everything to be docker, but that's more of a "don't make substitutions"
option than manually providing environment configs.)
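
As a minimal sketch of that substitution (not actual runner code; the
helper name is made up, and the embedded URN is the one the Python FnAPI
runner uses for in-process execution):

    from apache_beam.portability import common_urns
    from apache_beam.portability.api import beam_runner_api_pb2

    EMBEDDED_PYTHON_URN = 'beam:env:embedded_python:v1'

    def inline_known_python_environments(pipeline_proto, known_image):
        # "Hey, I know that Python environment" -- swap recognized docker
        # environments in the pipeline proto for in-process execution.
        for env in pipeline_proto.components.environments.values():
            if env.urn != common_urns.environments.DOCKER.urn:
                continue
            docker = beam_runner_api_pb2.DockerPayload()
            docker.ParseFromString(env.payload)
            if docker.container_image == known_image:
                env.urn = EMBEDDED_PYTHON_URN
                env.payload = b''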

On the other hand, as we go the route of custom containers, especially
expansion services that might vend custom containers, I think we need a way
to push down *properties* of environments (such as resource hints) through
the expansion service, where they may influence the environments that get
attached and returned.
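
Purely as a hypothetical illustration (none of these option names exist
today), such properties might ride along as environment_options strings
that the SDK forwards with the expansion request:

    # Hypothetical property-style options: the caller states requirements,
    # and the expansion service folds them into whatever environments it
    # decides to attach, rather than the caller naming a concrete image.
    environment_options = [
        'resource_hint:min_ram=8GB',   # hypothetical
        'docker_mount=/data:/data',    # hypothetical (cf. BEAM-5440)
    ]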

It would be helpful for me to have concrete use cases showing why a user
wants to customize the container used by some transform they did not write,
which could inform the best course(s) of action here.




Re: Environment options for external transforms

Posted by Kyle Weaver <kc...@google.com>.
>
> So, an external transform is uniquely identified by its URN. An external
> transform identified by a URN may refer to an arbitrary composite which may
> have sub-transforms that refer to different environments. I think with the
> above proposal we'll lose this flexibility.
> What we need is a way to override environments (or properties of
> environments) in the final pipeline proto. Once we modify such
> environments in the proto, the change will be reflected in all transforms
> that utilize such environments.


As far as I can tell we currently only register a single environment for
the entire transform (and it's always the default). Am I missing something?
https://github.com/apache/beam/blob/0cfa80fd919d141a2061393ec5c12521c7d7af0b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L447-L449

Anyway, I don't see how sub-transforms require overrides. We should be able
to propagate environment options to sub-transforms to achieve the same
purpose.
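
Roughly, the expansion service could do something like this to the
expanded proto (a sketch only; the helper name is made up):

    def propagate_environment(components, transform_id, new_env_id):
        # Point a transform, and recursively all of its sub-transforms, at
        # the environment built from the caller's environment options.
        transform = components.transforms[transform_id]
        if transform.environment_id:
            transform.environment_id = new_env_id
        for sub_id in transform.subtransforms:
            propagate_environment(components, sub_id, new_env_id)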

> 'sdk_harness_container_image_overrides' is such a property (which
> unfortunately only works for Dataflow today). Also, this only works for
> Docker URLs. Maybe we can extend this property to all runners or introduce
> a new property that works for all types of environments?


In my original email, I wrote that sdk_harness_container_image_overrides is
no more flexible than having a single option per SDK, since the default
container images for all external transforms in each SDK are expected to be
the same. For example, in the case of a pipeline with two external
transforms that both use the same default container image,
sdk_harness_container_image_overrides does not let the user give those two
transforms different containers.

From a design standpoint, I feel find-replace is hacky and backwards. It's
cleaner to specify what kind of environment we want directly in
the ExpansionRequest. That way all of the environment creation logic
belongs inside the expansion service.
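
For illustration, under my proposal the call site might look roughly like
this (the environment_type and environment_options constructor arguments
are the proposal, not an existing API, and the URN is made up):

    from apache_beam.transforms.external import ExternalTransform

    ExternalTransform(
        'beam:external:fancy:transform:v1',  # made-up URN
        payload,
        expansion_service='localhost:8097',
        environment_type='DOCKER',           # proposed field
        environment_options=[                # proposed field
            'docker_container_image=my_beam_java_sdk:latest',
        ])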



Re: Environment options for external transforms

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, Feb 3, 2021 at 12:34 PM Kyle Weaver <kc...@google.com> wrote:

> Hi Beamers,
>
> Recently we’ve had some requests on user@ and Slack for instructions on
> how to use custom-built containers in cross-language pipelines (typically
> calling Java transforms from a predominantly Python pipeline). Currently,
> it seems like there is no way to change the container used by a
> cross-language transform except by modifying and rebuilding the expansion
> service. The SDK does not pass pipeline options to the expansion service
> (BEAM-9449 [1]). Fixing BEAM-9449 does not solve everything, however. Even
> if pipeline options are passed, the existing set of pipeline options still
> limits the amount of control we have over environments. Here are the
> existing pipeline options that I’m aware of:
>
> Python [2] and Go [3] have these:
>
>    -
>
>    environment_type (DOCKER, PROCESS, LOOPBACK)
>    -
>
>    environment_config (This one is confusingly overloaded. It’s a string
>    that means different things depending on environment_type. For DOCKER, it
>    is the Docker image URL. For PROCESS it is a JSON blob. For EXTERNAL, it is
>    the external service address.)
>
>
> Whereas Java [4] has defaultEnvironmentType and defaultEnvironmentConfig,
> which are named differently but otherwise act the same as the above.
>
> I was unsatisfied with environment_config for a number of reasons. First,
> having a single overloaded option that can mean entirely different things
> depending on context is poor design. Second, in PROCESS mode, requiring the
> user to type in a JSON blob for environment_config is not especially
> human-friendly (though it has also been argued that JSON makes complex
> arguments like this easier to parse). Finally, we must overload this string
> further to introduce new environment-specific options, such as a mounted
> Docker volume (BEAM-5440 [5]).
>

Agree.


>
> To address these problems, I added a new option called
> “environment_options” (BEAM-10671 [6]). (This option has been implemented
> in the Python SDK, but not the other SDKs yet.) Environment_options,
> similar to the “experiments” option, takes a list of strings, for example
> “--environment_option=docker_container_image=my_beam_sdk:latest”. It could
> be argued we should have made “docker_container_image” etc. top-level
> options instead, but this “catch-all” design makes what I am about to
> propose a lot easier.
>
> The solution proposed in PR #11638 [7] set a flag to include unrecognized
> pipeline options during serialization, since otherwise unrecognized options
> are dropped. In a Python pipeline, this will allow us to set
> environment_config and default_environment_config to separate values, for
> Python and Java containers, respectively. However, this still limits us to
> one container image for all Python and Go transforms, and one container
> image for all Java transforms. As more cross-language transforms are
> implemented, sooner or later someone will want to have different Java SDK
> containers for different external transforms.
>
> (I should also mention the sdk_harness_container_image_overrides pipeline
> option [8], which is currently only supported by the Dataflow runner. It
> lets us basically perform a find/replace on container image strings. This
> is not significantly more flexible than having a single option per SDK,
> since the default container images for all external transforms in each SDK
> are expected to be the same.)
>
> Environments logically belong with transforms, and that’s how it works in
> the Runner API [9]. The problem now is that from the user’s perspective,
> the environment is bound to the expansion service. After addressing
> BEAM-9449, the problem will be that one or two environments at most are
> bound to the pipeline. Ideally, though, users should have fully granular
> control over environments at the transform level.
>
> All this context for a very simple proposal: we should have all
> ExternalTransform subclasses take optional environment_type and
> environment_options fields in their constructors. As with their
> corresponding pipeline options, these options would default to DOCKER and
> none, respectively. Then we could overwrite the environment_type and
> environment_options in the pipeline options passed to the expansion service
> with these values. (Alternatively, we could pass environment_type and
> environment_options to the expansion service individually to avoid having
> to overwrite their original values, but their original values should be
> irrelevant to the expansion service anyway.)
>
> What do you think?
>

So, an external transform is uniquely identified by its URN. An external
transform identified by a URN may refer to an arbitrary composite which may
have sub-transforms that refer to different environments. I think with the
above proposal we'll lose this flexibility.
What we need is a way to override environments (or properties of
environments) in the final pipeline proto. Once we modify such
environments in the proto, the change will be reflected in all transforms
that utilize such environments.
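
For example, roughly (a sketch; the helper name is made up):

    from apache_beam.portability.api import beam_runner_api_pb2

    def override_container_image(pipeline_proto, env_id, new_image):
        # Transforms reference environments by id, so editing this one
        # Environment message updates every transform that points at it.
        env = pipeline_proto.components.environments[env_id]
        docker = beam_runner_api_pb2.DockerPayload()
        docker.ParseFromString(env.payload)
        docker.container_image = new_image
        env.payload = docker.SerializeToString()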

'sdk_harness_container_image_overrides' is such a property (which
unfortunately only works for Dataflow today). Also, this only works for
Docker URLs. Maybe we can extend this property to all runners or introduce
a new property that works for all types of environments?
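
For reference, each override is a 'regex,replacement' pair. A rough sketch
of the find/replace semantics (details may differ from Dataflow's actual
implementation):

    import re

    def apply_image_overrides(image, overrides):
        # overrides like ['.*java.*,gcr.io/my-project/beam_java_sdk:custom']
        for override in overrides:
            pattern, replacement = override.split(',', 1)
            if re.match(pattern, image):
                return replacement
        return image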

Thanks,
Cham


>
> [1] https://issues.apache.org/jira/browse/BEAM-9449
>
> [2]
> https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L1097-L1115
>
> [3]
> https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/go/pkg/beam/options/jobopts/options.go#L41-L53
>
> [4]
> https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L53-L71
>
> [5] https://issues.apache.org/jira/browse/BEAM-5440
>
> [6] https://issues.apache.org/jira/browse/BEAM-10671
>
> [7] https://github.com/apache/beam/pull/11638
>
> [8]
> https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L840-L850
>
> [9]
> https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/model/pipeline/src/main/proto/beam_runner_api.proto#L194
>
>