Posted to dev@beam.apache.org by Heejong Lee <he...@google.com> on 2019/11/26 01:49:44 UTC
Re: Artifact staging in cross-language pipelines
Hi,
Is anyone actively working on the artifact staging extension for
cross-language pipelines? I'm thinking I could contribute to it in the
coming December. If anyone has made progress on this and needs help,
please let me know.
Thanks,
On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <ie...@gmail.com> wrote:
> Can you please add this to the design documents webpage?
> https://beam.apache.org/contribute/design-documents/
>
> On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <ch...@google.com> wrote:
> >
> >
> >
> > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <mx...@apache.org> wrote:
> >>
> >> Here's the first draft:
> >>
> >> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
> >>
> >> It's rather high-level. We may want to add more details once we have
> >> finalized the design. Feel free to make comments and edits.
> >
> >
> > Thanks Max. Added some comments.
> >
> >>
> >>
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of the
> >> > environment themselves.
> >>
> >> +1 I came to the same conclusion while thinking about how to store
> >> artifact information for deferred execution of the pipeline.
> >>
> >> -Max
> >>
> >> On 07.05.19 18:10, Robert Bradshaw wrote:
> >> > Looking forward to your writeup, Max. In the meantime, some comments below.
> >> >
> >> >
> >> > From: Lukasz Cwik <lc...@google.com>
> >> > Date: Thu, May 2, 2019 at 6:45 PM
> >> > To: dev
> >> >
> >> >>
> >> >>
> >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <ro...@google.com> wrote:
> >> >>>
> >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
> >> >>>>
> >> >>>> We should stick with URN + payload + artifact metadata[1], where the
> >> >>>> only type that all SDKs and expansion services must understand is the
> >> >>>> "bytes" artifact type. This allows us to add optional URNs for file://,
> >> >>>> http://, Maven, PyPI, ... in the future. I would make the artifact
> >> >>>> staging service use the same URN + payload mechanism, so that artifacts
> >> >>>> stay compatible across the different services, and also let the
> >> >>>> artifact staging service be queried for the list of artifact types it
> >> >>>> supports.
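A minimal sketch of the URN + payload idea above, in Python. Everything here is an assumption for illustration: the `example:` URNs, the `Artifact` class and `ToyStagingService` are hypothetical and do not correspond to Beam's protos or APIs. It shows "bytes" as the one mandatory type, a staging service that can be queried for the optional types it accepts, and a sender falling back to bytes when none of its preferred types is supported.

```python
from dataclasses import dataclass

# Illustrative URNs only; these are not registered Beam values.
BYTES_URN = "example:artifact:type:bytes:v1"
URL_URN = "example:artifact:type:url:v1"
MAVEN_URN = "example:artifact:type:maven:v1"


@dataclass(frozen=True)
class Artifact:
    """An artifact described by a type URN plus an opaque, type-specific payload."""
    type_urn: str
    payload: bytes


class ToyStagingService:
    """Hypothetical staging service that can be queried for its supported types."""

    def __init__(self, optional_types=()):
        # "bytes" is the one mandatory type, so it is always accepted.
        self._supported = frozenset(optional_types) | {BYTES_URN}
        self.staged = []

    def supported_artifact_types(self):
        return self._supported

    def stage(self, artifact):
        if artifact.type_urn not in self._supported:
            raise ValueError(f"unsupported artifact type: {artifact.type_urn}")
        self.staged.append(artifact)


def choose_type(preferred, supported):
    """Return the first preferred type the receiver supports, else fall back to bytes."""
    return next((urn for urn in preferred if urn in supported), BYTES_URN)
```

Because optional types are negotiated rather than assumed, new URNs can be added later without breaking participants that only understand bytes.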
> >> >>>
> >> >>> +1
> >> >>>
> >> >>>> Finally, we would need to have environments enumerate the artifact
> >> >>>> types that they support.
> >> >>>
> >> >>> Meaning at runtime, or as another field statically set in the proto?
> >> >>
> >> >>
> >> >> I don't believe runners/SDKs should have to discover at runtime which
> >> >> artifact types each environment supports; instead, environments should
> >> >> enumerate them explicitly in the proto. I have been thinking about a
> >> >> more general "capabilities" block on environments that would let them
> >> >> enumerate the URNs they understand. This would include artifact type
> >> >> URNs, PTransform URNs, coder URNs, ... I haven't proposed anything
> >> >> specific along these lines yet because I was wondering how environment
> >> >> resources (CPU, min memory, hardware like GPU, AWS/GCP/Azure/... machine
> >> >> types) should/could tie into this.
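To illustrate the static "capabilities" idea, here is a hypothetical sketch; the `Environment` class and the `example:` URNs are assumptions, not Beam definitions. Because the environment declares its URNs up front, a runner could check compatibility from the pipeline proto alone, without starting the environment.

```python
from dataclasses import dataclass, field

# Illustrative URNs only (assumptions, not registered Beam values).
BYTES_TYPE = "example:artifact:type:bytes:v1"
MAVEN_TYPE = "example:artifact:type:maven:v1"
VARINT_CODER = "example:coder:varint:v1"


@dataclass(frozen=True)
class Environment:
    """Hypothetical environment with a statically declared "capabilities" block."""
    urn: str
    capabilities: frozenset = field(default_factory=frozenset)


def missing_capabilities(env, required_urns):
    """Required URNs (artifact types, transforms, coders, ...) the environment does not declare."""
    return sorted(set(required_urns) - env.capabilities)


def can_run(env, required_urns):
    """Decided purely from the declared capabilities; nothing is started or queried."""
    return not missing_capabilities(env, required_urns)
```

Resource requirements (CPU, memory, GPU), which the message leaves open, are deliberately not modeled here.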
> >> >>
> >> >>>
> >> >>>> Having everyone use the same "artifact" representation would be
> >> >>>> beneficial since:
> >> >>>> a) Python environments could install dependencies from a
> >> >>>> requirements.txt file (something that the Google Cloud Dataflow Python
> >> >>>> docker container allows for today)
> >> >>>> b) It provides an extensible and versioned mechanism for SDKs,
> >> >>>> environments, and artifact staging/retrieval services to support
> >> >>>> additional artifact types
> >> >>>> c) It allows expressing a canonical representation of an artifact, like
> >> >>>> a Maven package, so a runner could merge environments that the runner
> >> >>>> deems compatible.
> >> >>>>
> >> >>>> The flow I could see is:
> >> >>>> 1) (optional) SDK queries the artifact staging service for supported
> >> >>>> artifact types
> >> >>>> 2) SDK requests that the expansion service expand the transform, passing
> >> >>>> in a list of artifact types that both the SDK and the artifact staging
> >> >>>> service support; the expansion service returns artifacts whose types are
> >> >>>> limited to those supported types plus any supported by the environment
> >> >>>
> >> >>> The crux of the issue seems to be how the expansion service returns
> >> >>> the artifacts themselves. Is this going with the approach that the
> >> >>> caller of the expansion service must host an artifact staging service?
> >> >>
> >> >>
> >> >> The caller would not need to host an artifact staging service (though
> >> >> it would effectively become a proxy service; see my comment below for
> >> >> more details), as I would expect the artifacts to be part of the
> >> >> expansion service response.
> >> >>
> >> >>>
> >> >>> There is also the question of how the returned artifacts get
> >> >>> attached to the various environments, or whether they get implicitly
> >> >>> applied to all returned stages (which need not have a consistent
> >> >>> environment)?
> >> >>
> >> >>
> >> >> I would suggest returning additional information that says which
> >> >> artifact is for which environment. Applying all artifacts to all
> >> >> environments is likely to cause issues, since some environments may not
> >> >> understand certain artifact types or may get conflicting versions of
> >> >> artifacts. I expect this to happen because an expansion service that
> >> >> aggregates other expansion services seems likely, for example:
> >> >>                              /-> ExpansionService(Python)
> >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
> >> >>                              \-> ExpansionService(Go)
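The per-environment attribution suggested above can be sketched as follows. Assumptions: artifacts are plain (name, version) tuples, and the aggregator receives each sub-service's result as a mapping from a hypothetical environment id to that environment's artifacts. Keeping the mapping avoids the version conflicts that appear once everything is flattened into a single list.

```python
from collections import defaultdict


def aggregate_expansions(responses):
    """Merge sub-service responses while keeping artifacts attributed per environment.

    `responses` maps a sub-service name to {environment_id: [(name, version), ...]}.
    """
    per_env = defaultdict(list)
    for env_artifacts in responses.values():
        for env_id, artifacts in env_artifacts.items():
            per_env[env_id].extend(artifacts)
    return dict(per_env)


def version_conflicts(artifacts):
    """Artifact names that appear with more than one version in one list."""
    versions = defaultdict(set)
    for name, version in artifacts:
        versions[name].add(version)
    return sorted(name for name, seen in versions.items() if len(seen) > 1)
```

For example, if two hypothetical Java sub-services pinned different Guava versions, each environment's own list would be conflict-free, while the flattened list would report `guava` as a conflict.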
> >> >
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of the
> >> > environment themselves.
> >> >
> >> >>>> 3) SDK converts any artifact types that the artifact staging service or
> >> >>>> environment doesn't understand, e.g. pulls down Maven dependencies and
> >> >>>> converts them to "bytes" artifacts
> >> >>>
> >> >>> Here I think we're conflating two things. The "type" of an artifact is
> >> >>> both (1) how to fetch the bytes and (2) how to interpret them (e.g. is
> >> >>> this a jar file, or a pip tarball, or just some data needed by a DoFn,
> >> >>> or ...). Only (1) can be freely transmuted.
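One way to keep the two meanings of "type" apart is to give each artifact separate fields for how to fetch it and how to interpret it. In this sketch the field names, the `example:` URNs and the `fetch` callback are all assumptions; converting an artifact rewrites only the fetch half and carries the role through untouched, matching the point that only (1) can be freely transmuted.

```python
from dataclasses import dataclass, replace

# Illustrative URNs (assumptions, not registered Beam values).
URL_TYPE = "example:artifact:type:url:v1"
BYTES_TYPE = "example:artifact:type:bytes:v1"
JAR_ROLE = "example:artifact:role:jar:v1"


@dataclass(frozen=True)
class Artifact:
    type_urn: str        # (1) how to fetch the bytes
    type_payload: bytes  # e.g. a URL, or the bytes themselves
    role_urn: str        # (2) how the environment should interpret the bytes


def materialize(artifact, fetch):
    """Turn any fetchable artifact into a bytes artifact.

    Only the fetch half changes; the role is carried over untouched because
    interpreting it is up to the environment.
    """
    if artifact.type_urn == BYTES_TYPE:
        return artifact
    data = fetch(artifact.type_urn, artifact.type_payload)
    return replace(artifact, type_urn=BYTES_TYPE, type_payload=data)
```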
> >> >>
> >> >>
> >> >> You're right. Thinking about this some more, general artifact
> >> >> conversion is unlikely to be practical because how to interpret an
> >> >> artifact is environment-dependent. For example, a requirements.txt used
> >> >> to install pip packages for a Python docker container depends on the
> >> >> filesystem layout of that specific docker container. One could simulate
> >> >> doing a pip install on the same filesystem and capture the resulting
> >> >> diff for all of the packages in requirements.txt, but this quickly
> >> >> becomes impractical.
> >> >>
> >> >>>
> >> >>>> 4) SDK sends artifacts to the artifact staging service
> >> >>>> 5) Artifact staging service converts any artifacts to types that the
> >> >>>> environment understands
> >> >>>> 6) Environment is started and gets artifacts from the artifact
> >> >>>> retrieval service.
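Steps 1) to 6) of the proposed flow can be simulated end to end with in-memory stand-ins. This is a sketch under several assumptions: artifacts are (type, payload) tuples, `fetch` is a hypothetical callback that downloads a payload, and step 3) only converts what the staging service cannot accept, leaving environment-specific conversion to step 5). As the comment about conflating fetching and interpreting points out, this kind of conversion is only safe for how bytes are fetched, not for how they are interpreted.

```python
BYTES = "bytes"  # the one artifact type every participant must understand


def to_bytes(artifact, fetch):
    """Resolve a (type, payload) artifact into a (BYTES, data) artifact."""
    type_urn, payload = artifact
    return artifact if type_urn == BYTES else (BYTES, fetch(type_urn, payload))


def convert_unsupported(artifacts, supported, fetch):
    """Keep artifacts whose type is supported; turn the rest into bytes."""
    return [a if a[0] in supported else to_bytes(a, fetch) for a in artifacts]


def artifact_flow(expansion_artifacts, sdk_types, staging_types, env_types, fetch):
    # 1) (optional) query the staging service for the types it accepts.
    staging = set(staging_types) | {BYTES}
    env = set(env_types) | {BYTES}
    # 2) expansion returns only types the SDK and staging service both support,
    #    plus any type the environment supports.
    returned = convert_unsupported(expansion_artifacts, (set(sdk_types) & staging) | env, fetch)
    # 3) the SDK converts whatever the staging service cannot accept.
    uploaded = convert_unsupported(returned, staging, fetch)
    # 4) the SDK uploads; 5) the staging service converts for the environment.
    retrievable = convert_unsupported(uploaded, env, fetch)
    # 6) the environment starts and retrieves these artifacts.
    return retrievable
```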
> >> >>>>
> >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <robertwb@google.com> wrote:
> >> >>>>>
> >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <mxm@apache.org> wrote:
> >> >>>>>>
> >> >>>>>> Good idea to let the client expose an artifact staging service that the
> >> >>>>>> ExpansionService could use to stage artifacts. This solves two problems:
> >> >>>>>>
> >> >>>>>> (1) The Expansion Service not being able to access the Job Server
> >> >>>>>> artifact staging service
> >> >>>>>> (2) The client not having access to the dependencies returned by the
> >> >>>>>> Expansion Server
> >> >>>>>>
> >> >>>>>> The downside is that it adds an additional indirection. The alternative,
> >> >>>>>> letting the client handle staging the artifacts returned by the Expansion
> >> >>>>>> Server, is more transparent and easier to implement.
> >> >>>>>
> >> >>>>> The other downside is that it may not always be possible for the
> >> >>>>> expansion service to connect to the artifact staging service (e.g.
> >> >>>>> when constructing a pipeline locally against a remote expansion
> >> >>>>> service).
> >> >>>>
> >> >>>> Just to make sure, you're saying the expansion service would return all
> >> >>>> the artifacts (bytes, URLs, ...) as part of the response, since the
> >> >>>> expansion service wouldn't be able to connect to the SDK running locally
> >> >>>> either?
> >> >>>
> >> >>> Yes. Well, I'm more asking how the expansion service would return any
> >> >>> artifacts.
> >> >>>
> >> >>> What we have is
> >> >>>
> >> >>> Runner <--- SDK ---> Expansion service.
> >> >>>
> >> >>> Where the unidirectional arrow means "instantiates a connection with"
> >> >>> and the other direction (and missing arrows) may not be possible.
> >> >>
> >> >>
> >> >> I believe the ExpansionService Expand request should become a
> >> >> unidirectional stream back to the caller, so that artifacts can be sent
> >> >> back to the SDK (effectively mirroring the artifact staging service API).
> >> >> The expansion response would then stream back a number of artifact data
> >> >> messages as well as the expansion response containing the PTransform
> >> >> information.
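The streaming Expand idea can be sketched with Python generators standing in for a server-streaming RPC. The message classes and function names are hypothetical, not the real Beam expansion API, and sending chunks first and the expansion result last is an ordering choice made for this sketch rather than something the proposal specifies. The SDK side reassembles the artifacts so it could forward them to the job server's staging service, acting as the proxy mentioned earlier in the thread.

```python
from dataclasses import dataclass


@dataclass
class ArtifactChunk:
    """Hypothetical stream message carrying part of one artifact's bytes."""
    name: str
    data: bytes


@dataclass
class ExpansionResult:
    """Hypothetical final stream message carrying the expanded transform."""
    transform: str


def expand_streaming(transform, artifacts, chunk_size=4):
    """Expansion-service side: stream artifact chunks, then the expansion result."""
    for name, data in artifacts.items():
        # max(..., 1) ensures even an empty artifact produces one (empty) chunk.
        for start in range(0, max(len(data), 1), chunk_size):
            yield ArtifactChunk(name, data[start:start + chunk_size])
    yield ExpansionResult(transform=f"expanded({transform})")


def receive_expansion(stream):
    """SDK side: reassemble artifacts (e.g. to forward to the job server's
    staging service) and capture the final expansion result."""
    artifacts, result = {}, None
    for message in stream:
        if isinstance(message, ArtifactChunk):
            artifacts[message.name] = artifacts.get(message.name, b"") + message.data
        else:
            result = message
    return artifacts, result
```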
> >> >
> >> > +1.
> >> >
> >> >>>>>> Ideally, the Expansion Service won't return any dependencies because the
> >> >>>>>> environment already contains the required dependencies. We could make it
> >> >>>>>> a requirement for the expansion to be performed inside an environment.
> >> >>>>>> Then we would already ensure during expansion time that the runtime
> >> >>>>>> dependencies are available.
> >> >>>>>
> >> >>>>> Yes, it's cleanest if the expansion service provides an environment
> >> >>>>> with all the dependencies provided. Interesting idea to make this a
> >> >>>>> property of the expansion service itself.
> >> >>>>
> >> >>>> I had thought this too, but an opaque docker container that was built
> >> >>>> on top of a base Beam docker container would be very difficult for a
> >> >>>> runner to introspect and check whether it's compatible, in order to
> >> >>>> allow fusion across PTransforms. I think artifacts need to be
> >> >>>> communicated in their canonical representation.
> >> >>>
> >> >>> It's clean (from the specification point of view), but doesn't allow
> >> >>> for good introspection/fusion (aside from one being a base of another,
> >> >>> perhaps).
> >> >>>
> >> >>>>>>> In this case, the runner would (as
> >> >>>>>>> requested by its configuration) be free to merge environments it
> >> >>>>>>> deemed compatible, including swapping out beam-java-X for
> >> >>>>>>> beam-java-embedded if it considers itself compatible with the
> >> >>>>>>> dependency list.
> >> >>>>>>
> >> >>>>>> Could you explain how that would work in practice?
> >> >>>>>
> >> >>>>> Say one has a pipeline with environments
> >> >>>>>
> >> >>>>> A: beam-java-sdk-2.12-docker
> >> >>>>> B: beam-java-sdk-2.12-docker + dep1
> >> >>>>> C: beam-java-sdk-2.12-docker + dep2
> >> >>>>> D: beam-java-sdk-2.12-docker + dep3
> >> >>>>>
> >> >>>>> A runner could (conceivably) be intelligent enough to know that dep1
> >> >>>>> and dep2 are indeed compatible, and run A, B, and C in a single
> >> >>>>> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
> >> >>>>> corresponding fusion and lower overhead benefits). If a certain
> >> >>>>> pipeline option is set, it might further note that dep1 and dep2 are
> >> >>>>> compatible with its own workers, which are built against sdk-2.12, and
> >> >>>>> choose to run these in an embedded + dep1 + dep2 environment.
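The A/B/C/D example can be sketched as a greedy merge of environments that share a base image. The compatibility function is a stand-in (assumption): a real runner would need actual dependency resolution, which is exactly where the diamond-dependency problems raised in the reply show up. Greedy grouping is also not guaranteed to produce the fewest environments.

```python
def example_compatible(deps_a, deps_b):
    """Toy compatibility oracle for this example: dep3 conflicts with every other dep."""
    combined = deps_a | deps_b
    return "dep3" not in combined or len(combined) == 1


def merge_environments(envs, compatible):
    """Greedily fold each environment into the first compatible group with the same base.

    `envs` maps a name to (base_image, frozenset_of_deps). Returns a list of
    (member_names, base_image, merged_deps) tuples.
    """
    groups = []
    for name, (base, deps) in envs.items():
        for group in groups:
            members, group_base, group_deps = group
            if group_base == base and compatible(group_deps, deps):
                members.append(name)
                group[2] = group_deps | deps
                break
        else:
            # No existing group could absorb this environment; start a new one.
            groups.append([[name], base, deps])
    return [tuple(group) for group in groups]
```

With the four environments from the example, A, B and C end up in one beam-java-sdk-2.12-docker + dep1 + dep2 group while D stays on its own.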
> >> >>>>
> >> >>>> We have been talking about the expansion service and cross-language
> >> >>>> transforms a lot lately, but I believe they will initially come at the
> >> >>>> cost of poor fusion of transforms, since "merging" compatible
> >> >>>> environments is a difficult problem that brings up many dependency
> >> >>>> management issues (e.g. diamond dependencies).
> >> >>>
> >> >>> I agree. I think expansion services offering "kitchen-sink"
> >> >>> containers, when possible, can go far here. If we could at least
> >> >>> recognize when one environment/set of deps is a superset of another,
> >> >>> that could be an easy case that would yield a lot of benefit as well.
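The "superset" case needs no compatibility oracle at all, only set containment. A minimal sketch, assuming environments are (base image, frozenset of deps) pairs with hypothetical names: each environment is mapped to the largest environment with the same base whose dependencies include its own, such as a kitchen-sink container.

```python
def subsumes(big, small):
    """True if environment `big` can stand in for `small`: same base, superset of deps."""
    (big_base, big_deps), (small_base, small_deps) = big, small
    return big_base == small_base and big_deps >= small_deps


def collapse_subsumed(envs):
    """Map each environment name to the largest environment that subsumes it.

    Only set containment is used, so no dependency-compatibility oracle is needed.
    Ties on size are broken by name to keep the result deterministic.
    """
    mapping = {}
    for name, env in envs.items():
        # Every environment subsumes itself, so this list is never empty.
        candidates = [other for other, other_env in envs.items() if subsumes(other_env, env)]
        mapping[name] = max(candidates, key=lambda other: (len(envs[other][1]), other))
    return mapping
```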
> >> >>
> >> >>
> >> >> +1
>
Re: Artifact staging in cross-language pipelines
Posted by Robert Bradshaw <ro...@google.com>.
Thanks! I've left some comments on the doc.
On Tue, Dec 17, 2019, 5:03 PM Heejong Lee <he...@google.com> wrote:
> Hi,
>
> I wrote a draft of the implementation plan[1]. The summary is on the first
> page. Any help would be appreciated!
>
> [1]:
> https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog/edit?usp=sharing
>
> On Thu, Dec 12, 2019 at 5:02 PM Heejong Lee <he...@google.com> wrote:
>
>> I'm refreshing my memory by revisiting the doc[1], and it seems like we've
>> already reached consensus on the bigger picture. I'll start drafting
>> the implementation plan.
>>
>> [1]:
>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>>
>> On Tue, Nov 26, 2019 at 3:54 AM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Hey Heejong,
>>>
>>> I don't think so. It would be great to push this forward.
>>>
>>> Thanks,
>>> Max
Re: Artifact staging in cross-language pipelines
Posted by Heejong Lee <he...@google.com>.
Hi,
I wrote a draft of the implementation plan[1]. The summary is on the first
page. Any help would be appreciated!
[1]:
https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog/edit?usp=sharing
On Thu, Dec 12, 2019 at 5:02 PM Heejong Lee <he...@google.com> wrote:
> I'm brushing up memory by revisiting the doc[1] and it seems like we've
> already reached the consensus on the bigger picture. I would start drafting
> the implementation plan.
>
> [1]:
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>
> On Tue, Nov 26, 2019 at 3:54 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> Hey Heejong,
>>
>> I don't think so. It would be great to push this forward.
>>
>> Thanks,
>> Max
>>
>> On 26.11.19 02:49, Heejong Lee wrote:
>> > Hi,
>> >
>> > Is anyone actively working on artifact staging extension for
>> > cross-language pipelines? I'm thinking I can contribute to it in coming
>> > Dec. If anyone has any progress on this and needs help, please let me
>> know.
>> >
>> > Thanks,
>> >
>> > On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <iemejia@gmail.com
>> > <ma...@gmail.com>> wrote:
>> >
>> > Can you please add this to the design documents webpage.
>> > https://beam.apache.org/contribute/design-documents/
>> >
>> > On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath
>> > <chamikara@google.com <ma...@google.com>> wrote:
>> > >
>> > >
>> > >
>> > > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels
>> > <mxm@apache.org <ma...@apache.org>> wrote:
>> > >>
>> > >> Here's the first draft:
>> > >>
>> >
>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>> > >>
>> > >> It's rather high-level. We may want to add more details once we
>> have
>> > >> finalized the design. Feel free to make comments and edits.
>> > >
>> > >
>> > > Thanks Max. Added some comments.
>> > >
>> > >>
>> > >>
>> > >> > All of this goes back to the idea that I think the listing of
>> > >> > artifacts (or more general dependencies) should be a property
>> > of the
>> > >> > environment themselves.
>> > >>
>> > >> +1 I came to the same conclusion while thinking about how to
>> store
>> > >> artifact information for deferred execution of the pipeline.
>> > >>
>> > >> -Max
>> > >>
>> > >> On 07.05.19 18:10, Robert Bradshaw wrote:
>> > >> > Looking forward to your writeup, Max. In the meantime, some
>> > comments below.
>> > >> >
>> > >> >
>> > >> > From: Lukasz Cwik <lcwik@google.com <mailto:lcwik@google.com
>> >>
>> > >> > Date: Thu, May 2, 2019 at 6:45 PM
>> > >> > To: dev
>> > >> >
>> > >> >>
>> > >> >>
>> > >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw
>> > <robertwb@google.com <ma...@google.com>> wrote:
>> > >> >>>
>> > >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik
>> > <lcwik@google.com <ma...@google.com>> wrote:
>> > >> >>>>
>> > >> >>>> We should stick with URN + payload + artifact metadata[1]
>> > where the only mandatory one that all SDKs and expansion services
>> > understand is the "bytes" artifact type. This allows us to add
>> > optional URNs for file://, http://, Maven, PyPi, ... in the future.
>> > I would make the artifact staging service use the same URN + payload
>> > mechanism to get compatibility of artifacts across the different
>> > services and also have the artifact staging service be able to be
>> > queried for the list of artifact types it supports.
>> > >> >>>
>> > >> >>> +1
>> > >> >>>
>> > >> >>>> Finally, we would need to have environments enumerate the
>> > artifact types that they support.
>> > >> >>>
>> > >> >>> Meaning at runtime, or as another field statically set in
>> > the proto?
>> > >> >>
>> > >> >>
>> > >> >> I don't believe runners/SDKs should have to know what
>> > artifacts each environment supports at runtime and instead have
>> > environments enumerate them explicitly in the proto. I have been
>> > thinking about a more general "capabilities" block on environments
>> > which allow them to enumerate URNs that the environment understands.
>> > This would include artifact type URNs, PTransform URNs, coder URNs,
>> > ... I haven't proposed anything specific down this line yet because
>> > I was wondering how environment resources (CPU, min memory, hardware
>> > like GPU, AWS/GCP/Azure/... machine types) should/could tie into
>> this.
>> > >> >>
>> > >> >>>
>> > >> >>>> Having everyone have the same "artifact" representation
>> > would be beneficial since:
>> > >> >>>> a) Python environments could install dependencies from a
>> > requirements.txt file (something that the Google Cloud Dataflow
>> > Python docker container allows for today)
>> > >> >>>> b) It provides an extensible and versioned mechanism for
>> > SDKs, environments, and artifact staging/retrieval services to
>> > support additional artifact types
>> > >> >>>> c) It allows for expressing a canonical representation of an
>> > artifact like a Maven package so a runner could merge environments
>> > that the runner deems compatible.
>> > >> >>>>
>> > >> >>>> The flow I could see is:
>> > >> >>>> 1) (optional) query artifact staging service for supported
>> > artifact types
>> > >> >>>> 2) SDK requests the expansion service to expand the transform,
>> > passing in a list of artifact types the SDK and artifact staging
>> > service support; the expansion service returns a list of artifact
>> > types limited to those supported types + any supported by the
>> > environment
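Steps 1 and 2 amount to a small type negotiation. The sketch below is one possible reading of step 2, assuming the expansion service has several alternative encodings for each dependency (in order of preference) and keeps the first one whose type either the caller or the target environment accepts. Function names and the short type names ("bytes", "file", "maven") are hypothetical.

```python
def negotiate_types(sdk_types, staging_types):
    """Steps 1-2, caller side: advertise only types both the SDK and staging service handle."""
    return frozenset(sdk_types) & frozenset(staging_types)


def select_artifacts(dependencies, accepted_types, environment_types):
    """Step 2, expansion-service side (one possible reading).

    `dependencies` holds, for each dependency, alternative encodings as
    (type_urn, payload) pairs in order of preference. The first encoding whose
    type the caller accepted, or the target environment understands, is kept.
    """
    allowed = frozenset(accepted_types) | frozenset(environment_types)
    selected = []
    for alternatives in dependencies:
        for type_urn, payload in alternatives:
            if type_urn in allowed:
                selected.append((type_urn, payload))
                break
        else:  # no alternative matched
            types = [urn for urn, _ in alternatives]
            raise ValueError(f"no acceptable encoding among {types}")
    return selected


accepted = negotiate_types({"bytes", "file", "maven"}, {"bytes", "file"})
deps = [
    [("maven", b"org.example:lib:1.0"), ("bytes", b"<jar bytes>")],
    [("file", b"/tmp/dep.whl")],
]
print(select_artifacts(deps, accepted, environment_types=set()))
```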
>> > >> >>>
>> > >> >>> The crux of the issue seems to be how the expansion service
>> > returns
>> > >> >>> the artifacts themselves. Is this going with the approach
>> > that the
>> > >> >>> caller of the expansion service must host an artifact
>> > staging service?
>> > >> >>
>> > >> >>
>> > >> >> The caller would not need to host an artifact staging service
>> > (but would become effectively a proxy service, see my comment below
>> > for more details) as I would have expected this to be part of the
>> > expansion service response.
>> > >> >>
>> > >> >>>
>> > >> >>> There is also the question here of how the returned artifacts get
>> > >> >>> attached to the various environments, or whether they get implicitly
>> > >> >>> applied to all returned stages (which need not have a consistent
>> > >> >>> environment)?
>> > >> >>
>> > >> >>
>> > >> >> I would suggest returning additional information that says
>> > what artifact is for which environment. Applying all artifacts to
>> > all environments is likely to cause issues since some environments
>> > may not understand certain artifact types or may get conflicting
>> > versions of artifacts. I would see this happening since an expansion
>> > service that aggregates other expansion services seems likely, for
>> > example:
>> > >> >>                              /-> ExpansionService(Python)
>> > >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
>> > >> >>                              \-> ExpansionService(Go)
>> > >> >
>> > >> > All of this goes back to the idea that I think the listing of
>> > >> > artifacts (or more general dependencies) should be a property of
>> > >> > the environments themselves.
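A sketch of the per-environment bookkeeping suggested above, assuming each sub-expansion service returns a mapping from environment id to that environment's artifacts. The aggregator merges these mappings rather than flattening them into one list, so dependencies stay attached to the environment that needs them. All identifiers and artifact names are hypothetical.

```python
def aggregate_expansions(responses):
    """Merge sub-expansion responses while keeping artifacts tied to their environment.

    Each response maps an environment id to the list of artifacts that
    environment needs. Nothing is flattened, so a Python requirements file
    never ends up in a Java environment. Duplicate artifacts are dropped.
    """
    merged = {}
    for response in responses:
        for env_id, artifacts in response.items():
            env_artifacts = merged.setdefault(env_id, [])
            for artifact in artifacts:
                if artifact not in env_artifacts:
                    env_artifacts.append(artifact)
    return merged


# Hypothetical responses from the Python, Java and Go expansion services.
python_resp = {"py-env": ["requirements.txt"]}
java_resp = {"java-env": ["guava.jar", "kafka-io.jar"]}
go_resp = {"go-env": ["go-worker"]}

print(aggregate_expansions([python_resp, java_resp, go_resp]))
```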
>> > >> >
>> > >> >>>> 3) SDK converts any artifact types that the artifact
>> > staging service or environment doesn't understand, e.g. pulls down
>> > Maven dependencies and converts them to "bytes" artifacts
>> > >> >>>
>> > >> >>> Here I think we're conflating two things. The "type" of an
>> > artifact is
>> > >> >>> both (1) how to fetch the bytes and (2) how to interpret
>> > them (e.g. is
>> > >> >>> this a jar file, or a pip tarball, or just some data needed
>> > by a DoFn,
>> > >> >>> or ...) Only (1) can be freely transmuted.
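The distinction between (1) and (2) can be made explicit by giving an artifact two separate type fields. In this hypothetical Python sketch, step 3's conversion to "bytes" rewrites only the fetch type and payload and never touches the role, which is what makes the conversion safe. The field names, URNs and the fake Maven fetcher are assumptions for illustration.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TypedArtifact:
    fetch_urn: str  # (1) how to obtain the bytes: can be converted freely
    role_urn: str   # (2) how the environment interprets them: must be preserved
    payload: bytes


def to_bytes(artifact, fetchers):
    """Step 3 sketch: inline the artifact's bytes without changing its role.

    `fetchers` maps a fetch URN to a function that returns the artifact's
    contents for a given payload; both are hypothetical.
    """
    if artifact.fetch_urn == "bytes":
        return artifact  # already inline, nothing to do
    data = fetchers[artifact.fetch_urn](artifact.payload)
    return replace(artifact, fetch_urn="bytes", payload=data)


# A fake Maven "download" that just fabricates placeholder content.
fake_fetchers = {"maven": lambda coords: b"jar-for:" + coords}
dep = TypedArtifact("maven", "java-jar", b"org.example:lib:1.0")
converted = to_bytes(dep, fake_fetchers)
print(converted)
```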
>> > >> >>
>> > >> >>
>> > >> >> You're right. Thinking about this some more, general artifact
>> > conversion is unlikely to be practical because how to interpret an
>> > artifact is environment dependent. For example, a requirements.txt
>> > used to install pip packages for a Python docker container depends
>> > on the filesystem layout of that specific docker container. One
>> > could simulate doing a pip install on the same filesystem, see the
>> > diff and then of all the packages in requirements.txt but this
>> > quickly becomes impractical.
>> > >> >>
>> > >> >>>
>> > >> >>>> 4) SDK sends artifacts to artifact staging service
>> > >> >>>> 5) Artifact staging service converts any artifacts to types
>> > that the environment understands
>> > >> >>>> 6) Environment is started and gets artifacts from the
>> > artifact retrieval service.
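Steps 4 to 6 can be sketched with a toy in-memory service that plays both the staging and the retrieval role, assuming artifacts are grouped under a per-environment token. Step 5 is modeled only as an optional per-type converter supplied by the caller; as the discussion notes, general conversion is hard, so this is a simplification. Class, method and token names are hypothetical.

```python
class StagingAndRetrieval:
    """Toy in-memory stand-in for the staging (step 4) and retrieval (step 6) services.

    Artifacts are stored per environment token. Step 5 is reduced to an
    optional converter per artifact type, supplied by the caller. All names
    are hypothetical.
    """

    def __init__(self, converters=None):
        self._converters = converters or {}
        self._store = {}

    def stage(self, token, type_urn, payload):
        """Step 4 (+ optional step 5): store an artifact, converting it if requested."""
        convert = self._converters.get(type_urn)
        if convert is not None:
            type_urn, payload = convert(payload)
        self._store.setdefault(token, []).append((type_urn, payload))

    def retrieve(self, token):
        """Step 6: what a starting environment would fetch; returns a copy."""
        return list(self._store.get(token, []))


service = StagingAndRetrieval(
    converters={"file": lambda path: ("bytes", b"contents-of:" + path)}
)
service.stage("java-env", "file", b"/tmp/dep.jar")
service.stage("java-env", "bytes", b"raw")
print(service.retrieve("java-env"))
```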
>> > >> >>>>
>> > >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <robertwb@google.com> wrote:
>> > >> >>>>>
>> > >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <mxm@apache.org> wrote:
>> > >> >>>>>>
>> > >> >>>>>> Good idea to let the client expose an artifact staging
>> > service that the
>> > >> >>>>>> ExpansionService could use to stage artifacts. This
>> > solves two problems:
>> > >> >>>>>>
>> > >> >>>>>> (1) The Expansion Service not being able to access the
>> > Job Server
>> > >> >>>>>> artifact staging service
>> > >> >>>>>> (2) The client not having access to the dependencies
>> > returned by the
>> > >> >>>>>> Expansion Server
>> > >> >>>>>>
>> > >> >>>>>> The downside is that it adds an additional indirection.
>> > The alternative
>> > >> >>>>>> to let the client handle staging the artifacts returned
>> > by the Expansion
>> > >> >>>>>> Server is more transparent and easier to implement.
>> > >> >>>>>
>> > >> >>>>> The other downside is that it may not always be possible
>> > for the
>> > >> >>>>> expansion service to connect to the artifact staging
>> > service (e.g.
>> > >> >>>>> when constructing a pipeline locally against a remote
>> > expansion
>> > >> >>>>> service).
>> > >> >>>>
>> > >> >>>> Just to make sure, you're saying the expansion service would
>> > return all the artifacts (bytes, urls, ...) as part of the response
>> > since the expansion service wouldn't be able to connect to the SDK
>> > that is running locally either.
>> > >> >>>
>> > >> >>> Yes. Well, more I'm asking how the expansion service would
>> > return any
>> > >> >>> artifacts.
>> > >> >>>
>> > >> >>> What we have is
>> > >> >>>
>> > >> >>> Runner <--- SDK ---> Expansion service.
>> > >> >>>
>> > >> >>> Where the unidirectional arrow means "instantiates a
>> > connection with"
>> > >> >>> and the other direction (and missing arrows) may not be
>> > possible.
>> > >> >>
>> > >> >>
>> > >> >> I believe the ExpansionService Expand request should become a
>> > unidirectional stream back to the caller so that artifacts could be
>> > sent back to the SDK (effectively mirroring the artifact staging
>> > service API). So the expansion response would stream back a bunch of
>> > artifact data messages and also the expansion response containing
>> > PTransform information.
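A rough sketch of the streaming Expand idea, using Python generators in place of a gRPC server-streaming call: the expansion service yields artifact chunks first and the expanded transform last, and the caller reassembles the artifacts as they arrive. The message classes, field names and chunk size are hypothetical and do not correspond to the actual Beam expansion API.

```python
from dataclasses import dataclass


@dataclass
class ArtifactChunk:
    name: str
    data: bytes


@dataclass
class ExpansionResult:
    transform_urn: str
    artifact_names: list


def expand_stream(artifacts, transform_urn, chunk_size=4):
    """Expansion-service side: yield artifact chunks first, the expanded transform last."""
    for name, data in artifacts.items():
        for start in range(0, len(data), chunk_size):
            yield ArtifactChunk(name, data[start:start + chunk_size])
    yield ExpansionResult(transform_urn, list(artifacts))


def consume(stream):
    """Caller side: reassemble artifacts; the final message carries the transform."""
    received = {}
    result = None
    for message in stream:
        if isinstance(message, ArtifactChunk):
            received[message.name] = received.get(message.name, b"") + message.data
        else:
            result = message
    return result, received


result, files = consume(
    expand_stream({"io.jar": b"0123456789", "notes.txt": b"hi"}, "example:transform:read:v1")
)
print(result, files)
```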
>> > >> >
>> > >> > +1.
>> > >> >
>> > >> >>>>>> Ideally, the Expansion Service won't return any
>> > dependencies because the
>> > >> >>>>>> environment already contains the required dependencies.
>> > We could make it
>> > >> >>>>>> a requirement for the expansion to be performed inside an
>> > environment.
>> > >> >>>>>> Then we would already ensure during expansion time that
>> > the runtime
>> > >> >>>>>> dependencies are available.
>> > >> >>>>>
>> > >> >>>>> Yes, it's cleanest if the expansion service provides an
>> > environment
>> > >> >>>>> without all the dependencies provided. Interesting idea to
>> > make this a
>> > >> >>>>> property of the expansion service itself.
>> > >> >>>>
>> > >> >>>> I had thought this too but an opaque docker container that
>> > was built on top of a base Beam docker container would be very
>> > difficult for a runner to introspect and check to see if it's
>> > compatible to allow for fusion across PTransforms. I think artifacts
>> > need to be communicated in their canonical representation.
>> > >> >>>
>> > >> >>> It's clean (from the specification point of view), but
>> > doesn't allow
>> > >> >>> for good introspection/fusion (aside from one being a base
>> > of another,
>> > >> >>> perhaps).
>> > >> >>>
>> > >> >>>>>>> In this case, the runner would (as
>> > >> >>>>>>> requested by its configuration) be free to merge
>> > environments it
>> > >> >>>>>>> deemed compatible, including swapping out beam-java-X for
>> > >> >>>>>>> beam-java-embedded if it considers itself compatible
>> > with the
>> > >> >>>>>>> dependency list.
>> > >> >>>>>>
>> > >> >>>>>> Could you explain how that would work in practice?
>> > >> >>>>>
>> > >> >>>>> Say one has a pipeline with environments
>> > >> >>>>>
>> > >> >>>>> A: beam-java-sdk-2.12-docker
>> > >> >>>>> B: beam-java-sdk-2.12-docker + dep1
>> > >> >>>>> C: beam-java-sdk-2.12-docker + dep2
>> > >> >>>>> D: beam-java-sdk-2.12-docker + dep3
>> > >> >>>>>
>> > >> >>>>> A runner could (conceivably) be intelligent enough to know
>> > that dep1
>> > >> >>>>> and dep2 are indeed compatible, and run A, B, and C in a
>> > single
>> > >> >>>>> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
>> > >> >>>>> corresponding fusion and lower overhead benefits). If a
>> > certain
>> > >> >>>>> pipeline option is set, it might further note that dep1
>> > and dep2 are
>> > >> >>>>> compatible with its own workers, which are built against
>> > sdk-2.12, and
>> > >> >>>>> choose to run these in an embedded + dep1 + dep2 environment.
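The merge described above can be sketched as a greedy grouping pass, assuming dependencies are modeled as a flat {package: version} map and "compatible" simply means no package is pinned to two different versions. This ignores transitive (diamond) dependencies, which is exactly what makes the real problem hard. Package names, versions and function names are hypothetical; here dep3 is assumed to conflict with dep1, so D stays separate.

```python
def compatible(deps_a, deps_b):
    """True if no package is pinned to different versions in the two maps."""
    return all(deps_b.get(name, version) == version for name, version in deps_a.items())


def merge_environments(envs):
    """Greedily fuse environments that share a base and have compatible deps.

    `envs` maps an environment id to (base, {package: version}). Returns a list
    of (member_ids, base, merged_deps). Transitive (diamond) dependencies are
    ignored, which is exactly what makes the real problem hard.
    """
    groups = []
    for env_id, (base, deps) in envs.items():
        for members, group_base, group_deps in groups:
            if group_base == base and compatible(group_deps, deps):
                members.append(env_id)
                group_deps.update(deps)
                break
        else:  # no existing group fits, start a new one
            groups.append(([env_id], base, dict(deps)))
    return groups


BASE = "beam-java-sdk-2.12-docker"
# Hypothetical pins: dep3 is assumed to need a different libx than dep1.
pipeline_envs = {
    "A": (BASE, {}),
    "B": (BASE, {"libx": "1.0"}),  # + dep1
    "C": (BASE, {"liby": "2.0"}),  # + dep2
    "D": (BASE, {"libx": "3.0"}),  # + dep3
}
for members, base, deps in merge_environments(pipeline_envs):
    print(members, base, deps)
```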
>> > >> >>>>
>> > >> >>>> We have been talking about the expansion service and cross
>> > language transforms a lot lately but I believe it will initially
>> > come at the cost of poor fusion of transforms since "merging"
>> > environments that are compatible is a difficult problem, as it
>> > brings up many of the dependency management issues (e.g. diamond
>> > dependency issues).
>> > >> >>>
>> > >> >>> I agree. I think expansion services offering "kitchen-sink"
>> > >> >>> containers, when possible, can go far here. If we could at least
>> > >> >>> recognize when one environment/set of deps is a superset of
>> > another,
>> > >> >>> that could be an easy case that would yield a lot of benefit
>> > as well.
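The "superset" case is much easier to check than general compatibility. A minimal sketch, assuming a hypothetical (base image, {package: version}) model for each environment: one environment can stand in for another if it has the same base and pins every dependency of the other at the same version.

```python
def subsumes(env_a, env_b):
    """True if env_a can stand in for env_b.

    Each environment is a hypothetical (base, {package: version}) pair. env_a
    must use the same base and pin every package of env_b to the same version.
    """
    base_a, deps_a = env_a
    base_b, deps_b = env_b
    # dict item views support set-style comparison: >= means "superset of".
    return base_a == base_b and deps_a.items() >= deps_b.items()


kitchen_sink = ("beam-java-sdk-2.12-docker", {"libx": "1.0", "liby": "2.0"})
with_dep1 = ("beam-java-sdk-2.12-docker", {"libx": "1.0"})
print(subsumes(kitchen_sink, with_dep1))
```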
>> > >> >>
>> > >> >>
>> > >> >> +1
>> >
>>
>
Re: Artifact staging in cross-language pipelines
Posted by Heejong Lee <he...@google.com>.
I'm refreshing my memory by revisiting the doc[1], and it seems like we've
already reached consensus on the bigger picture. I'll start drafting
the implementation plan.
[1]:
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
On Tue, Nov 26, 2019 at 3:54 AM Maximilian Michels <mx...@apache.org> wrote:
> Hey Heejong,
>
> I don't think so. It would be great to push this forward.
>
> Thanks,
> Max
>
> On 26.11.19 02:49, Heejong Lee wrote:
> > Hi,
> >
> > Is anyone actively working on artifact staging extension for
> > cross-language pipelines? I'm thinking I can contribute to it in coming
> > Dec. If anyone has any progress on this and needs help, please let me know.
> >
> > Thanks,
> >
>
Re: Artifact staging in cross-language pipelines
Posted by Maximilian Michels <mx...@apache.org>.
Hey Heejong,
I don't think so. It would be great to push this forward.
Thanks,
Max
On 26.11.19 02:49, Heejong Lee wrote:
> Hi,
>
> Is anyone actively working on artifact staging extension for
> cross-language pipelines? I'm thinking I can contribute to it in coming
> Dec. If anyone has any progress on this and needs help, please let me know.
>
> Thanks,
>
> On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <iemejia@gmail.com> wrote:
>
> Can you please add this to the design documents webpage.
> https://beam.apache.org/contribute/design-documents/
>
> On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <chamikara@google.com> wrote:
> >
> >
> >
> > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <mxm@apache.org> wrote:
> >>
> >> Here's the first draft:
> >>
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
> >>
> >> It's rather high-level. We may want to add more details once we have
> >> finalized the design. Feel free to make comments and edits.
> >
> >
> > Thanks Max. Added some comments.
> >
> >>
> >>
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of
> >> > the environments themselves.
> >>
> >> +1 I came to the same conclusion while thinking about how to store
> >> artifact information for deferred execution of the pipeline.
> >>
> >> -Max
> >>
> >> On 07.05.19 18:10, Robert Bradshaw wrote:
> >> > Looking forward to your writeup, Max. In the meantime, some
> comments below.
> >> >
> >> >
> >> > From: Lukasz Cwik <lcwik@google.com>
> >> > Date: Thu, May 2, 2019 at 6:45 PM
> >> > To: dev
> >> >
> >> >>
> >> >>
> >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <robertwb@google.com> wrote:
> >> >>>
> >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lcwik@google.com> wrote:
> >> >>>>
> >> >>>> We should stick with URN + payload + artifact metadata[1]
> where the only mandatory one that all SDKs and expansion services
> understand is the "bytes" artifact type. This allows us to add
> optional URNs for file://, http://, Maven, PyPi, ... in the future.
> I would make the artifact staging service use the same URN + payload
> mechanism to get compatibility of artifacts across the different
> services and also have the artifact staging service be able to be
> queried for the list of artifact types it supports.
> >> >>>
> >> >>> +1
> >> >>>
> >> >>>> Finally, we would need to have environments enumerate the
> artifact types that they support.
> >> >>>
> >> >>> Meaning at runtime, or as another field statically set in
> the proto?
> >> >>
> >> >>
> >> >> I don't believe runners/SDKs should have to know what
> artifacts each environment supports at runtime and instead have
> environments enumerate them explicitly in the proto. I have been
> thinking about a more general "capabilities" block on environments
> which allows them to enumerate URNs that the environment understands.
> This would include artifact type URNs, PTransform URNs, coder URNs,
> ... I haven't proposed anything specific along these lines yet because
> I was wondering how environment resources (CPU, min memory, hardware
> like GPU, AWS/GCP/Azure/... machine types) should/could tie into this.
> >> >>
> >> >>>
> >> >>>> Having everyone have the same "artifact" representation
> would be beneficial since:
> >> >>>> a) Python environments could install dependencies from a
> requirements.txt file (something that the Google Cloud Dataflow
> Python docker container allows for today)
> >> >>>> b) It provides an extensible and versioned mechanism for
> SDKs, environments, and artifact staging/retrieval services to
> support additional artifact types
> >> >>>> c) It allows for expressing a canonical representation of an
> artifact like a Maven package so a runner could merge environments
> that the runner deems compatible.
> >> >>>>
> >> >>>> The flow I could see is:
> >> >>>> 1) (optional) query artifact staging service for supported
> artifact types
> >> >>>> 2) SDK requests the expansion service to expand the transform,
> passing in a list of artifact types the SDK and artifact staging
> service support; the expansion service returns a list of artifact
> types limited to those supported types + any supported by the
> environment
> >> >>>
> >> >>> The crux of the issue seems to be how the expansion service
> returns
> >> >>> the artifacts themselves. Is this going with the approach
> that the
> >> >>> caller of the expansion service must host an artifact
> staging service?
> >> >>
> >> >>
> >> >> The caller would not need to host an artifact staging service
> (but would become effectively a proxy service, see my comment below
> for more details) as I would have expected this to be part of the
> expansion service response.
> >> >>
> >> >>>
> >> >>> There is also the question here of how the returned
> artifacts get
> >> >>> attached to the various environments, or whether they get
> implicitly
> >> >>> applied to all returned stages (which need not have a consistent
> >> >>> environment)?
> >> >>
> >> >>
> >> >> I would suggest returning additional information that says
> what artifact is for which environment. Applying all artifacts to
> all environments is likely to cause issues since some environments
> may not understand certain artifact types or may get conflicting
> versions of artifacts. I would see this happening since an expansion
> service that aggregates other expansion services seems likely, for
> example:
> >> >>                              /-> ExpansionService(Python)
> >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
> >> >>                              \-> ExpansionService(Go)
> >> >
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of
> >> > the environments themselves.
> >> >
> >> >>>> 3) SDK converts any artifact types that the artifact
> staging service or environment doesn't understand, e.g. pulls down
> Maven dependencies and converts them to "bytes" artifacts
> >> >>>
> >> >>> Here I think we're conflating two things. The "type" of an
> artifact is
> >> >>> both (1) how to fetch the bytes and (2) how to interpret
> them (e.g. is
> >> >>> this a jar file, or a pip tarball, or just some data needed
> by a DoFn,
> >> >>> or ...) Only (1) can be freely transmuted.
> >> >>
> >> >>
> >> >> You're right. Thinking about this some more, general artifact
> conversion is unlikely to be practical because how to interpret an
> artifact is environment dependent. For example, a requirements.txt
> used to install pip packages for a Python docker container depends
> on the filesystem layout of that specific docker container. One
> could simulate doing a pip install on the same filesystem, see the
> diff and then of all the packages in requirements.txt but this
> quickly becomes impractical.
> >> >>
> >> >>>
> >> >>>> 4) SDK sends artifacts to artifact staging service
> >> >>>> 5) Artifact staging service converts any artifacts to types
> that the environment understands
> >> >>>> 6) Environment is started and gets artifacts from the
> artifact retrieval service.
> >> >>>>
> >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <robertwb@google.com> wrote:
> >> >>>>>
> >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <mxm@apache.org> wrote:
> >> >>>>>>
> >> >>>>>> Good idea to let the client expose an artifact staging
> service that the
> >> >>>>>> ExpansionService could use to stage artifacts. This
> solves two problems:
> >> >>>>>>
> >> >>>>>> (1) The Expansion Service not being able to access the
> Job Server
> >> >>>>>> artifact staging service
> >> >>>>>> (2) The client not having access to the dependencies
> returned by the
> >> >>>>>> Expansion Server
> >> >>>>>>
> >> >>>>>> The downside is that it adds an additional indirection.
> The alternative
> >> >>>>>> to let the client handle staging the artifacts returned
> by the Expansion
> >> >>>>>> Server is more transparent and easier to implement.
> >> >>>>>
> >> >>>>> The other downside is that it may not always be possible
> for the
> >> >>>>> expansion service to connect to the artifact staging
> service (e.g.
> >> >>>>> when constructing a pipeline locally against a remote
> expansion
> >> >>>>> service).
> >> >>>>
> >> >>>> Just to make sure, you're saying the expansion service would
> return all the artifacts (bytes, urls, ...) as part of the response
> since the expansion service wouldn't be able to connect to the SDK
> that is running locally either.
> >> >>>
> >> >>> Yes. Well, more I'm asking how the expansion service would
> return any
> >> >>> artifacts.
> >> >>>
> >> >>> What we have is
> >> >>>
> >> >>> Runner <--- SDK ---> Expansion service.
> >> >>>
> >> >>> Here each unidirectional arrow means "instantiates a connection with", and the reverse direction (as well as the missing arrows) may not be possible.
> >> >>
> >> >>
> >> >> I believe the ExpansionService Expand request should become a unidirectional stream back to the caller so that artifacts can be sent back to the SDK (effectively mirroring the artifact staging service API). The expansion response would then stream back a series of artifact data messages as well as the expansion response containing the PTransform information.
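One way to picture the proposed streaming Expand call: the expansion service emits artifact data messages plus a message carrying the expansion, and the SDK reassembles them. This sketch models the server-streaming RPC with a plain Python generator rather than gRPC; the message types `ArtifactChunk` and `ExpansionResult`, the chunk size, and the choice to send the expansion result last are all assumptions, not the Beam protocol.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Tuple, Union


@dataclass
class ArtifactChunk:
    """Hypothetical message carrying part of one artifact's bytes."""
    name: str
    data: bytes


@dataclass
class ExpansionResult:
    """Hypothetical final message standing in for the expanded PTransform."""
    transform_id: str
    artifact_names: List[str]


Message = Union[ArtifactChunk, ExpansionResult]


def expand_streaming(transform_id: str, artifacts: Dict[str, bytes],
                     chunk_size: int = 4) -> Iterator[Message]:
    """Expansion-service side: stream every artifact in chunks, then the result."""
    for name, data in artifacts.items():
        for start in range(0, len(data), chunk_size):
            yield ArtifactChunk(name, data[start:start + chunk_size])
    yield ExpansionResult(transform_id, list(artifacts))


def collect(stream: Iterator[Message]) -> Tuple[ExpansionResult, Dict[str, bytes]]:
    """SDK side: reassemble artifact bytes until the expansion result arrives."""
    received: Dict[str, bytes] = {}
    for message in stream:
        if isinstance(message, ArtifactChunk):
            received[message.name] = received.get(message.name, b"") + message.data
        else:
            return message, received
    raise RuntimeError("stream ended without an expansion result")


result, fetched = collect(
    expand_streaming("my_transform", {"dep.jar": b"0123456789"}))
```

Sending the expansion result last lets the SDK treat that message as the signal that all artifacts have arrived; a real protocol might order or tag the messages differently.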
> >> >
> >> > +1.
> >> >
> >> >>>>>> Ideally, the Expansion Service won't return any dependencies because the environment already contains the required dependencies. We could make it a requirement for the expansion to be performed inside an environment. Then we would already ensure at expansion time that the runtime dependencies are available.
> >> >>>>>
> >> >>>>> Yes, it's cleanest if the expansion service provides an environment that already contains all the dependencies. Interesting idea to make this a property of the expansion service itself.
> >> >>>>
> >> >>>> I had thought this too, but an opaque Docker container built on top of a base Beam Docker container would be very difficult for a runner to introspect and check for compatibility, which it needs to do to allow fusion across PTransforms. I think artifacts need to be communicated in their canonical representation.
> >> >>>
> >> >>> It's clean (from the specification point of view), but doesn't allow for good introspection/fusion (aside from one being a base of another, perhaps).
> >> >>>
> >> >>>>>>> In this case, the runner would (as requested by its configuration) be free to merge environments it deemed compatible, including swapping out beam-java-X for beam-java-embedded if it considers itself compatible with the dependency list.
> >> >>>>>>
> >> >>>>>> Could you explain how that would work in practice?
> >> >>>>>
> >> >>>>> Say one has a pipeline with environments
> >> >>>>>
> >> >>>>> A: beam-java-sdk-2.12-docker
> >> >>>>> B: beam-java-sdk-2.12-docker + dep1
> >> >>>>> C: beam-java-sdk-2.12-docker + dep2
> >> >>>>> D: beam-java-sdk-2.12-docker + dep3
> >> >>>>>
> >> >>>>> A runner could (conceivably) be intelligent enough to know that dep1 and dep2 are indeed compatible, and run A, B, and C in a single beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the corresponding fusion and lower-overhead benefits). If a certain pipeline option is set, it might further note that dep1 and dep2 are compatible with its own workers, which are built against sdk-2.12, and choose to run these in an embedded + dep1 + dep2 environment.
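The merging scenario above can be sketched as a greedy pass that folds environments sharing a base image into one, as long as a compatibility check accepts every pair of dependencies in the union. The `Environment` type, `merge_compatible`, and especially the `compatible` predicate are hypothetical; deciding real dependency compatibility is the hard part of the problem, so here the predicate is simply supplied by the caller.

```python
from dataclasses import dataclass
from itertools import combinations
from typing import Callable, FrozenSet, List


@dataclass(frozen=True)
class Environment:
    base: str               # e.g. a base container image name
    deps: FrozenSet[str]    # extra dependencies layered on top of the base


def merge_compatible(envs: List[Environment],
                     compatible: Callable[[str, str], bool]) -> List[Environment]:
    """Greedily fold each environment into the first already-merged one with
    the same base, if every dependency pair in the union is compatible.
    Greedy and order-dependent: another input order may group differently."""
    merged: List[Environment] = []
    for env in envs:
        for i, existing in enumerate(merged):
            union = existing.deps | env.deps
            if existing.base == env.base and all(
                    compatible(a, b) for a, b in combinations(sorted(union), 2)):
                merged[i] = Environment(existing.base, union)
                break
        else:
            # No compatible merged environment found: keep this one separate.
            merged.append(env)
    return merged


BASE = "beam-java-sdk-2.12-docker"
pipeline_envs = [
    Environment(BASE, frozenset()),          # A
    Environment(BASE, frozenset({"dep1"})),  # B
    Environment(BASE, frozenset({"dep2"})),  # C
    Environment(BASE, frozenset({"dep3"})),  # D
]
# Assumption for the example: only dep1 and dep2 are known to be compatible.
merged = merge_compatible(pipeline_envs, lambda a, b: {a, b} <= {"dep1", "dep2"})
```

With that predicate, A, B and C collapse into one base + dep1 + dep2 environment while D stays on its own, mirroring the example in the thread.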
> >> >>>>
> >> >>>> We have been talking about the expansion service and cross-language transforms a lot lately, but I believe they will initially come at the cost of poor fusion of transforms, because "merging" compatible environments is a difficult problem that brings up many dependency management issues (e.g. diamond dependency issues).
> >> >>>
> >> >>> I agree. I think expansion services offering "kitchen-sink" containers, when possible, can go far here. If we could at least recognize when one environment/set of deps is a superset of another, that could be an easy case that would yield a lot of benefit as well.
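The easy superset case can be sketched in a few lines: an environment can host another when both use the same base image and its dependency set contains the other's, so each environment can be routed to the largest environment that contains it. The tuple representation, `can_host` and `assign_hosts` are hypothetical names for illustration, and treating dependencies as plain name sets ignores versions entirely.

```python
from typing import Dict, FrozenSet, Tuple

# Hypothetical representation: (base image, set of dependency names).
Env = Tuple[str, FrozenSet[str]]


def can_host(host: Env, guest: Env) -> bool:
    """True if `host` offers everything `guest` needs: the same base image and
    a dependency set that is a superset of the guest's."""
    return host[0] == guest[0] and host[1] >= guest[1]


def assign_hosts(envs: Dict[str, Env]) -> Dict[str, str]:
    """Map each environment name to the environment that should run its
    transforms: the one with the most dependencies among those able to host
    it (ties broken by name, so the result is deterministic)."""
    assignment: Dict[str, str] = {}
    for name, env in envs.items():
        # Every environment can host itself, so this list is never empty.
        hosts = [h for h, candidate in envs.items() if can_host(candidate, env)]
        assignment[name] = max(hosts, key=lambda h: (len(envs[h][1]), h))
    return assignment


BASE = "beam-java-sdk-2.12-docker"
routing = assign_hosts({
    "kitchen_sink": (BASE, frozenset({"dep1", "dep2"})),
    "with_dep1": (BASE, frozenset({"dep1"})),
    "plain": (BASE, frozenset()),
    "with_dep3": (BASE, frozenset({"dep3"})),
})
```

Here the kitchen-sink environment absorbs the plain and dep1-only environments, while the dep3 environment, which no other environment contains, keeps running separately.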
> >> >>
> >> >>
> >> >> +1
>