Posted to dev@beam.apache.org by Harrison Green <hg...@google.com> on 2020/08/04 23:59:00 UTC

Returning multiple PCollections from a PTransform

Hi all,

I've run into a situation where I would like to return two PCollections
from a single PTransform. I am aware of the ParDo.with_outputs construct, but
in this case the PCollections are the flattened results of several other
transforms, and it would be cleaner to just return multiple PCollections in
a tuple.

I've tested this out with the following snippet and it seems to work (at
least on the direct runner):

---
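import apache_beam as beam

@beam.ptransform_fn
def test(pcoll):
    # Two independent branches derived from the same input PCollection.
    a = pcoll | '1' >> beam.Map(lambda x: x + 1)
    b = pcoll | '2' >> beam.Map(lambda x: x + 10)

    # Return both branches as a plain tuple of PCollections.
    return a, b

with beam.Pipeline() as p:
    c = p | beam.Create(list(range(10)))

    # Applying the transform yields the tuple, which unpacks as usual.
    a, b = c | test()

    a | 'a' >> beam.Map(lambda x: print('a %d' % x))
    b | 'b' >> beam.Map(lambda x: print('b %d' % x))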
---

I'm curious if this type of pipeline construction is well-supported and if
I will run into any issues on other runners.

Thanks!
- Harrison

Re: Returning multiple PCollections from a PTransform

Posted by Robert Bradshaw <ro...@google.com>.
Yes, this is explicitly supported. You can return named tuples and
dictionaries (with PCollections as values) as well.
