Posted to dev@beam.apache.org by Harrison Green <hg...@google.com> on 2020/08/04 23:59:00 UTC
Returning multiple PCollections from a PTransform
Hi all,
I've run into a situation where I would like to return two PCollections
from a PTransform. I am aware of the ParDo.with_outputs construct, but in
this case the PCollections are the flattened results of several other
transforms, and it would be cleaner to just return multiple PCollections in
a tuple.
I've tested this out with the following snippet and it seems to work (at
least on the direct runner):
---
import apache_beam as beam

@beam.ptransform_fn
def test(pcoll):
    a = pcoll | '1' >> beam.Map(lambda x: x+1)
    b = pcoll | '2' >> beam.Map(lambda x: x+10)

    return (a,b)

with beam.Pipeline() as p:
    c = p | beam.Create(list(range(10)))

    a,b = c | test()

    a | 'a' >> beam.Map(lambda x: print('a %d' % x))
    b | 'b' >> beam.Map(lambda x: print('b %d' % x))
---
I'm curious if this type of pipeline construction is well-supported and if
I will run into any issues on other runners.
Thanks!
- Harrison
Re: Returning multiple PCollections from a PTransform
Posted by Robert Bradshaw <ro...@google.com>.
Yes, this is explicitly supported. You can return named tuples and
dictionaries (with PCollections as values) as well.