Posted to dev@beam.apache.org by Heejong Lee <he...@google.com> on 2019/11/26 01:49:44 UTC

Re: Artifact staging in cross-language pipelines

Hi,

Is anyone actively working on the artifact staging extension for cross-language
pipelines? I'm thinking I can contribute to it this coming December. If anyone has
any progress on this and needs help, please let me know.

Thanks,

On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Can you please add this to the design documents webpage.
> https://beam.apache.org/contribute/design-documents/
>
> On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <ch...@google.com>
> wrote:
> >
> >
> >
> > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <mx...@apache.org>
> wrote:
> >>
> >> Here's the first draft:
> >>
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
> >>
> >> It's rather high-level. We may want to add more details once we have
> >> finalized the design. Feel free to make comments and edits.
> >
> >
> > Thanks Max. Added some comments.
> >
> >>
> >>
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of the
> >> > environment themselves.
> >>
> >> +1 I came to the same conclusion while thinking about how to store
> >> artifact information for deferred execution of the pipeline.
> >>
> >> -Max
> >>
> >> On 07.05.19 18:10, Robert Bradshaw wrote:
> >> > Looking forward to your writeup, Max. In the meantime, some comments
> below.
> >> >
> >> >
> >> > From: Lukasz Cwik <lc...@google.com>
> >> > Date: Thu, May 2, 2019 at 6:45 PM
> >> > To: dev
> >> >
> >> >>
> >> >>
> >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>>
> >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >>>>
> >> >>>> We should stick with URN + payload + artifact metadata[1] where
> the only mandatory one that all SDKs and expansion services understand is
> the "bytes" artifact type. This allows us to add optional URNs for file://,
> http://, Maven, PyPi, ... in the future. I would make the artifact
> staging service use the same URN + payload mechanism to get compatibility
> of artifacts across the different services and also have the artifact
> staging service be able to be queried for the list of artifact types it
> supports.
> >> >>>
> >> >>> +1
> >> >>>
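
A rough sketch of the URN + payload idea above (the class, field names, and URNs are illustrative, not the actual Beam protos):

    # Sketch only: an artifact is a type URN plus an opaque payload that only
    # services understanding that URN need to interpret.
    from dataclasses import dataclass

    @dataclass
    class ArtifactInformation:
        type_urn: str        # identifies the artifact type
        type_payload: bytes  # URN-specific data

    # The mandatory baseline that every SDK and expansion service understands:
    inline = ArtifactInformation(
        type_urn="beam:artifact:type:bytes:v1",      # illustrative URN
        type_payload=b"...serialized dependency...")

    # An optional, richer type a staging service could advertise support for:
    maven = ArtifactInformation(
        type_urn="beam:artifact:type:maven:v1",      # illustrative URN
        type_payload=b"org.apache.commons:commons-lang3:3.9")
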
> >> >>>> Finally, we would need to have environments enumerate the artifact
> types that they support.
> >> >>>
> >> >>> Meaning at runtime, or as another field statically set in the proto?
> >> >>
> >> >>
> >> >> I don't believe runners/SDKs should have to know what artifacts each
> environment supports at runtime and instead have environments enumerate
> them explicitly in the proto. I have been thinking about a more general
> "capabilities" block on environments which allow them to enumerate URNs
> that the environment understands. This would include artifact type URNs,
> PTransform URNs, coder URNs, ... I haven't proposed anything specific down
> this line yet because I was wondering how environment resources (CPU, min
> memory, hardware like GPU, AWS/GCP/Azure/... machine types) should/could
> tie into this.
> >> >>
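
A hypothetical shape for such a capabilities block (purely illustrative; no such field existed at the time of this discussion):

    # Sketch: an environment statically enumerates the URNs it understands,
    # covering artifact types as well as coders and transforms.
    environment = {
        "urn": "beam:env:docker:v1",
        "payload": b"...docker image name...",
        "capabilities": [                       # illustrative URNs
            "beam:artifact:type:bytes:v1",
            "beam:artifact:type:pip_requirements:v1",
            "beam:coder:bytes:v1",
        ],
    }

    def environment_supports(env, urn):
        # A runner or staging service could gate artifact choices on this.
        return urn in env.get("capabilities", [])
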
> >> >>>
> >> >>>> Having everyone have the same "artifact" representation would be
> beneficial since:
> >> >>>> a) Python environments could install dependencies from a
> requirements.txt file (something that the Google Cloud Dataflow Python
> docker container allows for today)
> >> >>>> b) It provides an extensible and versioned mechanism for SDKs,
> environments, and artifact staging/retrieval services to support additional
> artifact types
> >> >>>> c) Allows for expressing a canonical representation of an artifact
> like a Maven package so a runner could merge environments that the runner
> deems compatible.
> >> >>>>
> >> >>>> The flow I could see is:
> >> >>>> 1) (optional) query artifact staging service for supported
> artifact types
> >> >>>> 2) SDK requests the expansion service to expand a transform, passing in
> a list of artifact types the SDK and artifact staging service support; the
> expansion service returns a list of artifact types limited to those
> supported types + any supported by the environment
> >> >>>
> >> >>> The crux of the issue seems to be how the expansion service returns
> >> >>> the artifacts themselves. Is this going with the approach that the
> >> >>> caller of the expansion service must host an artifact staging
> service?
> >> >>
> >> >>
> >> >> The caller would not need to host an artifact staging service (but
> would become effectively a proxy service, see my comment below for more
> details) as I would have expected this to be part of the expansion service
> response.
> >> >>
> >> >>>
> >> >>> There is also the question here of how the returned artifacts get
> >> >>> attached to the various environments, or whether they get implicitly
> >> >>> applied to all returned stages (which need not have a consistent
> >> >>> environment)?
> >> >>
> >> >>
> >> >> I would suggest returning additional information that says what
> artifact is for which environment. Applying all artifacts to all
> environments is likely to cause issues since some environments may not
> understand certain artifact types or may get conflicting versions of
> artifacts. I would see this happening since an expansion service that
> aggregates other expansion services seems likely, for example:
> >> >>                               /-> ExpansionService(Python)
> >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
> >> >>                               \-> ExpansionService(Go)
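
For the aggregating case above, the expansion response could carry an explicit environment-to-artifacts mapping instead of one flat artifact list. A sketch with invented field names:

    # Sketch: artifacts are keyed by environment id, so a Python dependency is
    # never handed to the Java or Go environment.
    expansion_response = {
        "environments": {
            "java_env":   {"urn": "beam:env:docker:v1", "payload": b"beam_java_sdk"},
            "python_env": {"urn": "beam:env:docker:v1", "payload": b"beam_python_sdk"},
        },
        "artifacts_by_environment": {
            "java_env":   [{"type_urn": "beam:artifact:type:maven:v1",
                            "type_payload": b"org.apache.kafka:kafka-clients:2.3.0"}],
            "python_env": [{"type_urn": "beam:artifact:type:pip_requirements:v1",
                            "type_payload": b"protobuf==3.11.0"}],
        },
    }
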
> >> >
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of the
> >> > environment themselves.
> >> >
> >> >>>> 3) SDK converts any artifact types that the artifact staging
> service or environment doesn't understand, e.g. pulls down Maven
> dependencies and converts them to "bytes" artifacts
> >> >>>
> >> >>> Here I think we're conflating two things. The "type" of an artifact
> is
> >> >>> both (1) how to fetch the bytes and (2) how to interpret them (e.g.
> is
> >> >>> this a jar file, or a pip tarball, or just some data needed by a
> DoFn,
> >> >>> or ...) Only (1) can be freely transmuted.
> >> >>
> >> >>
> >> >> You're right. Thinking about this some more, general artifact
> conversion is unlikely to be practical because how to interpret an artifact
> is environment dependent. For example, a requirements.txt used to install
> pip packages for a Python docker container depends on the filesystem layout
> of that specific docker container. One could simulate doing a pip install
> on the same filesystem and take the diff in place of all the packages in
> requirements.txt, but this quickly becomes impractical.
> >> >>
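
One way to keep the two concerns Robert distinguishes above separate is to give an artifact one field for how to fetch the bytes and another for how the environment should interpret them. A sketch with illustrative field names and URNs:

    # Sketch: "how to fetch" can be freely transmuted (a URL can be downloaded
    # and re-staged as raw bytes), while "how to interpret" (the role) must be
    # preserved for the target environment.
    artifact = {
        # (1) how to fetch the bytes -- interchangeable representations
        "type_urn": "beam:artifact:type:url:v1",
        "type_payload": b"https://repo1.maven.org/.../commons-lang3-3.9.jar",
        # (2) how to interpret them -- must survive any conversion
        "role_urn": "beam:artifact:role:staging_to:v1",
        "role_payload": b"commons-lang3-3.9.jar",
    }
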
> >> >>>
> >> >>>> 4) SDK sends artifacts to artifact staging service
> >> >>>> 5) Artifact staging service converts any artifacts to types that
> the environment understands
> >> >>>> 6) Environment is started and gets artifacts from the artifact
> retrieval service.
> >> >>>>
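
Putting steps 1) through 6) together, the client-side flow might look roughly like this (every service call and field name here is hypothetical, not an existing Beam API):

    def convert_to_bytes(artifact):
        # Placeholder for step 3: e.g. resolve a Maven coordinate locally and
        # re-wrap the downloaded jar as a "bytes" artifact.
        raise NotImplementedError

    def expand_and_stage(transform, expansion_service, staging_service, sdk_types):
        # 1) ask the staging service which artifact types it can handle
        staging_types = set(staging_service.supported_artifact_types())
        # 2) expand, advertising the types the SDK and staging service support
        response = expansion_service.expand(
            transform, supported_types=sdk_types | staging_types)
        staged = []
        for artifact in response.artifacts:
            if artifact.type_urn not in staging_types:
                artifact = convert_to_bytes(artifact)  # 3) convert locally
            staged.append(artifact)
        staging_service.stage(staged)                  # 4) stage everything
        # 5) and 6) happen on the runner side when the environment is started
        return response.transform
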
> >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <
> robertwb@google.com> wrote:
> >> >>>>>
> >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <
> mxm@apache.org> wrote:
> >> >>>>>>
> >> >>>>>> Good idea to let the client expose an artifact staging service
> that the
> >> >>>>>> ExpansionService could use to stage artifacts. This solves two
> problems:
> >> >>>>>>
> >> >>>>>> (1) The Expansion Service not being able to access the Job Server
> >> >>>>>> artifact staging service
> >> >>>>>> (2) The client not having access to the dependencies returned by
> the
> >> >>>>>> Expansion Server
> >> >>>>>>
> >> >>>>>> The downside is that it adds an additional indirection. The
> alternative
> >> >>>>>> of letting the client handle staging the artifacts returned by the
> Expansion
> >> >>>>>> Server is more transparent and easier to implement.
> >> >>>>>
> >> >>>>> The other downside is that it may not always be possible for the
> >> >>>>> expansion service to connect to the artifact staging service (e.g.
> >> >>>>> when constructing a pipeline locally against a remote expansion
> >> >>>>> service).
> >> >>>>
> >> >>>> Just to make sure, you're saying the expansion service would return
> all the artifacts (bytes, urls, ...) as part of the response since the
> expansion service wouldn't be able to connect to the SDK that is running
> locally either.
> >> >>>
> >> >>> Yes. Well, more I'm asking how the expansion service would return
> any
> >> >>> artifacts.
> >> >>>
> >> >>> What we have is
> >> >>>
> >> >>> Runner <--- SDK ---> Expansion service.
> >> >>>
> >> >>> Where the unidirectional arrow means "instantiates a connection
> with"
> >> >>> and the other direction (and missing arrows) may not be possible.
> >> >>
> >> >>
> >> >> I believe the ExpansionService Expand request should become a
> unidirectional stream back to the caller so that artifacts could be sent
> back to the SDK (effectively mirroring the artifact staging service API).
> So the expansion response would stream back a bunch of artifact data messages
> and also the expansion response containing PTransform information.
> >> >
> >> > +1.
> >> >
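
A streaming Expand response along those lines could interleave artifact data with the final transform description. A self-contained sketch (the message shape is invented for illustration):

    from typing import Iterator, List, NamedTuple, Optional

    CHUNK_SIZE = 1 << 20  # 1 MiB per streamed message (arbitrary choice)

    class ExpansionResponse(NamedTuple):  # invented message shape
        artifact_chunk: Optional[bytes] = None
        transform: Optional[dict] = None

    def expand(transform: dict, artifacts: List[bytes]) -> Iterator[ExpansionResponse]:
        # Stream every artifact back in chunks, mirroring the staging API...
        for data in artifacts:
            for i in range(0, len(data), CHUNK_SIZE):
                yield ExpansionResponse(artifact_chunk=data[i:i + CHUNK_SIZE])
        # ...then finish with the expanded transform itself.
        yield ExpansionResponse(transform=transform)

    # The SDK consumes the stream, forwarding chunks to its own staging service
    # (acting as a proxy for the runner) and keeping the final transform.
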
> >> >>>>>> Ideally, the Expansion Service won't return any dependencies
> because the
> >> >>>>>> environment already contains the required dependencies. We could
> make it
> >> >>>>>> a requirement for the expansion to be performed inside an
> environment.
> >> >>>>>> Then we would already ensure during expansion time that the
> runtime
> >> >>>>>> dependencies are available.
> >> >>>>>
> >> >>>>> Yes, it's cleanest if the expansion service provides an
> environment
> >> >>>>> that already contains all the dependencies. Interesting idea to make
> this a
> >> >>>>> property of the expansion service itself.
> >> >>>>
> >> >>>> I had thought this too but an opaque docker container that was
> built on top of a base Beam docker container would be very difficult for a
> runner to introspect and check to see if it's compatible to allow for fusion
> across PTransforms. I think artifacts need to be communicated in their
> canonical representation.
> >> >>>
> >> >>> It's clean (from the specification point of view), but doesn't allow
> >> >>> for good introspection/fusion (aside from one being a base of
> another,
> >> >>> perhaps).
> >> >>>
> >> >>>>>>> In this case, the runner would (as
> >> >>>>>>> requested by its configuration) be free to merge environments it
> >> >>>>>>> deemed compatible, including swapping out beam-java-X for
> >> >>>>>>> beam-java-embedded if it considers itself compatible with the
> >> >>>>>>> dependency list.
> >> >>>>>>
> >> >>>>>> Could you explain how that would work in practice?
> >> >>>>>
> >> >>>>> Say one has a pipeline with environments
> >> >>>>>
> >> >>>>> A: beam-java-sdk-2.12-docker
> >> >>>>> B: beam-java-sdk-2.12-docker + dep1
> >> >>>>> C: beam-java-sdk-2.12-docker + dep2
> >> >>>>> D: beam-java-sdk-2.12-docker + dep3
> >> >>>>>
> >> >>>>> A runner could (conceivably) be intelligent enough to know that
> dep1
> >> >>>>> and dep2 are indeed compatible, and run A, B, and C in a single
> >> >>>>> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
> >> >>>>> corresponding fusion and lower overhead benefits). If a certain
> >> >>>>> pipeline option is set, it might further note that dep1 and dep2
> are
> >> >>>>> compatible with its own workers, which are built against
> sdk-2.12, and
> >> >>>>> choose to run these in embedded + dep1 + dep2 environment.
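
A sketch of the kind of check a runner might apply before merging; the compatibility test here is deliberately naive (real dependency resolution is much harder, as noted just below):

    # Sketch: merge environments that share a base image when their extra
    # dependency sets do not conflict (same dependency name -> same version).
    def can_merge(env_a, env_b):
        if env_a["base"] != env_b["base"]:
            return False
        return all(env_b["deps"].get(k, v) == v for k, v in env_a["deps"].items())

    def merge(env_a, env_b):
        return {"base": env_a["base"], "deps": {**env_a["deps"], **env_b["deps"]}}

    a = {"base": "beam-java-sdk-2.12-docker", "deps": {}}
    b = {"base": "beam-java-sdk-2.12-docker", "deps": {"dep1": "1.0"}}
    c = {"base": "beam-java-sdk-2.12-docker", "deps": {"dep2": "2.3"}}

    ab = merge(a, b) if can_merge(a, b) else None
    abc = merge(ab, c) if ab and can_merge(ab, c) else None
    # A, B and C collapse into one environment; a D pinning a conflicting
    # version of dep1 would fail can_merge and stay separate.

An environment whose dependency set is a superset of another's trivially passes this check, which is the easy case mentioned further down the thread.
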
> >> >>>>
> >> >>>> We have been talking about the expansion service and cross
> language transforms a lot lately but I believe it will initially come at
> the cost of poor fusion of transforms since "merging" environments that are
> compatible is a difficult problem that brings up many of the dependency
> management issues (e.g. diamond dependency issues).
> >> >>>
> >> >>> I agree. I think expansion services offering "kitchen-sink"
> >> >>> containers, when possible, can go far here. If we could at least
> >> >>> recognize when one environment/set of deps is a superset of another,
> >> >>> that could be an easy case that would yield a lot of benefit as
> well.
> >> >>
> >> >>
> >> >> +1
>

Re: Artifact staging in cross-language pipelines

Posted by Robert Bradshaw <ro...@google.com>.
Thanks! I've left some comments on the doc.

On Tue, Dec 17, 2019, 5:03 PM Heejong Lee <he...@google.com> wrote:

Re: Artifact staging in cross-language pipelines

Posted by Heejong Lee <he...@google.com>.
Hi,

I wrote a draft of the implementation plan[1]. The summary is on the first
page. Any help would be appreciated!

[1]:
https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog/edit?usp=sharing

On Thu, Dec 12, 2019 at 5:02 PM Heejong Lee <he...@google.com> wrote:

Re: Artifact staging in cross-language pipelines

Posted by Heejong Lee <he...@google.com>.
I'm brushing up my memory by revisiting the doc[1] and it seems like we've
already reached consensus on the bigger picture. I will start drafting
the implementation plan.

[1]:
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing

On Tue, Nov 26, 2019 at 3:54 AM Maximilian Michels <mx...@apache.org> wrote:

> Hey Heejong,
>
> I don't think so. It would be great to push this forward.
>
> Thanks,
> Max
>
> On 26.11.19 02:49, Heejong Lee wrote:
> > Hi,
> >
> > Is anyone actively working on artifact staging extension for
> > cross-language pipelines? I'm thinking I can contribute to it in coming
> > Dec. If anyone has any progress on this and needs help, please let me
> know.
> >
> > Thanks,
> >
> > On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <iemejia@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     Can you please add this to the design documents webpage.
> >     https://beam.apache.org/contribute/design-documents/
> >
> >     On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath
> >     <chamikara@google.com <ma...@google.com>> wrote:
> >      >
> >      >
> >      >
> >      > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels
> >     <mxm@apache.org <ma...@apache.org>> wrote:
> >      >>
> >      >> Here's the first draft:
> >      >>
> >
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
> >      >>
> >      >> It's rather high-level. We may want to add more details once we
> have
> >      >> finalized the design. Feel free to make comments and edits.
> >      >
> >      >
> >      > Thanks Max. Added some comments.
> >      >
> >      >>
> >      >>
> >      >> > All of this goes back to the idea that I think the listing of
> >      >> > artifacts (or more general dependencies) should be a property
> >     of the
> >      >> > environment themselves.
> >      >>
> >      >> +1 I came to the same conclusion while thinking about how to
> store
> >      >> artifact information for deferred execution of the pipeline.
> >      >>
> >      >> -Max
> >      >>
> >      >> On 07.05.19 18:10, Robert Bradshaw wrote:
> >      >> > Looking forward to your writeup, Max. In the meantime, some
> >     comments below.
> >      >> >
> >      >> >
> >      >> > From: Lukasz Cwik <lcwik@google.com <ma...@google.com>>
> >      >> > Date: Thu, May 2, 2019 at 6:45 PM
> >      >> > To: dev
> >      >> >
> >      >> >>
> >      >> >>
> >      >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw
> >     <robertwb@google.com <ma...@google.com>> wrote:
> >      >> >>>
> >      >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik
> >     <lcwik@google.com <ma...@google.com>> wrote:
> >      >> >>>>
> >      >> >>>> We should stick with URN + payload + artifact metadata[1]
> >     where the only mandatory one that all SDKs and expansion services
> >     understand is the "bytes" artifact type. This allows us to add
> >     optional URNs for file://, http://, Maven, PyPi, ... in the future.
> >     I would make the artifact staging service use the same URN + payload
> >     mechanism to get compatibility of artifacts across the different
> >     services and also have the artifact staging service be able to be
> >     queried for the list of artifact types it supports.
> >      >> >>>
> >      >> >>> +1
> >      >> >>>
> >      >> >>>> Finally, we would need to have environments enumerate the
> >     artifact types that they support.
> >      >> >>>
> >      >> >>> Meaning at runtime, or as another field statically set in
> >     the proto?
> >      >> >>
> >      >> >>
> >      >> >> I don't believe runners/SDKs should have to know what
> >     artifacts each environment supports at runtime and instead have
> >     environments enumerate them explicitly in the proto. I have been
> >     thinking about a more general "capabilities" block on environments
> >     which allow them to enumerate URNs that the environment understands.
> >     This would include artifact type URNs, PTransform URNs, coder URNs,
> >     ... I haven't proposed anything specific down this line yet because
> >     I was wondering how environment resources (CPU, min memory, hardware
> >     like GPU, AWS/GCP/Azure/... machine types) should/could tie into
> this.
> >      >> >>
> >      >> >>>
> >      >> >>>> Having everyone have the same "artifact" representation
> >     would be beneficial since:
> >      >> >>>> a) Python environments could install dependencies from a
> >     requirements.txt file (something that the Google Cloud Dataflow
> >     Python docker container allows for today)
> >      >> >>>> b) It provides an extensible and versioned mechanism for
> >     SDKs, environments, and artifact staging/retrieval services to
> >     support additional artifact types
> >      >> >>>> c) Allow for expressing a canonical representation of an
> >     artifact like a Maven package so a runner could merge environments
> >     that the runner deems compatible.
> >      >> >>>>
> >      >> >>>> The flow I could see is:
> >      >> >>>> 1) (optional) query artifact staging service for supported
> >     artifact types
> >      >> >>>> 2) SDK request expansion service to expand transform
> >     passing in a list of artifact types the SDK and artifact staging
> >     service support, the expansion service returns a list of artifact
> >     types limited to those supported types + any supported by the
> >     environment
> >      >> >>>
> >      >> >>> The crux of the issue seems to be how the expansion service
> >     returns
> >      >> >>> the artifacts themselves. Is this going with the approach
> >     that the
> >      >> >>> caller of the expansion service must host an artifact
> >     staging service?
> >      >> >>
> >      >> >>
> >      >> >> The caller would not need to host an artifact staging service
> >     (but would become effectively a proxy service, see my comment below
> >     for more details) as I would have expected this to be part of the
> >     expansion service response.
> >      >> >>
> >      >> >>>
> >      >> >>> There is also the question here is how the returned
> >     artifacts get
> >      >> >>> attached to the various environments, or whether they get
> >     implicitly
> >      >> >>> applied to all returned stages (which need not have a
> consistent
> >      >> >>> environment)?
> >      >> >>
> >      >> >>
> >      >> >> I would suggest returning additional information that says
> >     what artifact is for which environment. Applying all artifacts to
> >     all environments is likely to cause issues since some environments
> >     may not understand certain artifact types or may get conflicting
> >     versions of artifacts. I would see this happening since an expansion
> >     service that aggregates other expansion services seems likely, for
> >     example:
> >      >> >>                               /-> ExpansionSerivce(Python)
> >      >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
> >      >> >>                               \-> ExpansionSerivce(Go)
> >      >> >
> >      >> > All of this goes back to the idea that I think the listing of
> >      >> > artifacts (or more general dependencies) should be a property
> >     of the
> >      >> > environment themselves.
> >      >> >
> >      >> >>>> 3) SDK converts any artifact types that the artifact
> >     staging service or environment doesn't understand, e.g. pulls down
> >     Maven dependencies and converts them to "bytes" artifacts
> >      >> >>>
> >      >> >>> Here I think we're conflating two things. The "type" of an
> >     artifact is
> >      >> >>> both (1) how to fetch the bytes and (2) how to interpret
> >     them (e.g. is
> >      >> >>> this a jar file, or a pip tarball, or just some data needed
> >     by a DoFn,
> >      >> >>> or ...) Only (1) can be freely transmuted.
> >      >> >>
> >      >> >>
> >      >> >> Your right. Thinking about this some more, general artifact
> >     conversion is unlikely to be practical because how to interpret an
> >     artifact is environment dependent. For example, a requirements.txt
> >     used to install pip packages for a Python docker container depends
> >     on the filesystem layout of that specific docker container. One
> >     could simulate doing a pip install on the same filesystem, see the
> >     diff and then of all the packages in requirements.txt but this
> >     quickly becomes impractical.
> >      >> >>
> >      >> >>>
> >      >> >>>> 4) SDK sends artifacts to artifact staging service
> >      >> >>>> 5) Artifact staging service converts any artifacts to types
> >     that the environment understands
> >      >> >>>> 6) Environment is started and gets artifacts from the
> >     artifact retrieval service.
> >      >> >>>>
> >      >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw
> >     <robertwb@google.com <ma...@google.com>> wrote:
> >      >> >>>>>
> >      >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels
> >     <mxm@apache.org <ma...@apache.org>> wrote:
> >      >> >>>>>>
> >      >> >>>>>> Good idea to let the client expose an artifact staging
> >     service that the
> >      >> >>>>>> ExpansionService could use to stage artifacts. This
> >     solves two problems:
> >      >> >>>>>>
> >      >> >>>>>> (1) The Expansion Service not being able to access the
> >     Job Server
> >      >> >>>>>> artifact staging service
> >      >> >>>>>> (2) The client not having access to the dependencies
> >     returned by the
> >      >> >>>>>> Expansion Server
> >      >> >>>>>>
> >      >> >>>>>> The downside is that it adds an additional indirection.
> >     The alternative
> >      >> >>>>>> to let the client handle staging the artifacts returned
> >     by the Expansion
> >      >> >>>>>> Server is more transparent and easier to implement.
> >      >> >>>>>
> >      >> >>>>> The other downside is that it may not always be possible
> >     for the
> >      >> >>>>> expansion service to connect to the artifact staging
> >     service (e.g.
> >      >> >>>>> when constructing a pipeline locally against a remote
> >     expansion
> >      >> >>>>> service).
> >      >> >>>>
> >      >> >>>> Just to make sure, your saying the expansion service would
> >     return all the artifacts (bytes, urls, ...) as part of the response
> >     since the expansion service wouldn't be able to connect to the SDK
> >     that is running locally either.
> >      >> >>>
> >      >> >>> Yes. Well, more I'm asking how the expansion service would
> >     return any
> >      >> >>> artifacts.
> >      >> >>>
> >      >> >>> What we have is
> >      >> >>>
> >      >> >>> Runner <--- SDK ---> Expansion service.
> >      >> >>>
> >      >> >>> Where the unidirectional arrow means "instantiates a
> >     connection with"
> >      >> >>> and the other direction (and missing arrows) may not be
> >     possible.
> >      >> >>
> >      >> >>
> >      >> >> I believe the ExpansionService Expand request should become a
> >     unidirectional stream back to the caller so that artifacts could be
> >     sent back to the SDK (effectively mirroring the artifact staging
> >     service API). So the expansion response would stream back a bunch
> >     artifact data messages and also the expansion response containing
> >     PTransform information.
> >      >> >
> >      >> > +1.
> >      >> >
> >      >> >>>>>> Ideally, the Expansion Service won't return any
> >     dependencies because the
> >      >> >>>>>> environment already contains the required dependencies.
> >     We could make it
> >      >> >>>>>> a requirement for the expansion to be performed inside an
> >     environment.
> >      >> >>>>>> Then we would already ensure during expansion time that
> >     the runtime
> >      >> >>>>>> dependencies are available.
> >      >> >>>>>
> >      >> >>>>> Yes, it's cleanest if the expansion service provides an
> >     environment
> >      >> >>>>> without all the dependencies provided. Interesting idea to
> >     make this a
> >      >> >>>>> property of the expansion service itself.
> >      >> >>>>
> >      >> >>>> I had thought this too but an opaque docker container that
> >     was built on top of a base Beam docker container would be very
> >     difficult for a runner to introspect and check to see if its
> >     compatible to allow for fusion across PTransforms. I think artifacts
> >     need to be communicated in their canonical representation.
> >      >> >>>
> >      >> >>> It's clean (from the specification point of view), but
> >     doesn't allow
> >      >> >>> for good introspection/fusion (aside from one being a base
> >     of another,
> >      >> >>> perhaps).
> >      >> >>>
> >      >> >>>>>>> In this case, the runner would (as
> >      >> >>>>>>> requested by its configuration) be free to merge
> >     environments it
> >      >> >>>>>>> deemed compatible, including swapping out beam-java-X for
> >      >> >>>>>>> beam-java-embedded if it considers itself compatible
> >     with the
> >      >> >>>>>>> dependency list.
> >      >> >>>>>>
> >      >> >>>>>> Could you explain how that would work in practice?
> >      >> >>>>>
> >      >> >>>>> Say one has a pipeline with environments
> >      >> >>>>>
> >      >> >>>>> A: beam-java-sdk-2.12-docker
> >      >> >>>>> B: beam-java-sdk-2.12-docker + dep1
> >      >> >>>>> C: beam-java-sdk-2.12-docker + dep2
> >      >> >>>>> D: beam-java-sdk-2.12-docker + dep3
> >      >> >>>>>
> >      >> >>>>> A runner could (conceivably) be intelligent enough to know
> >     that dep1
> >      >> >>>>> and dep2 are indeed compatible, and run A, B, and C in a
> >     single
> >      >> >>>>> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
> >      >> >>>>> corresponding fusion and lower overhead benefits). If a
> >     certain
> >      >> >>>>> pipeline option is set, it might further note that dep1
> >     and dep2 are
> >      >> >>>>> compatible with its own workers, which are built against
> >     sdk-2.12, and
> >      >> >>>>> choose to run these in embedded + dep1 + dep2 environment.
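
A sketch of the merge decision described above, under the assumption
that each environment is just a base container plus a set of pinned
dependencies and that "compatible" simply means "no conflicting
versions"; the real compatibility question is of course much harder.

    from dataclasses import dataclass
    from typing import Dict, FrozenSet, Optional

    @dataclass(frozen=True)
    class Env:
        base: str                     # e.g. "beam-java-sdk-2.12-docker"
        deps: FrozenSet[str]          # e.g. frozenset({"dep1==1.0"})

    def _parse(dep: str):
        name, _, version = dep.partition("==")
        return name, version

    def try_merge(a: Env, b: Env) -> Optional[Env]:
        """Merge two environments if they share a base and have no version conflicts."""
        if a.base != b.base:
            return None
        versions: Dict[str, str] = {}
        for dep in a.deps | b.deps:
            name, version = _parse(dep)
            if versions.setdefault(name, version) != version:
                return None           # diamond-style version conflict: keep them separate
        return Env(a.base, a.deps | b.deps)

    A = Env("beam-java-sdk-2.12-docker", frozenset())
    B = Env("beam-java-sdk-2.12-docker", frozenset({"dep1==1.0"}))
    C = Env("beam-java-sdk-2.12-docker", frozenset({"dep2==2.0"}))
    print(try_merge(B, C))            # one container with dep1 + dep2; A fits in it trivially

The same set comparison also covers the "superset" case discussed
further down in this thread: if one environment's dependency set
contains another's, the larger environment can simply host both.
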
> >      >> >>>>
> >      >> >>>> We have been talking about the expansion service and cross
> >     language transforms a lot lately but I believe it will initially
> >     come at the cost of poor fusion of transforms since "merging"
> >     environments that are compatible is a difficult problem since it
> >     brings up many of the dependency management issues (e.g. diamond
> >     dependency issues).
> >      >> >>>
> >      >> >>> I agree. I think expansion services offering "kitchen-sink"
> >      >> >>> containers, when possible, can go far here. If we could at least
> >      >> >>> recognize when one environment/set of deps is a superset of
> >     another,
> >      >> >>> that could be an easy case that would yield a lot of benefit
> >     as well.
> >      >> >>
> >      >> >>
> >      >> >> +1
> >
>

Re: Artifact staging in cross-language pipelines

Posted by Maximilian Michels <mx...@apache.org>.
Hey Heejong,

I don't think so. It would be great to push this forward.

Thanks,
Max

On 26.11.19 02:49, Heejong Lee wrote:
> Hi,
> 
> Is anyone actively working on artifact staging extension for 
> cross-language pipelines? I'm thinking I can contribute to it in coming 
> Dec. If anyone has any progress on this and needs help, please let me know.
> 
> Thanks,
> 
> On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <iemejia@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     Can you please add this to the design documents webpage.
>     https://beam.apache.org/contribute/design-documents/
> 
>     On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath
>     <chamikara@google.com <ma...@google.com>> wrote:
>      >
>      >
>      >
>      > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels
>     <mxm@apache.org <ma...@apache.org>> wrote:
>      >>
>      >> Here's the first draft:
>      >>
>     https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>      >>
>      >> It's rather high-level. We may want to add more details once we have
>      >> finalized the design. Feel free to make comments and edits.
>      >
>      >
>      > Thanks Max. Added some comments.
>      >
>      >>
>      >>
>      >> > All of this goes back to the idea that I think the listing of
>      >> > artifacts (or more general dependencies) should be a property
>     of the
>      >> > environment themselves.
>      >>
>      >> +1 I came to the same conclusion while thinking about how to store
>      >> artifact information for deferred execution of the pipeline.
>      >>
>      >> -Max
>      >>
>      >> On 07.05.19 18:10, Robert Bradshaw wrote:
>      >> > Looking forward to your writeup, Max. In the meantime, some
>     comments below.
>      >> >
>      >> >
>      >> > From: Lukasz Cwik <lcwik@google.com <ma...@google.com>>
>      >> > Date: Thu, May 2, 2019 at 6:45 PM
>      >> > To: dev
>      >> >
>      >> >>
>      >> >>
>      >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw
>     <robertwb@google.com <ma...@google.com>> wrote:
>      >> >>>
>      >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik
>     <lcwik@google.com <ma...@google.com>> wrote:
>      >> >>>>
>      >> >>>> We should stick with URN + payload + artifact metadata[1]
>     where the only mandatory one that all SDKs and expansion services
>     understand is the "bytes" artifact type. This allows us to add
>     optional URNs for file://, http://, Maven, PyPi, ... in the future.
>     I would make the artifact staging service use the same URN + payload
>     mechanism to get compatibility of artifacts across the different
>     services and also have the artifact staging service be able to be
>     queried for the list of artifact types it supports.
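
As a rough illustration of that shape on the SDK side (the URN strings
and the class below are made up for the example, not the actual Beam
definitions), the "bytes" type is the lowest common denominator that
every service understands, while richer types stay in their canonical
form:

    from dataclasses import dataclass

    # Illustrative artifact type URNs; real URNs would be agreed on in the protos.
    BYTES_URN = "beam:artifact:type:bytes:v1"   # payload is the artifact content itself
    FILE_URN  = "beam:artifact:type:file:v1"    # payload is a path or URL to fetch
    MAVEN_URN = "beam:artifact:type:maven:v1"   # payload is a Maven coordinate

    @dataclass
    class Artifact:
        type_urn: str        # how to obtain the artifact
        type_payload: bytes  # URN-specific payload (content, path, coordinate, ...)

    embedded  = Artifact(BYTES_URN, b"...raw jar or wheel bytes...")
    maven_dep = Artifact(MAVEN_URN, b"org.apache.kafka:kafka-clients:2.0.0")

Keeping the Maven coordinate in its canonical form (rather than eagerly
downloading it into bytes) is what later lets a runner or staging
service reason about merging, as discussed below.
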
>      >> >>>
>      >> >>> +1
>      >> >>>
>      >> >>>> Finally, we would need to have environments enumerate the
>     artifact types that they support.
>      >> >>>
>      >> >>> Meaning at runtime, or as another field statically set in
>     the proto?
>      >> >>
>      >> >>
>      >> >> I don't believe runners/SDKs should have to know what
>     artifacts each environment supports at runtime and instead have
>     environments enumerate them explicitly in the proto. I have been
>     thinking about a more general "capabilities" block on environments
>     which allow them to enumerate URNs that the environment understands.
>     This would include artifact type URNs, PTransform URNs, coder URNs,
>     ... I haven't proposed anything specific down this line yet because
>     I was wondering how environment resources (CPU, min memory, hardware
>     like GPU, AWS/GCP/Azure/... machine types) should/could tie into this.
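
A sketch of what such a "capabilities" block could look like as a
static field on the environment; the field name and the URNs listed
here are assumptions for illustration only.

    from dataclasses import dataclass, field
    from typing import List

    @dataclass
    class Environment:
        urn: str                                   # e.g. a docker environment URN
        payload: bytes                             # e.g. the container image name
        capabilities: List[str] = field(default_factory=list)  # URNs this environment understands

    env = Environment(
        urn="beam:env:docker:v1",
        payload=b"apache/beam_java_sdk:2.12.0",
        capabilities=[
            "beam:artifact:type:bytes:v1",         # artifact types it can fetch/install
            "beam:artifact:type:maven:v1",
            "beam:coder:kv:v1",                    # coders it understands
        ],
    )

    def supports(environment: Environment, urn: str) -> bool:
        """A runner or SDK can check support without starting the container."""
        return urn in environment.capabilities
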
>      >> >>
>      >> >>>
>      >> >>>> Having everyone have the same "artifact" representation
>     would be beneficial since:
>      >> >>>> a) Python environments could install dependencies from a
>     requirements.txt file (something that the Google Cloud Dataflow
>     Python docker container allows for today)
>      >> >>>> b) It provides an extensible and versioned mechanism for
>     SDKs, environments, and artifact staging/retrieval services to
>     support additional artifact types
>      >> >>>> c) Allow for expressing a canonical representation of an
>     artifact like a Maven package so a runner could merge environments
>     that the runner deems compatible.
>      >> >>>>
>      >> >>>> The flow I could see is:
>      >> >>>> 1) (optional) query artifact staging service for supported
>     artifact types
>      >> >>>> 2) SDK request expansion service to expand transform
>     passing in a list of artifact types the SDK and artifact staging
>     service support, the expansion service returns a list of artifact
>     types limited to those supported types + any supported by the
>     environment
>      >> >>>
>      >> >>> The crux of the issue seems to be how the expansion service
>     returns
>      >> >>> the artifacts themselves. Is this going with the approach
>     that the
>      >> >>> caller of the expansion service must host an artifact
>     staging service?
>      >> >>
>      >> >>
>      >> >> The caller would not need to host an artifact staging service
>     (but would become effectively a proxy service, see my comment below
>     for more details) as I would have expected this to be part of the
>     expansion service response.
>      >> >>
>      >> >>>
>      >> >>> There is also the question here is how the returned
>     artifacts get
>      >> >>> attached to the various environments, or whether they get
>     implicitly
>      >> >>> applied to all returned stages (which need not have a consistent
>      >> >>> environment)?
>      >> >>
>      >> >>
>      >> >> I would suggest returning additional information that says
>     what artifact is for which environment. Applying all artifacts to
>     all environments is likely to cause issues since some environments
>     may not understand certain artifact types or may get conflicting
>     versions of artifacts. I would see this happening since an expansion
>     service that aggregates other expansion services seems likely, for
>     example:
>      >> >>                               /-> ExpansionService(Python)
>      >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
>      >> >>                               \-> ExpansionService(Go)
>      >> >
>      >> > All of this goes back to the idea that I think the listing of
>      >> > artifacts (or more general dependencies) should be a property
>     of the
>      >> > environment themselves.
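
One way to read "a property of the environments themselves" is to
attach the dependency list directly to each environment, so an
aggregating expansion service never has to guess which artifact belongs
to which environment. A minimal sketch, with illustrative names:

    from dataclasses import dataclass, field
    from typing import List, Tuple

    @dataclass
    class Environment:
        urn: str
        payload: bytes
        # Each environment carries its own artifacts as (type URN, payload) pairs.
        dependencies: List[Tuple[str, bytes]] = field(default_factory=list)

    java_env = Environment(
        "beam:env:docker:v1", b"apache/beam_java_sdk:2.12.0",
        dependencies=[("beam:artifact:type:maven:v1",
                       b"org.apache.kafka:kafka-clients:2.0.0")])
    python_env = Environment(
        "beam:env:docker:v1", b"apache/beam_python_sdk:2.12.0",
        dependencies=[("beam:artifact:type:pypi:v1", b"numpy==1.16.0")])

    # An aggregating expansion service can simply return the union of environments;
    # every artifact stays attached to the environment that needs it.
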
>      >> >
>      >> >>>> 3) SDK converts any artifact types that the artifact
>     staging service or environment doesn't understand, e.g. pulls down
>     Maven dependencies and converts them to "bytes" artifacts
>      >> >>>
>      >> >>> Here I think we're conflating two things. The "type" of an
>     artifact is
>      >> >>> both (1) how to fetch the bytes and (2) how to interpret
>     them (e.g. is
>      >> >>> this a jar file, or a pip tarball, or just some data needed
>     by a DoFn,
>      >> >>> or ...) Only (1) can be freely transmuted.
>      >> >>
>      >> >>
>      >> >> You're right. Thinking about this some more, general artifact
>     conversion is unlikely to be practical because how to interpret an
>     artifact is environment dependent. For example, a requirements.txt
>     used to install pip packages for a Python docker container depends
>     on the filesystem layout of that specific docker container. One
>     could simulate doing a pip install on the same filesystem, inspect the
>     diff for all the packages in requirements.txt, but this
>     quickly becomes impractical.
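
The two halves could be kept as separate fields on an artifact, so that
only the "how to fetch" half is ever transmuted while the "how to
interpret" half survives staging untouched. The field and URN names in
this sketch are illustrative, not the current proto:

    from dataclasses import dataclass

    @dataclass
    class Artifact:
        fetch_urn: str        # (1) how to obtain the bytes: bytes, file, url, maven, ...
        fetch_payload: bytes
        role_urn: str         # (2) how the environment should interpret them
        role_payload: bytes

    # A requirements.txt only makes sense to a Python container, so its role must
    # survive even if the fetch type is rewritten to plain bytes for staging.
    reqs = Artifact(
        fetch_urn="beam:artifact:type:bytes:v1",
        fetch_payload=b"protobuf==3.7.1\nnumpy==1.16.0\n",
        role_urn="beam:artifact:role:pip_requirements_file:v1",
        role_payload=b"requirements.txt",
    )
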
>      >> >>
>      >> >>>
>      >> >>>> 4) SDK sends artifacts to artifact staging service
>      >> >>>> 5) Artifact staging service converts any artifacts to types
>     that the environment understands
>      >> >>>> 6) Environment is started and gets artifacts from the
>     artifact retrieval service.
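
A very rough end-to-end sketch of the numbered flow above, with every
service call replaced by a local stand-in so that the steps can be
traced; none of these functions are real Beam client APIs.

    from dataclasses import dataclass
    from typing import List, Tuple

    BYTES = "bytes"   # the one artifact type everything understands (illustrative)

    @dataclass
    class Artifact:
        type: str
        payload: bytes

    # --- stand-ins for the real services ---
    def staging_service_supported_types() -> List[str]:
        return [BYTES, "file"]

    def expansion_service_expand(transform: str, supported: List[str]) -> Tuple[List[Artifact], str]:
        return [Artifact("maven", b"org.apache.beam:some-io:2.12.0")], "expanded(" + transform + ")"

    def to_bytes_artifact(a: Artifact) -> Artifact:
        return Artifact(BYTES, b"<jar bytes fetched for " + a.payload + b">")

    def stage_artifacts(artifacts: List[Artifact]) -> str:
        return "staging-token-%d" % len(artifacts)

    # --- the flow itself ---
    supported = staging_service_supported_types()                              # step 1
    artifacts, expanded = expansion_service_expand("KafkaIO.Read", supported)  # step 2
    artifacts = [a if a.type in supported else to_bytes_artifact(a)            # step 3
                 for a in artifacts]
    token = stage_artifacts(artifacts)                                         # step 4
    print(expanded, token)  # steps 5/6 happen runner- and environment-side at startup
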
>      >> >>>>
>      >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw
>     <robertwb@google.com <ma...@google.com>> wrote:
>      >> >>>>>
>      >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels
>     <mxm@apache.org <ma...@apache.org>> wrote:
>      >> >>>>>>
>      >> >>>>>> Good idea to let the client expose an artifact staging
>     service that the
>      >> >>>>>> ExpansionService could use to stage artifacts. This
>     solves two problems:
>      >> >>>>>>
>      >> >>>>>> (1) The Expansion Service not being able to access the
>     Job Server
>      >> >>>>>> artifact staging service
>      >> >>>>>> (2) The client not having access to the dependencies
>     returned by the
>      >> >>>>>> Expansion Server
>      >> >>>>>>
>      >> >>>>>> The downside is that it adds an additional indirection.
>     The alternative
>      >> >>>>>> to let the client handle staging the artifacts returned
>     by the Expansion
>      >> >>>>>> Server is more transparent and easier to implement.
>      >> >>>>>
>      >> >>>>> The other downside is that it may not always be possible
>     for the
>      >> >>>>> expansion service to connect to the artifact staging
>     service (e.g.
>      >> >>>>> when constructing a pipeline locally against a remote
>     expansion
>      >> >>>>> service).
>      >> >>>>
>      >> >>>> Just to make sure, you're saying the expansion service would
>     return all the artifacts (bytes, urls, ...) as part of the response
>     since the expansion service wouldn't be able to connect to the SDK
>     that is running locally either.
>      >> >>>
>      >> >>> Yes. Well, more I'm asking how the expansion service would
>     return any
>      >> >>> artifacts.
>      >> >>>
>      >> >>> What we have is
>      >> >>>
>      >> >>> Runner <--- SDK ---> Expansion service.
>      >> >>>
>      >> >>> Where the unidirectional arrow means "instantiates a
>     connection with"
>      >> >>> and the other direction (and missing arrows) may not be
>     possible.
>      >> >>
>      >> >>
>      >> >> I believe the ExpansionService Expand request should become a
>     unidirectional stream back to the caller so that artifacts could be
>     sent back to the SDK (effectively mirroring the artifact staging
>     service API). So the expansion response would stream back a bunch of
>     artifact data messages and also the expansion response containing
>     PTransform information.
>      >> >
>      >> > +1.
>      >> >
>      >> >>>>>> Ideally, the Expansion Service won't return any
>     dependencies because the
>      >> >>>>>> environment already contains the required dependencies.
>     We could make it
>      >> >>>>>> a requirement for the expansion to be performed inside an
>     environment.
>      >> >>>>>> Then we would already ensure during expansion time that
>     the runtime
>      >> >>>>>> dependencies are available.
>      >> >>>>>
>      >> >>>>> Yes, it's cleanest if the expansion service provides an
>     environment
>      >> >>>>> without all the dependencies provided. Interesting idea to
>     make this a
>      >> >>>>> property of the expansion service itself.
>      >> >>>>
>      >> >>>> I had thought this too but an opaque docker container that
>     was built on top of a base Beam docker container would be very
>     difficult for a runner to introspect and check to see if it's
>     compatible to allow for fusion across PTransforms. I think artifacts
>     need to be communicated in their canonical representation.
>      >> >>>
>      >> >>> It's clean (from the specification point of view), but
>     doesn't allow
>      >> >>> for good introspection/fusion (aside from one being a base
>     of another,
>      >> >>> perhaps).
>      >> >>>
>      >> >>>>>>> In this case, the runner would (as
>      >> >>>>>>> requested by its configuration) be free to merge
>     environments it
>      >> >>>>>>> deemed compatible, including swapping out beam-java-X for
>      >> >>>>>>> beam-java-embedded if it considers itself compatible
>     with the
>      >> >>>>>>> dependency list.
>      >> >>>>>>
>      >> >>>>>> Could you explain how that would work in practice?
>      >> >>>>>
>      >> >>>>> Say one has a pipeline with environments
>      >> >>>>>
>      >> >>>>> A: beam-java-sdk-2.12-docker
>      >> >>>>> B: beam-java-sdk-2.12-docker + dep1
>      >> >>>>> C: beam-java-sdk-2.12-docker + dep2
>      >> >>>>> D: beam-java-sdk-2.12-docker + dep3
>      >> >>>>>
>      >> >>>>> A runner could (conceivably) be intelligent enough to know
>     that dep1
>      >> >>>>> and dep2 are indeed compatible, and run A, B, and C in a
>     single
>      >> >>>>> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
>      >> >>>>> corresponding fusion and lower overhead benefits). If a
>     certain
>      >> >>>>> pipeline option is set, it might further note that dep1
>     and dep2 are
>      >> >>>>> compatible with its own workers, which are built against
>     sdk-2.12, and
>      >> >>>>> choose to run these in embedded + dep1 + dep2 environment.
>      >> >>>>
>      >> >>>> We have been talking about the expansion service and cross
>     language transforms a lot lately but I believe it will initially
>     come at the cost of poor fusion of transforms since "merging"
>     environments that are compatible is a difficult problem since it
>     brings up many of the dependency management issues (e.g. diamond
>     dependency issues).
>      >> >>>
>      >> >>> I agree. I think expansion services offering "kitchen-sink"
>      >> >>> containers, when possible, can go far here. If we could at least
>      >> >>> recognize when one environment/set of deps is a superset of
>     another,
>      >> >>> that could be an easy case that would yield a lot of benefit
>     as well.
>      >> >>
>      >> >>
>      >> >> +1
>