Posted to dev@beam.apache.org by Ke Wu <ke...@gmail.com> on 2021/07/27 00:27:05 UTC

Translates composite transform in portable pipeline

Hello All,

I noticed that the Flink, Spark, and Samza runners all translate portable pipelines in a similar manner:
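
    import org.apache.beam.runners.core.construction.graph.PipelineNode;
    import org.apache.beam.runners.core.construction.graph.QueryablePipeline;

    // `pipeline` is the portable RunnerApi.Pipeline proto handed to the runner.
    QueryablePipeline p =
        QueryablePipeline.forTransforms(
            pipeline.getRootTransformIdsList(), pipeline.getComponents());

    for (PipelineNode.PTransformNode transform : p.getTopologicallyOrderedTransforms()) {
      // Translation logic
    }
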
However, IIUC, this only iterates over the leaf (primitive) transforms of the pipeline, i.e., composite transforms are NOT translated at all.

Is this the expected way for a runner to implement translation logic for a portable pipeline? If yes, what do you suggest for runners that need to translate composite transforms?
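To make the behavior in question concrete, here is a minimal, self-contained model. It does not use Beam's RunnerApi; the `Node` class and `leaves` method are hypothetical, and it assumes children are listed in data-flow order so that a depth-first walk yields a valid topological order. It shows that a traversal returning only leaves descends into a composite but never emits the composite itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a transform hierarchy; this is NOT Beam's RunnerApi.
// Assumption: children are listed in data-flow order, so a depth-first walk
// visits leaves in a valid topological order.
public class LeafTraversal {
  public static final class Node {
    final String name;
    final List<Node> subtransforms;

    public Node(String name, Node... children) {
      this.name = name;
      this.subtransforms = List.of(children);
    }
  }

  // Returns only the leaf (primitive) transforms reachable from the roots.
  // Composites are descended into but never emitted, so a loop over this
  // result never sees them.
  public static List<String> leaves(List<Node> roots) {
    List<String> out = new ArrayList<>();
    for (Node root : roots) {
      collect(root, out);
    }
    return out;
  }

  private static void collect(Node node, List<String> out) {
    if (node.subtransforms.isEmpty()) {
      out.add(node.name);
      return;
    }
    for (Node child : node.subtransforms) {
      collect(child, out);
    }
  }

  public static void main(String[] args) {
    List<Node> roots =
        List.of(
            new Node("Read"),
            new Node("CombinePerKey", new Node("GroupByKey"), new Node("CombineValues")),
            new Node("Write"));
    // Prints [Read, GroupByKey, CombineValues, Write]: "CombinePerKey" never appears.
    System.out.println(leaves(roots));
  }
}
```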

Best,
Ke

Re: Translates composite transform in portable pipeline

Posted by Ke Wu <ke...@gmail.com>.
Thank you, Robert, for the response.

After some more digging, I realized that the Flink and Spark runners do handle composite transforms, by first running:

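
    import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
    import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;

    // Don't let the fuser fuse any subcomponents of native transforms.
    Pipeline trimmedPipeline =
        TrivialNativeTransformExpander.forKnownUrns(
            pipelineWithSdfExpanded, translator.knownUrns());
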
This marks every composite transform whose URN is in the translator's list of known URNs as a primitive (i.e., a leaf node), so the later traversal returns it and it can be translated.
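The effect of that step can be modeled the same way. In the sketch below, a hypothetical `trim` function (not the real TrivialNativeTransformExpander, whose internals may differ) drops the subtransforms of any composite whose URN is in a known set; a leaf-only traversal then returns the composite itself instead of its parts. The URN strings are Beam's standard ones, used here only as labels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model: composites whose URN the runner knows are turned into
// leaves, so a leaf-only traversal hands them to the translator directly.
public class KnownUrnTrimming {
  public static final class Node {
    final String name;
    final String urn;
    final List<Node> subtransforms;

    public Node(String name, String urn, Node... children) {
      this.name = name;
      this.urn = urn;
      this.subtransforms = List.of(children);
    }
  }

  // Returns a copy of the tree in which every transform with a known URN has
  // its subtransforms dropped, making it a leaf.
  public static Node trim(Node node, Set<String> knownUrns) {
    if (knownUrns.contains(node.urn)) {
      return new Node(node.name, node.urn);
    }
    Node[] children = new Node[node.subtransforms.size()];
    for (int i = 0; i < children.length; i++) {
      children[i] = trim(node.subtransforms.get(i), knownUrns);
    }
    return new Node(node.name, node.urn, children);
  }

  // Depth-first collection of leaf names, as in a leaf-only translation loop.
  public static List<String> leaves(List<Node> roots) {
    List<String> out = new ArrayList<>();
    for (Node root : roots) {
      collect(root, out);
    }
    return out;
  }

  private static void collect(Node node, List<String> out) {
    if (node.subtransforms.isEmpty()) {
      out.add(node.name);
    } else {
      for (Node child : node.subtransforms) {
        collect(child, out);
      }
    }
  }

  public static void main(String[] args) {
    Node combine =
        new Node(
            "CombinePerKey", "beam:transform:combine_per_key:v1",
            new Node("GroupByKey", "beam:transform:group_by_key:v1"),
            new Node("CombineValues", "beam:transform:pardo:v1"));
    // Without trimming, only the parts are visited: [GroupByKey, CombineValues]
    System.out.println(leaves(List.of(combine)));
    // With the composite's URN known, it becomes a leaf: [CombinePerKey]
    Set<String> known = Set.of("beam:transform:combine_per_key:v1");
    System.out.println(leaves(List.of(trim(combine, known))));
  }
}
```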

I believe this step is currently missing from the Samza runner, which is why composite transform translation does not work properly there.

Since I am working on the Samza runner, I will post a PR to fix it.

Best,
Ke

> On Jul 26, 2021, at 5:45 PM, Robert Bradshaw <ro...@google.com> wrote:
> 
> You can think of composite transforms like subroutines [...]

Re: Translates composite transform in portable pipeline

Posted by Robert Bradshaw <ro...@google.com>.
You can think of composite transforms like subroutines: they're useful
concepts for representing the logical structure of the pipeline, but
for the purposes of execution it is just as valid to inline them all
into a single monolithic function/pipeline composed of nothing but
primitive calls. Flink/Spark/Samza have no native notion of composite
transforms, so this is what they do. If you can preserve the richer
structure, that has advantages (e.g., for monitoring, debugging, rolling
up counters and messages, and visualizing the pipeline).
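As one illustration of what preserving structure buys for monitoring, here is a hedged sketch of rolling leaf-level counters up to their enclosing composites. It assumes each leaf is identified by a slash-separated path of composite names (e.g. `CountWords/Split/ParDo`); the transform names, counter values, and the `rollUp` helper are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: leaf counters keyed by "Composite/.../Leaf" paths are
// summed into every enclosing composite so they can be reported per composite.
public class CounterRollup {
  // Adds each leaf's value to the leaf itself and to every prefix of its path
  // that ends just before a '/', i.e. to each enclosing composite.
  public static Map<String, Long> rollUp(Map<String, Long> leafCounters) {
    Map<String, Long> totals = new TreeMap<>();
    for (Map.Entry<String, Long> e : leafCounters.entrySet()) {
      String path = e.getKey();
      long value = e.getValue();
      totals.merge(path, value, Long::sum);
      for (int slash = path.lastIndexOf('/'); slash > 0; slash = path.lastIndexOf('/', slash - 1)) {
        totals.merge(path.substring(0, slash), value, Long::sum);
      }
    }
    return totals;
  }

  public static void main(String[] args) {
    Map<String, Long> leaves = new TreeMap<>();
    leaves.put("CountWords/Split/ParDo", 10L);
    leaves.put("CountWords/Count/GroupByKey", 4L);
    leaves.put("Write", 4L);
    // "CountWords" totals 14 (10 + 4); each intermediate composite gets its own subtotal.
    System.out.println(rollUp(leaves));
  }
}
```

Once a runner inlines everything into primitives, this kind of aggregation is only possible if the hierarchy survives somewhere, such as in the names.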

There is one other important case for composites that runners may want
to take advantage of: runners may recognize higher-level transforms
and substitute their own (equivalent, of course) implementations. The
prototypical example of this is combiner lifting, where CombinePerKey
is naively implemented as GroupByKey + CombineAllValuesDoFn, but most
runners have more sophisticated ways of handling associative,
commutative CombineFn aggregations (see
https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_0_260
).
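A small sketch of why combiner lifting is a valid substitution. The `Fn` interface below is hypothetical; it only mirrors the create/add/merge/extract shape of a CombineFn and is not Beam's class. `naive` shuffles every raw value and combines after grouping, as the GroupByKey + CombineAllValuesDoFn expansion does; `lifted` pre-combines within each bundle so only one accumulator per key per bundle crosses the shuffle, then merges accumulators. The two agree here because the mean's accumulator merge is associative and commutative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of combiner lifting; Fn only mirrors a CombineFn's shape.
public class CombinerLifting {
  public interface Fn<InT, AccT, OutT> {
    AccT create();
    AccT add(AccT acc, InT input);
    AccT merge(List<AccT> accs);
    OutT extract(AccT acc);
  }

  // Mean over longs; the accumulator is {sum, count}.
  public static final Fn<Long, long[], Double> MEAN =
      new Fn<Long, long[], Double>() {
        public long[] create() { return new long[] {0L, 0L}; }
        public long[] add(long[] acc, Long x) { return new long[] {acc[0] + x, acc[1] + 1}; }
        public long[] merge(List<long[]> accs) {
          long[] out = create();
          for (long[] a : accs) { out[0] += a[0]; out[1] += a[1]; }
          return out;
        }
        public Double extract(long[] acc) {
          return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
        }
      };

  // Naive expansion: group every raw value by key, then combine per key.
  public static <InT, AccT, OutT> Map<String, OutT> naive(
      List<List<Map.Entry<String, InT>>> bundles, Fn<InT, AccT, OutT> fn) {
    Map<String, List<InT>> grouped = new HashMap<>();
    for (List<Map.Entry<String, InT>> bundle : bundles) {
      for (Map.Entry<String, InT> e : bundle) {
        grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
    }
    Map<String, OutT> out = new HashMap<>();
    grouped.forEach((key, values) -> {
      AccT acc = fn.create();
      for (InT v : values) {
        acc = fn.add(acc, v);
      }
      out.put(key, fn.extract(acc));
    });
    return out;
  }

  // Lifted: partial combine within each bundle, group the accumulators, merge.
  public static <InT, AccT, OutT> Map<String, OutT> lifted(
      List<List<Map.Entry<String, InT>>> bundles, Fn<InT, AccT, OutT> fn) {
    Map<String, List<AccT>> shuffled = new HashMap<>();
    for (List<Map.Entry<String, InT>> bundle : bundles) {
      Map<String, AccT> partial = new HashMap<>();
      for (Map.Entry<String, InT> e : bundle) {
        AccT acc = partial.computeIfAbsent(e.getKey(), k -> fn.create());
        partial.put(e.getKey(), fn.add(acc, e.getValue()));
      }
      partial.forEach((key, acc) -> shuffled.computeIfAbsent(key, k -> new ArrayList<>()).add(acc));
    }
    Map<String, OutT> out = new HashMap<>();
    shuffled.forEach((key, accs) -> out.put(key, fn.extract(fn.merge(accs))));
    return out;
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, Long>>> bundles =
        List.of(
            List.of(Map.entry("a", 1L), Map.entry("b", 10L), Map.entry("a", 3L)),
            List.of(Map.entry("a", 2L), Map.entry("b", 20L)));
    System.out.println(naive(bundles, MEAN));
    System.out.println(lifted(bundles, MEAN));
  }
}
```

The lifted form shuffles at most one accumulator per key per bundle instead of every input element, which is where the savings come from.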

- Robert

On Mon, Jul 26, 2021 at 5:27 PM Ke Wu <ke...@gmail.com> wrote:
>
> [...]
> Is this the expected behavior for runner to implement translation logic for portable pipeline? If Yes, what are the suggestions if certain runners need to translate composite transforms?