You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Stephan Hoyer via dev <de...@beam.apache.org> on 2022/09/19 15:00:00 UTC

Cartesian product of PCollections

I'm wondering if it would make sense to have a built-in Beam transformation
for calculating the Cartesian product of PCollections.

Just this past week, I've encountered two separate cases where calculating
a Cartesian product was a bottleneck. The in-memory option of using
something like Python's itertools.product() is convenient, but it only
scales to a single node.

Unfortunately, implementing a scalable Cartesian product seems to be
somewhat non-trivial. I found two version of this question on
StackOverflow, but neither contains a code solution:
https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections
https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/

There's a fair amount of nuance in an efficient and scalable
implementation. My team has an internal implementation of a
CartesianProduct transform, based on using hashing to split a pcollection
into a finite number of groups and CoGroupByKey. On the other hand, if any
of the input pcollections are small, using side inputs would probably be
the way to go to avoid the need for a shuffle.

Any thoughts?

Cheers,
Stephan

Re: Cartesian product of PCollections

Posted by Robert Bradshaw via dev <de...@beam.apache.org>.
On Mon, Sep 19, 2022 at 1:53 PM Stephan Hoyer <sh...@google.com> wrote:
>>
>> > > My team has an internal implementation of a CartesianProduct transform, based on using hashing to split a pcollection into a finite number of groups and CoGroupByKey.
>> >
>> > Could this be contributed to Beam?
>
>
> If it would be of broader interest, I would be happy to work on this for the Python SDK.
>
> I can share a link to the code with Googlers.
>
> On Mon, Sep 19, 2022 at 10:47 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> If one of your inputs fits into memory, using side inputs is
>> definitely the way to go. If neither side fits into memory, the cross
>> product may be prohibitively large to compute even on a distributed
>> computing platform (a billion times a billion is big, though I suppose
>> one may hit memory limits with fewer elements if the elements
>> themselves are large)
>
> I agree, in practice the side input solution will usually suffice.
>
> For CartesianProduct in particular, it is pretty common for one or more of the inputs to have a statically known size, because it was created from an in-memory sequence (i.e., with beam.Create). Otherwise we could look at user-supplied hints, falling back to CoGroupByKey only if required.
>
> There is also the (not uncommon) special case where _every_ input has statically known size, e.g., CreateCartesianProduct().
>
>> one can still do the partitioning hack. E.g.
>>
>> partitions = pcoll_B | beam.Partition(hash, N)
>> cross_product = tuple([
>>   pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs],
>> beam.pvalue.AsList(part))
>>   for part in partitions
>> ]) | beam.Flatten()
>
>
> Interesting! I imagine this would break at some scale. Do you have an intuition for what is a "reasonable" number of partitions -- 10s, 100s, 1000s?

One would want the minimum number of partitions that allows each to
fit into memory. N partitions will result in N distinct reads of
pcoll_A. Thousands of partitions, if your data requires it, would not
pose an issue (other than being able to allocate/afford enough
resources to process such a large cross product, even at google
scale).

Re: Cartesian product of PCollections

Posted by Stephan Hoyer via dev <de...@beam.apache.org>.
>
> > > My team has an internal implementation of a CartesianProduct
> transform, based on using hashing to split a pcollection into a finite
> number of groups and CoGroupByKey.
> >
> > Could this be contributed to Beam?
>

If it would be of broader interest, I would be happy to work on this for
the Python SDK.

I can share a link to the code with Googlers.

On Mon, Sep 19, 2022 at 10:47 AM Robert Bradshaw <ro...@google.com>
wrote:

> If one of your inputs fits into memory, using side inputs is
> definitely the way to go. If neither side fits into memory, the cross
> product may be prohibitively large to compute even on a distributed
> computing platform (a billion times a billion is big, though I suppose
> one may hit memory limits with fewer elements if the elements
> themselves are large)


I agree, in practice the side input solution will usually suffice.

For CartesianProduct in particular, it is pretty common for one or more of
the inputs to have a statically known size, because it was created from an
in-memory sequence (i.e., with beam.Create). Otherwise we could look at
user-supplied hints, falling back to CoGroupByKey only if required.

There is also the (not uncommon) special case where _every_ input has
statically known size, e.g., CreateCartesianProduct().

one can still do the partitioning hack. E.g.
>
> partitions = pcoll_B | beam.Partition(hash, N)
> cross_product = tuple([
>   pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs],
> beam.pvalue.AsList(part))
>   for part in partitions
> ]) | beam.Flatten()


Interesting! I imagine this would break at some scale. Do you have an
intuition for what is a "reasonable" number of partitions -- 10s, 100s,
1000s?

Re: Cartesian product of PCollections

Posted by Robert Bradshaw via dev <de...@beam.apache.org>.
If one of your inputs fits into memory, using side inputs is
definitely the way to go. If neither side fits into memory, the cross
product may be prohibitively large to compute even on a distributed
computing platform (a billion times a billion is big, though I suppose
one may hit memory limits with fewer elements if the elements
themselves are large) but one can still do the partitioning hack. E.g.

partitions = pcoll_B | beam.Partition(hash, N)
cross_product = tuple([
  pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs],
beam.pvalue.AsList(part))
  for part in partitions
]) | beam.Flatten()

One may need to break fusion before the FlatMap to avoid trying to
pull all parts into memory, e.g.

class FusionBreak(beam.PTransform()):
  def expand(self, pcoll):
    empty = pcoll | beam.FlatMap(lambda x: ())
    return pcoll | beam.Map(lambda x, _: x, beam.pvalue.AsList(empty))

On Mon, Sep 19, 2022 at 8:34 AM Brian Hulette via dev
<de...@beam.apache.org> wrote:
>
> In SQL we just don't support cross joins currently [1]. I'm not aware of an existing implementation of a cross join/cartesian product.
>
> > My team has an internal implementation of a CartesianProduct transform, based on using hashing to split a pcollection into a finite number of groups and CoGroupByKey.
>
> Could this be contributed to Beam?
>
> > On the other hand, if any of the input pcollections are small, using side inputs would probably be the way to go to avoid the need for a shuffle.
>
> We run into this problem frequently in Beam SQL. Our optimizer could be much more effective with accurate size estimates, but we rarely have them, and they may never be good enough for us to select a side input implementation over CoGroupByKey. I've had some offline discussions in this space, the best solution we've come up with is to allow hints in SQL (or just arguments in join transforms) that allow users to select a side input implementation. We could also add some logging when a pipeline uses a CoGroupByKey and PCollection sizes could be handled by a side input implementation, to nudge users that way for future runs.
>
> Brian
>
> [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
>
> On Mon, Sep 19, 2022 at 8:01 AM Stephan Hoyer via dev <de...@beam.apache.org> wrote:
>>
>> I'm wondering if it would make sense to have a built-in Beam transformation for calculating the Cartesian product of PCollections.
>>
>> Just this past week, I've encountered two separate cases where calculating a Cartesian product was a bottleneck. The in-memory option of using something like Python's itertools.product() is convenient, but it only scales to a single node.
>>
>> Unfortunately, implementing a scalable Cartesian product seems to be somewhat non-trivial. I found two version of this question on StackOverflow, but neither contains a code solution:
>> https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections
>> https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/
>>
>> There's a fair amount of nuance in an efficient and scalable implementation. My team has an internal implementation of a CartesianProduct transform, based on using hashing to split a pcollection into a finite number of groups and CoGroupByKey. On the other hand, if any of the input pcollections are small, using side inputs would probably be the way to go to avoid the need for a shuffle.
>>
>> Any thoughts?
>>
>> Cheers,
>> Stephan

Re: Cartesian product of PCollections

Posted by Brian Hulette via dev <de...@beam.apache.org>.
In SQL we just don't support cross joins currently [1]. I'm not aware of an
existing implementation of a cross join/cartesian product.

> My team has an internal implementation of a CartesianProduct transform,
based on using hashing to split a pcollection into a finite number of
groups and CoGroupByKey.

Could this be contributed to Beam?

> On the other hand, if any of the input pcollections are small, using side
inputs would probably be the way to go to avoid the need for a shuffle.

We run into this problem frequently in Beam SQL. Our optimizer could be
much more effective with accurate size estimates, but we rarely have
them, and they may never be good enough for us to select a side input
implementation over CoGroupByKey. I've had some offline discussions in this
space, the best solution we've come up with is to allow hints in SQL (or
just arguments in join transforms) that allow users to select a side input
implementation. We could also add some logging when a pipeline uses a
CoGroupByKey and PCollection sizes could be handled by a side input
implementation, to nudge users that way for future runs.

Brian

[1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/

On Mon, Sep 19, 2022 at 8:01 AM Stephan Hoyer via dev <de...@beam.apache.org>
wrote:

> I'm wondering if it would make sense to have a built-in Beam
> transformation for calculating the Cartesian product of PCollections.
>
> Just this past week, I've encountered two separate cases where calculating
> a Cartesian product was a bottleneck. The in-memory option of using
> something like Python's itertools.product() is convenient, but it only
> scales to a single node.
>
> Unfortunately, implementing a scalable Cartesian product seems to be
> somewhat non-trivial. I found two version of this question on
> StackOverflow, but neither contains a code solution:
>
> https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections
>
> https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/
>
> There's a fair amount of nuance in an efficient and scalable
> implementation. My team has an internal implementation of a
> CartesianProduct transform, based on using hashing to split a pcollection
> into a finite number of groups and CoGroupByKey. On the other hand, if any
> of the input pcollections are small, using side inputs would probably be
> the way to go to avoid the need for a shuffle.
>
> Any thoughts?
>
> Cheers,
> Stephan
>