You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2020/04/01 00:48:20 UTC

Re: Unportable Dataflow Pipeline Questions

On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com> wrote:

> Hi All,
>
> I am currently investigating making the Python DataflowRunner to use a
> portable pipeline representation so that we can eventually get rid of the
> Pipeline(runner) weirdness.
>
> In that case, I have a lot questions about the Python DataflowRunner:
>
> *PValueCache*
>
>    - Why does this exist?
>
> This is historical baggage from the (long gone) first direct runner when
actual computed PCollections were cached, and the DataflowRunner inherited
it.


> *DataflowRunner*
>
>    - I see that the DataflowRunner defines some PTransforms as
>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>    methods. Then in the run_ methods, it references the PValueCache to add
>    steps.
>       - How does this add steps?
>       - Where does it cache the values to?
>       - How does the runner harness pick up these cached values to create
>       new steps?
>       - How is this information communicated to the runner harness?
>    - Why do the following transforms need to be overridden: GroupByKey,
>    WriteToBigQuery, CombineValues, Read?
>
> Each of these four has a different implementation on Dataflow.

>
>    - Why doesn't the ParDo transform need to be overridden? I see that it
>    has a run_ method but no apply_ method.
>
> apply_ is called at pipeline construction time, all of these should be
replaced by PTransformOverrides. run_ is called after pipeline construction
to actually build up the dataflow graph.


> *Possible fixes*
> I was thinking of getting rid of the apply_ and run_ methods and replacing
> those with a PTransformOverride and a simple PipelineVisitor, respectively.
> Is this feasible? Am I missing any assumptions that don't make this
> feasible?
>

If we're going to overhaul how the runner works, it would be best to make
DataflowRunner direct a translator from Beam runner api protos to Cloudv1b3
protos, rather than manipulate the intermediate Python representation
(which no one wants to change for fear of messing up DataflowRunner and
cause headaches for cross langauge).

Re: Unportable Dataflow Pipeline Questions

Posted by Luke Cwik <lc...@google.com>.
I think making the Dataflow service translate into the pipeline proto
directly will be a lot of work.

On Thu, Apr 2, 2020 at 6:03 PM Robert Burke <ro...@frantil.com> wrote:

> It's stateless translation code and nothing is sourced outside of the beam
> pipeline proto, so it should be fairly straightforward code to write and
> test.
>
> One can collect several before and afters of the existing translations and
> use them to validate.
> There are a few quirks that were previously necessary though to get
> Dataflow to work properly for the Go SDK, in particular around DoFns
> without outputs, but that's reasonably clear in the translator.
>
> On Thu, Apr 2, 2020, 5:57 PM Robert Bradshaw <ro...@google.com> wrote:
>
>> On Thu, Apr 2, 2020 at 7:54 AM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <ro...@frantil.com> wrote:
>>>
>>>> In particular, ideally the Dataflow Service is handling the Dataflow
>>>> specific format translation, rather than each SDK. Move the v1 beta3
>>>> pipeline to an internal detail.
>>>>
>>>> Ideally Dataflow would support a JobManagment endpoint directly, but I
>>>> imagine that's a more involved task that's out of scope for now.
>>>>
>>>
>>> Yeah, I think we can just embed the runner API proto in Dataflow job
>>> request (or store it in GCS and Download in router if too large). Then
>>> runner API proto to Dataflow proto translation can occur within Dataflow
>>> service and all SDKs can share that translation logic ((3) below). I agree
>>> that fully migrating Dataflow service to be on job management API seems to
>>> be out of scope.
>>>
>>
>> I've been hoping for that day for a long time now :). I wonder how hard
>> it woud be to extend/embed the existing go translation code into the
>> router.
>>
>>
>>>
>>>
>>>>
>>>> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <ch...@google.com>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <sr...@google.com> wrote:
>>>>>
>>>>>> Okay cool, so it sounds like the cleanup can be done in two phases:
>>>>>> move the apply_ methods to transform replacements, then move Dataflow onto
>>>>>> the Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
>>>>>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>>>>>> could be passed to the DataflowRunner to run, correct?
>>>>>>
>>>>>
>>>>> Currently we do the following.
>>>>>
>>>>> (1) Currently Java and Python SDKs
>>>>> SDK specific object representation -> Dataflow job request (v1beta3)
>>>>> -> Dataflow service specific representation
>>>>> Beam Runner API proto -> store in GCS -> Download in workers.
>>>>>
>>>>> (2) Currently Go SDK
>>>>> SDK specific object representation -> Beam Runner API proto
>>>>> -> Dataflow job request (v1beta3) -> Dataflow service specific
>>>>> representation
>>>>>
>>>>> We got cross-language (for Python) working for (1) above but code will
>>>>> be much cleaner if we could do (2) for Python and Java
>>>>>
>>>>> I think the cleanest approach is following which will allow us to
>>>>> share translation code across SDKs.
>>>>> (3) For all SDKs
>>>>> SDK specific object representation -> Runner API proto embedded in
>>>>> Dataflow job request -> Runner API proto to internal Dataflow specific
>>>>> representation within Dataflow service
>>>>>
>>>>> I think we should go for a cleaner approach here ((2) or (3)) instead
>>>>> of trying to do it in multiple steps (we'll have to keep updating features
>>>>> such as a cross-language to be in lockstep which will be hard and result in
>>>>> a lot of throwaway work).
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <ro...@frantil.com>
>>>>>> wrote:
>>>>>>
>>>>>>> +1 to translation from beam pipeline Protos.
>>>>>>>
>>>>>>>  The Go SDK does that currently in dataflowlib/translate.go to
>>>>>>> handle the current Dataflow situation, so it's certainly doable.
>>>>>>>
>>>>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am currently investigating making the Python DataflowRunner to
>>>>>>>>> use a portable pipeline representation so that we can eventually get rid of
>>>>>>>>> the Pipeline(runner) weirdness.
>>>>>>>>>
>>>>>>>>> In that case, I have a lot questions about the Python
>>>>>>>>> DataflowRunner:
>>>>>>>>>
>>>>>>>>> *PValueCache*
>>>>>>>>>
>>>>>>>>>    - Why does this exist?
>>>>>>>>>
>>>>>>>>> This is historical baggage from the (long gone) first direct
>>>>>>>> runner when actual computed PCollections were cached, and the
>>>>>>>> DataflowRunner inherited it.
>>>>>>>>
>>>>>>>>
>>>>>>>>> *DataflowRunner*
>>>>>>>>>
>>>>>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>>>>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>>>>>>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>>>>>>>    steps.
>>>>>>>>>       - How does this add steps?
>>>>>>>>>       - Where does it cache the values to?
>>>>>>>>>       - How does the runner harness pick up these cached values
>>>>>>>>>       to create new steps?
>>>>>>>>>       - How is this information communicated to the runner
>>>>>>>>>       harness?
>>>>>>>>>    - Why do the following transforms need to be overridden:
>>>>>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>>>>
>>>>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>>>>>    that it has a run_ method but no apply_ method.
>>>>>>>>>
>>>>>>>>> apply_ is called at pipeline construction time, all of these
>>>>>>>> should be replaced by PTransformOverrides. run_ is called after pipeline
>>>>>>>> construction to actually build up the dataflow graph.
>>>>>>>>
>>>>>>>>
>>>>>>>>> *Possible fixes*
>>>>>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>>>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>>>>>>> make this feasible?
>>>>>>>>>
>>>>>>>>
>>>>>>>> If we're going to overhaul how the runner works, it would be best
>>>>>>>> to make DataflowRunner direct a translator from Beam runner api protos to
>>>>>>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>>>>>>> representation (which no one wants to change for fear of messing up
>>>>>>>> DataflowRunner and cause headaches for cross langauge).
>>>>>>>>
>>>>>>>>
>>>>>>>>

Re: Unportable Dataflow Pipeline Questions

Posted by Robert Burke <ro...@frantil.com>.
It's stateless translation code and nothing is sourced outside of the beam
pipeline proto, so it should be fairly straightforward code to write and
test.

One can collect several before and afters of the existing translations and
use them to validate.
There are a few quirks that were previously necessary though to get
Dataflow to work properly for the Go SDK, in particular around DoFns
without outputs, but that's reasonably clear in the translator.

On Thu, Apr 2, 2020, 5:57 PM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Apr 2, 2020 at 7:54 AM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <ro...@frantil.com> wrote:
>>
>>> In particular, ideally the Dataflow Service is handling the Dataflow
>>> specific format translation, rather than each SDK. Move the v1 beta3
>>> pipeline to an internal detail.
>>>
>>> Ideally Dataflow would support a JobManagment endpoint directly, but I
>>> imagine that's a more involved task that's out of scope for now.
>>>
>>
>> Yeah, I think we can just embed the runner API proto in Dataflow job
>> request (or store it in GCS and Download in router if too large). Then
>> runner API proto to Dataflow proto translation can occur within Dataflow
>> service and all SDKs can share that translation logic ((3) below). I agree
>> that fully migrating Dataflow service to be on job management API seems to
>> be out of scope.
>>
>
> I've been hoping for that day for a long time now :). I wonder how hard it
> woud be to extend/embed the existing go translation code into the router.
>
>
>>
>>
>>>
>>> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <sr...@google.com> wrote:
>>>>
>>>>> Okay cool, so it sounds like the cleanup can be done in two phases:
>>>>> move the apply_ methods to transform replacements, then move Dataflow onto
>>>>> the Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
>>>>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>>>>> could be passed to the DataflowRunner to run, correct?
>>>>>
>>>>
>>>> Currently we do the following.
>>>>
>>>> (1) Currently Java and Python SDKs
>>>> SDK specific object representation -> Dataflow job request (v1beta3) ->
>>>> Dataflow service specific representation
>>>> Beam Runner API proto -> store in GCS -> Download in workers.
>>>>
>>>> (2) Currently Go SDK
>>>> SDK specific object representation -> Beam Runner API proto -> Dataflow
>>>> job request (v1beta3) -> Dataflow service specific representation
>>>>
>>>> We got cross-language (for Python) working for (1) above but code will
>>>> be much cleaner if we could do (2) for Python and Java
>>>>
>>>> I think the cleanest approach is following which will allow us to share
>>>> translation code across SDKs.
>>>> (3) For all SDKs
>>>> SDK specific object representation -> Runner API proto embedded in
>>>> Dataflow job request -> Runner API proto to internal Dataflow specific
>>>> representation within Dataflow service
>>>>
>>>> I think we should go for a cleaner approach here ((2) or (3)) instead
>>>> of trying to do it in multiple steps (we'll have to keep updating features
>>>> such as a cross-language to be in lockstep which will be hard and result in
>>>> a lot of throwaway work).
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <ro...@frantil.com>
>>>>> wrote:
>>>>>
>>>>>> +1 to translation from beam pipeline Protos.
>>>>>>
>>>>>>  The Go SDK does that currently in dataflowlib/translate.go to handle
>>>>>> the current Dataflow situation, so it's certainly doable.
>>>>>>
>>>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am currently investigating making the Python DataflowRunner to
>>>>>>>> use a portable pipeline representation so that we can eventually get rid of
>>>>>>>> the Pipeline(runner) weirdness.
>>>>>>>>
>>>>>>>> In that case, I have a lot questions about the Python
>>>>>>>> DataflowRunner:
>>>>>>>>
>>>>>>>> *PValueCache*
>>>>>>>>
>>>>>>>>    - Why does this exist?
>>>>>>>>
>>>>>>>> This is historical baggage from the (long gone) first direct runner
>>>>>>> when actual computed PCollections were cached, and the DataflowRunner
>>>>>>> inherited it.
>>>>>>>
>>>>>>>
>>>>>>>> *DataflowRunner*
>>>>>>>>
>>>>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>>>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>>>>>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>>>>>>    steps.
>>>>>>>>       - How does this add steps?
>>>>>>>>       - Where does it cache the values to?
>>>>>>>>       - How does the runner harness pick up these cached values to
>>>>>>>>       create new steps?
>>>>>>>>       - How is this information communicated to the runner harness?
>>>>>>>>    - Why do the following transforms need to be overridden:
>>>>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>>>
>>>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>>>
>>>>>>>>
>>>>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>>>>    that it has a run_ method but no apply_ method.
>>>>>>>>
>>>>>>>> apply_ is called at pipeline construction time, all of these should
>>>>>>> be replaced by PTransformOverrides. run_ is called after pipeline
>>>>>>> construction to actually build up the dataflow graph.
>>>>>>>
>>>>>>>
>>>>>>>> *Possible fixes*
>>>>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>>>>>> make this feasible?
>>>>>>>>
>>>>>>>
>>>>>>> If we're going to overhaul how the runner works, it would be best to
>>>>>>> make DataflowRunner direct a translator from Beam runner api protos to
>>>>>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>>>>>> representation (which no one wants to change for fear of messing up
>>>>>>> DataflowRunner and cause headaches for cross langauge).
>>>>>>>
>>>>>>>
>>>>>>>

Re: Unportable Dataflow Pipeline Questions

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Apr 2, 2020 at 7:54 AM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <ro...@frantil.com> wrote:
>
>> In particular, ideally the Dataflow Service is handling the Dataflow
>> specific format translation, rather than each SDK. Move the v1 beta3
>> pipeline to an internal detail.
>>
>> Ideally Dataflow would support a JobManagment endpoint directly, but I
>> imagine that's a more involved task that's out of scope for now.
>>
>
> Yeah, I think we can just embed the runner API proto in Dataflow job
> request (or store it in GCS and Download in router if too large). Then
> runner API proto to Dataflow proto translation can occur within Dataflow
> service and all SDKs can share that translation logic ((3) below). I agree
> that fully migrating Dataflow service to be on job management API seems to
> be out of scope.
>

I've been hoping for that day for a long time now :). I wonder how hard it
woud be to extend/embed the existing go translation code into the router.


>
>
>>
>> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <sr...@google.com> wrote:
>>>
>>>> Okay cool, so it sounds like the cleanup can be done in two phases:
>>>> move the apply_ methods to transform replacements, then move Dataflow onto
>>>> the Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
>>>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>>>> could be passed to the DataflowRunner to run, correct?
>>>>
>>>
>>> Currently we do the following.
>>>
>>> (1) Currently Java and Python SDKs
>>> SDK specific object representation -> Dataflow job request (v1beta3) ->
>>> Dataflow service specific representation
>>> Beam Runner API proto -> store in GCS -> Download in workers.
>>>
>>> (2) Currently Go SDK
>>> SDK specific object representation -> Beam Runner API proto -> Dataflow
>>> job request (v1beta3) -> Dataflow service specific representation
>>>
>>> We got cross-language (for Python) working for (1) above but code will
>>> be much cleaner if we could do (2) for Python and Java
>>>
>>> I think the cleanest approach is following which will allow us to share
>>> translation code across SDKs.
>>> (3) For all SDKs
>>> SDK specific object representation -> Runner API proto embedded in
>>> Dataflow job request -> Runner API proto to internal Dataflow specific
>>> representation within Dataflow service
>>>
>>> I think we should go for a cleaner approach here ((2) or (3)) instead of
>>> trying to do it in multiple steps (we'll have to keep updating features
>>> such as a cross-language to be in lockstep which will be hard and result in
>>> a lot of throwaway work).
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <ro...@frantil.com>
>>>> wrote:
>>>>
>>>>> +1 to translation from beam pipeline Protos.
>>>>>
>>>>>  The Go SDK does that currently in dataflowlib/translate.go to handle
>>>>> the current Dataflow situation, so it's certainly doable.
>>>>>
>>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am currently investigating making the Python DataflowRunner to use
>>>>>>> a portable pipeline representation so that we can eventually get rid of the
>>>>>>> Pipeline(runner) weirdness.
>>>>>>>
>>>>>>> In that case, I have a lot questions about the Python DataflowRunner:
>>>>>>>
>>>>>>> *PValueCache*
>>>>>>>
>>>>>>>    - Why does this exist?
>>>>>>>
>>>>>>> This is historical baggage from the (long gone) first direct runner
>>>>>> when actual computed PCollections were cached, and the DataflowRunner
>>>>>> inherited it.
>>>>>>
>>>>>>
>>>>>>> *DataflowRunner*
>>>>>>>
>>>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>>>>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>>>>>    steps.
>>>>>>>       - How does this add steps?
>>>>>>>       - Where does it cache the values to?
>>>>>>>       - How does the runner harness pick up these cached values to
>>>>>>>       create new steps?
>>>>>>>       - How is this information communicated to the runner harness?
>>>>>>>    - Why do the following transforms need to be overridden:
>>>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>>
>>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>>
>>>>>>>
>>>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>>>    that it has a run_ method but no apply_ method.
>>>>>>>
>>>>>>> apply_ is called at pipeline construction time, all of these should
>>>>>> be replaced by PTransformOverrides. run_ is called after pipeline
>>>>>> construction to actually build up the dataflow graph.
>>>>>>
>>>>>>
>>>>>>> *Possible fixes*
>>>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>>>>> make this feasible?
>>>>>>>
>>>>>>
>>>>>> If we're going to overhaul how the runner works, it would be best to
>>>>>> make DataflowRunner direct a translator from Beam runner api protos to
>>>>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>>>>> representation (which no one wants to change for fear of messing up
>>>>>> DataflowRunner and cause headaches for cross langauge).
>>>>>>
>>>>>>
>>>>>>

Re: Unportable Dataflow Pipeline Questions

Posted by Chamikara Jayalath <ch...@google.com>.
On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <ro...@frantil.com> wrote:

> In particular, ideally the Dataflow Service is handling the Dataflow
> specific format translation, rather than each SDK. Move the v1 beta3
> pipeline to an internal detail.
>
> Ideally Dataflow would support a JobManagment endpoint directly, but I
> imagine that's a more involved task that's out of scope for now.
>

Yeah, I think we can just embed the runner API proto in Dataflow job
request (or store it in GCS and Download in router if too large). Then
runner API proto to Dataflow proto translation can occur within Dataflow
service and all SDKs can share that translation logic ((3) below). I agree
that fully migrating Dataflow service to be on job management API seems to
be out of scope.


>
> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <sr...@google.com> wrote:
>>
>>> Okay cool, so it sounds like the cleanup can be done in two phases: move
>>> the apply_ methods to transform replacements, then move Dataflow onto the
>>> Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
>>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>>> could be passed to the DataflowRunner to run, correct?
>>>
>>
>> Currently we do the following.
>>
>> (1) Currently Java and Python SDKs
>> SDK specific object representation -> Dataflow job request (v1beta3) ->
>> Dataflow service specific representation
>> Beam Runner API proto -> store in GCS -> Download in workers.
>>
>> (2) Currently Go SDK
>> SDK specific object representation -> Beam Runner API proto -> Dataflow
>> job request (v1beta3) -> Dataflow service specific representation
>>
>> We got cross-language (for Python) working for (1) above but code will be
>> much cleaner if we could do (2) for Python and Java
>>
>> I think the cleanest approach is following which will allow us to share
>> translation code across SDKs.
>> (3) For all SDKs
>> SDK specific object representation -> Runner API proto embedded in
>> Dataflow job request -> Runner API proto to internal Dataflow specific
>> representation within Dataflow service
>>
>> I think we should go for a cleaner approach here ((2) or (3)) instead of
>> trying to do it in multiple steps (we'll have to keep updating features
>> such as a cross-language to be in lockstep which will be hard and result in
>> a lot of throwaway work).
>>
>> Thanks,
>> Cham
>>
>>
>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <ro...@frantil.com> wrote:
>>>
>>>> +1 to translation from beam pipeline Protos.
>>>>
>>>>  The Go SDK does that currently in dataflowlib/translate.go to handle
>>>> the current Dataflow situation, so it's certainly doable.
>>>>
>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am currently investigating making the Python DataflowRunner to use
>>>>>> a portable pipeline representation so that we can eventually get rid of the
>>>>>> Pipeline(runner) weirdness.
>>>>>>
>>>>>> In that case, I have a lot questions about the Python DataflowRunner:
>>>>>>
>>>>>> *PValueCache*
>>>>>>
>>>>>>    - Why does this exist?
>>>>>>
>>>>>> This is historical baggage from the (long gone) first direct runner
>>>>> when actual computed PCollections were cached, and the DataflowRunner
>>>>> inherited it.
>>>>>
>>>>>
>>>>>> *DataflowRunner*
>>>>>>
>>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>>>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>>>>    steps.
>>>>>>       - How does this add steps?
>>>>>>       - Where does it cache the values to?
>>>>>>       - How does the runner harness pick up these cached values to
>>>>>>       create new steps?
>>>>>>       - How is this information communicated to the runner harness?
>>>>>>    - Why do the following transforms need to be overridden:
>>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>
>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>
>>>>>>
>>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>>    that it has a run_ method but no apply_ method.
>>>>>>
>>>>>> apply_ is called at pipeline construction time, all of these should
>>>>> be replaced by PTransformOverrides. run_ is called after pipeline
>>>>> construction to actually build up the dataflow graph.
>>>>>
>>>>>
>>>>>> *Possible fixes*
>>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>>>> make this feasible?
>>>>>>
>>>>>
>>>>> If we're going to overhaul how the runner works, it would be best to
>>>>> make DataflowRunner direct a translator from Beam runner api protos to
>>>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>>>> representation (which no one wants to change for fear of messing up
>>>>> DataflowRunner and cause headaches for cross langauge).
>>>>>
>>>>>
>>>>>

Re: Unportable Dataflow Pipeline Questions

Posted by Robert Burke <ro...@frantil.com>.
In particular, ideally the Dataflow Service is handling the Dataflow
specific format translation, rather than each SDK. Move the v1 beta3
pipeline to an internal detail.

Ideally Dataflow would support a JobManagment endpoint directly, but I
imagine that's a more involved task that's out of scope for now.

On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <sr...@google.com> wrote:
>
>> Okay cool, so it sounds like the cleanup can be done in two phases: move
>> the apply_ methods to transform replacements, then move Dataflow onto the
>> Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>> could be passed to the DataflowRunner to run, correct?
>>
>
> Currently we do the following.
>
> (1) Currently Java and Python SDKs
> SDK specific object representation -> Dataflow job request (v1beta3) ->
> Dataflow service specific representation
> Beam Runner API proto -> store in GCS -> Download in workers.
>
> (2) Currently Go SDK
> SDK specific object representation -> Beam Runner API proto -> Dataflow
> job request (v1beta3) -> Dataflow service specific representation
>
> We got cross-language (for Python) working for (1) above but code will be
> much cleaner if we could do (2) for Python and Java
>
> I think the cleanest approach is following which will allow us to share
> translation code across SDKs.
> (3) For all SDKs
> SDK specific object representation -> Runner API proto embedded in
> Dataflow job request -> Runner API proto to internal Dataflow specific
> representation within Dataflow service
>
> I think we should go for a cleaner approach here ((2) or (3)) instead of
> trying to do it in multiple steps (we'll have to keep updating features
> such as a cross-language to be in lockstep which will be hard and result in
> a lot of throwaway work).
>
> Thanks,
> Cham
>
>
>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <ro...@frantil.com> wrote:
>>
>>> +1 to translation from beam pipeline Protos.
>>>
>>>  The Go SDK does that currently in dataflowlib/translate.go to handle
>>> the current Dataflow situation, so it's certainly doable.
>>>
>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am currently investigating making the Python DataflowRunner to use a
>>>>> portable pipeline representation so that we can eventually get rid of the
>>>>> Pipeline(runner) weirdness.
>>>>>
>>>>> In that case, I have a lot questions about the Python DataflowRunner:
>>>>>
>>>>> *PValueCache*
>>>>>
>>>>>    - Why does this exist?
>>>>>
>>>>> This is historical baggage from the (long gone) first direct runner
>>>> when actual computed PCollections were cached, and the DataflowRunner
>>>> inherited it.
>>>>
>>>>
>>>>> *DataflowRunner*
>>>>>
>>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>>>    steps.
>>>>>       - How does this add steps?
>>>>>       - Where does it cache the values to?
>>>>>       - How does the runner harness pick up these cached values to
>>>>>       create new steps?
>>>>>       - How is this information communicated to the runner harness?
>>>>>    - Why do the following transforms need to be overridden:
>>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>
>>>>> Each of these four has a different implementation on Dataflow.
>>>>
>>>>>
>>>>>    - Why doesn't the ParDo transform need to be overridden? I see
>>>>>    that it has a run_ method but no apply_ method.
>>>>>
>>>>> apply_ is called at pipeline construction time, all of these should be
>>>> replaced by PTransformOverrides. run_ is called after pipeline construction
>>>> to actually build up the dataflow graph.
>>>>
>>>>
>>>>> *Possible fixes*
>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>>> make this feasible?
>>>>>
>>>>
>>>> If we're going to overhaul how the runner works, it would be best to
>>>> make DataflowRunner direct a translator from Beam runner api protos to
>>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>>> representation (which no one wants to change for fear of messing up
>>>> DataflowRunner and cause headaches for cross langauge).
>>>>
>>>>
>>>>

Re: Unportable Dataflow Pipeline Questions

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <sr...@google.com> wrote:

> Okay cool, so it sounds like the cleanup can be done in two phases: move
> the apply_ methods to transform replacements, then move Dataflow onto the
> Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
> portable? If the InteractiveRunner were to make a Pipeline object, then it
> could be passed to the DataflowRunner to run, correct?
>

Currently we do the following.

(1) Currently Java and Python SDKs
SDK specific object representation -> Dataflow job request (v1beta3) ->
Dataflow service specific representation
Beam Runner API proto -> store in GCS -> Download in workers.

(2) Currently Go SDK
SDK specific object representation -> Beam Runner API proto -> Dataflow job
request (v1beta3) -> Dataflow service specific representation

We got cross-language (for Python) working for (1) above but code will be
much cleaner if we could do (2) for Python and Java

I think the cleanest approach is following which will allow us to share
translation code across SDKs.
(3) For all SDKs
SDK specific object representation -> Runner API proto embedded in Dataflow
job request -> Runner API proto to internal Dataflow specific
representation within Dataflow service

I think we should go for a cleaner approach here ((2) or (3)) instead of
trying to do it in multiple steps (we'll have to keep updating features
such as a cross-language to be in lockstep which will be hard and result in
a lot of throwaway work).

Thanks,
Cham


> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <ro...@frantil.com> wrote:
>
>> +1 to translation from beam pipeline Protos.
>>
>>  The Go SDK does that currently in dataflowlib/translate.go to handle the
>> current Dataflow situation, so it's certainly doable.
>>
>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am currently investigating making the Python DataflowRunner to use a
>>>> portable pipeline representation so that we can eventually get rid of the
>>>> Pipeline(runner) weirdness.
>>>>
>>>> In that case, I have a lot questions about the Python DataflowRunner:
>>>>
>>>> *PValueCache*
>>>>
>>>>    - Why does this exist?
>>>>
>>>> This is historical baggage from the (long gone) first direct runner
>>> when actual computed PCollections were cached, and the DataflowRunner
>>> inherited it.
>>>
>>>
>>>> *DataflowRunner*
>>>>
>>>>    - I see that the DataflowRunner defines some PTransforms as
>>>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>>    steps.
>>>>       - How does this add steps?
>>>>       - Where does it cache the values to?
>>>>       - How does the runner harness pick up these cached values to
>>>>       create new steps?
>>>>       - How is this information communicated to the runner harness?
>>>>    - Why do the following transforms need to be overridden:
>>>>    GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>
>>>> Each of these four has a different implementation on Dataflow.
>>>
>>>>
>>>>    - Why doesn't the ParDo transform need to be overridden? I see that
>>>>    it has a run_ method but no apply_ method.
>>>>
>>>> apply_ is called at pipeline construction time, all of these should be
>>> replaced by PTransformOverrides. run_ is called after pipeline construction
>>> to actually build up the dataflow graph.
>>>
>>>
>>>> *Possible fixes*
>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>> make this feasible?
>>>>
>>>
>>> If we're going to overhaul how the runner works, it would be best to
>>> make DataflowRunner direct a translator from Beam runner api protos to
>>> Cloudv1b3 protos, rather than manipulate the intermediate Python
>>> representation (which no one wants to change for fear of messing up
>>> DataflowRunner and cause headaches for cross langauge).
>>>
>>>
>>>

Re: Unportable Dataflow Pipeline Questions

Posted by Sam Rohde <sr...@google.com>.
Okay cool, so it sounds like the cleanup can be done in two phases: move
the apply_ methods to transform replacements, then move Dataflow onto the
Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object
portable? If the InteractiveRunner were to make a Pipeline object, then it
could be passed to the DataflowRunner to run, correct?

On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <ro...@frantil.com> wrote:

> +1 to translation from beam pipeline Protos.
>
>  The Go SDK does that currently in dataflowlib/translate.go to handle the
> current Dataflow situation, so it's certainly doable.
>
> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com> wrote:
>
>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com> wrote:
>>
>>> Hi All,
>>>
>>> I am currently investigating making the Python DataflowRunner to use a
>>> portable pipeline representation so that we can eventually get rid of the
>>> Pipeline(runner) weirdness.
>>>
>>> In that case, I have a lot questions about the Python DataflowRunner:
>>>
>>> *PValueCache*
>>>
>>>    - Why does this exist?
>>>
>>> This is historical baggage from the (long gone) first direct runner when
>> actual computed PCollections were cached, and the DataflowRunner inherited
>> it.
>>
>>
>>> *DataflowRunner*
>>>
>>>    - I see that the DataflowRunner defines some PTransforms as
>>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>>    methods. Then in the run_ methods, it references the PValueCache to add
>>>    steps.
>>>       - How does this add steps?
>>>       - Where does it cache the values to?
>>>       - How does the runner harness pick up these cached values to
>>>       create new steps?
>>>       - How is this information communicated to the runner harness?
>>>    - Why do the following transforms need to be overridden: GroupByKey,
>>>    WriteToBigQuery, CombineValues, Read?
>>>
>>> Each of these four has a different implementation on Dataflow.
>>
>>>
>>>    - Why doesn't the ParDo transform need to be overridden? I see that
>>>    it has a run_ method but no apply_ method.
>>>
>>> apply_ is called at pipeline construction time, all of these should be
>> replaced by PTransformOverrides. run_ is called after pipeline construction
>> to actually build up the dataflow graph.
>>
>>
>>> *Possible fixes*
>>> I was thinking of getting rid of the apply_ and run_ methods and
>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>> make this feasible?
>>>
>>
>> If we're going to overhaul how the runner works, it would be best to make
>> DataflowRunner direct a translator from Beam runner api protos to Cloudv1b3
>> protos, rather than manipulate the intermediate Python representation
>> (which no one wants to change for fear of messing up DataflowRunner and
>> cause headaches for cross langauge).
>>
>>
>>

Re: Unportable Dataflow Pipeline Questions

Posted by Robert Burke <ro...@frantil.com>.
+1 to translation from beam pipeline Protos.

 The Go SDK does that currently in dataflowlib/translate.go to handle the
current Dataflow situation, so it's certainly doable.

On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <sr...@google.com> wrote:
>
>> Hi All,
>>
>> I am currently investigating making the Python DataflowRunner to use a
>> portable pipeline representation so that we can eventually get rid of the
>> Pipeline(runner) weirdness.
>>
>> In that case, I have a lot questions about the Python DataflowRunner:
>>
>> *PValueCache*
>>
>>    - Why does this exist?
>>
>> This is historical baggage from the (long gone) first direct runner when
> actual computed PCollections were cached, and the DataflowRunner inherited
> it.
>
>
>> *DataflowRunner*
>>
>>    - I see that the DataflowRunner defines some PTransforms as
>>    runner-specific primitives by returning a PCollection.from_(...) in apply_
>>    methods. Then in the run_ methods, it references the PValueCache to add
>>    steps.
>>       - How does this add steps?
>>       - Where does it cache the values to?
>>       - How does the runner harness pick up these cached values to
>>       create new steps?
>>       - How is this information communicated to the runner harness?
>>    - Why do the following transforms need to be overridden: GroupByKey,
>>    WriteToBigQuery, CombineValues, Read?
>>
>> Each of these four has a different implementation on Dataflow.
>
>>
>>    - Why doesn't the ParDo transform need to be overridden? I see that
>>    it has a run_ method but no apply_ method.
>>
>> apply_ is called at pipeline construction time, all of these should be
> replaced by PTransformOverrides. run_ is called after pipeline construction
> to actually build up the dataflow graph.
>
>
>> *Possible fixes*
>> I was thinking of getting rid of the apply_ and run_ methods and
>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>> respectively. Is this feasible? Am I missing any assumptions that don't
>> make this feasible?
>>
>
> If we're going to overhaul how the runner works, it would be best to make
> DataflowRunner direct a translator from Beam runner api protos to Cloudv1b3
> protos, rather than manipulate the intermediate Python representation
> (which no one wants to change for fear of messing up DataflowRunner and
> cause headaches for cross langauge).
>
>
>