Posted to dev@beam.apache.org by rahul patwari <ra...@gmail.com> on 2020/05/12 14:05:04 UTC

Parallelism in Combine.GroupedValues

Hi,

The Javadoc for Combine.GroupedValues[1] states that *combining
the values associated with a single key can happen in parallel*.
The logic to combine the values associated with a key can be provided by
either a CombineFnWithContext or a CombineFn.
Both CombineFnWithContext.apply()[2] and CombineFn.apply()[3] use a single
accumulator to combine the values.

My understanding is that the parallelism of a Combine PTransform is
determined by the number of accumulators. But the Javadoc says that
combining is done in parallel even though only one accumulator is used to
combine the values.

How can combining happen in parallel with only one accumulator?

Regards,
Rahul

[1]:
https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L2075-L2078
[2]:
https://github.com/apache/beam/blob/53e5cee254023152e77a3fc46564642dc9b6b506/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java#L117
[3]:
https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L443

Re: Parallelism in Combine.GroupedValues

Posted by Luke Cwik <lc...@google.com>.
There isn't just one accumulator. Multiple accumulators are used to
support a parallel combine. Feel free to open a PR to help improve the
Javadoc.

Yes, these combines are converted to a lifted combine when the runner is
able to do so.
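
To make the term concrete, here is a rough plain-Java sketch of what a
lifted combine amounts to, assuming a simple per-key sum. It does not use
the Beam SDK: the class LiftedCombineSketch and the methods preCombine and
mergeAndExtract are hypothetical names made up for illustration. Only the
four CombineFn-style method names (createAccumulator, addInput,
mergeAccumulators, extractOutput) are borrowed from Beam. How a real runner
stages this work is runner-specific.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a lifted per-key combine; not Beam code. */
final class LiftedCombineSketch {

  // Hypothetical sum combiner; the accumulator is a one-element long array.
  static long[] createAccumulator() {
    return new long[] {0L};
  }

  static long[] addInput(long[] acc, long input) {
    acc[0] += input;
    return acc;
  }

  static long[] mergeAccumulators(Iterable<long[]> accs) {
    long[] merged = createAccumulator();
    for (long[] acc : accs) {
      merged[0] += acc[0];
    }
    return merged;
  }

  static long extractOutput(long[] acc) {
    return acc[0];
  }

  /** Stage 1: pre-combine one bundle before grouping, one accumulator per key seen. */
  static Map<String, long[]> preCombine(List<Map.Entry<String, Long>> bundle) {
    Map<String, long[]> partial = new HashMap<>();
    for (Map.Entry<String, Long> kv : bundle) {
      long[] acc = partial.computeIfAbsent(kv.getKey(), k -> createAccumulator());
      addInput(acc, kv.getValue());
    }
    return partial;
  }

  /** Stages 2 and 3: group the partial accumulators by key, then merge and extract. */
  static Map<String, Long> mergeAndExtract(List<Map<String, long[]>> partials) {
    Map<String, List<long[]>> grouped = new HashMap<>();
    for (Map<String, long[]> partial : partials) {
      partial.forEach(
          (key, acc) -> grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(acc));
    }
    Map<String, Long> result = new HashMap<>();
    grouped.forEach((key, accs) -> result.put(key, extractOutput(mergeAccumulators(accs))));
    return result;
  }

  public static void main(String[] args) {
    // Two bundles that could be pre-combined on different workers.
    List<Map.Entry<String, Long>> bundle1 =
        List.of(Map.entry("a", 1L), Map.entry("b", 10L), Map.entry("a", 2L));
    List<Map.Entry<String, Long>> bundle2 = List.of(Map.entry("a", 3L), Map.entry("b", 20L));
    Map<String, Long> sums = mergeAndExtract(List.of(preCombine(bundle1), preCombine(bundle2)));
    System.out.println(sums.get("a") + " " + sums.get("b")); // prints "6 30"
  }
}
```

Each bundle gets its own accumulator per key, so the values of a single key
end up spread over several accumulator instances, and only those
accumulators (not the raw values) need to cross the grouping step.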

On Tue, May 12, 2020 at 9:14 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi Luke,
>
> I should have been clearer with my question. Sorry, my bad.
>
> I wanted to ask: How can combining happen in parallel using only *one
> accumulator instance*?
>
> CombineFn.apply()[4] explicitly specifies that mergeAccumulators() will
> not be called. A single accumulator instance is created and used to
> combine all the values associated with a single key.
> Since Combine.GroupedValues uses CombineFn.apply(), the parallelism for
> combining the values of a key is limited to one.
> And since Combine.perKey() uses Combine.GroupedValues to combine the
> values of a key, the parallelism of that combine is also limited to one.
>
> Do you mean that these types of Combine will be translated to a lifted
> Combine (assuming no side inputs are provided), depending on the
> runner?
>
> Regards,
> Rahul
>
> [4]:
> https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L441

Re: Parallelism in Combine.GroupedValues

Posted by rahul patwari <ra...@gmail.com>.
Hi Luke,

I should have been clearer with my question. Sorry, my bad.

I wanted to ask: How can combining happen in parallel using only *one
accumulator instance*?

CombineFn.apply()[4] explicitly specifies that mergeAccumulators() will
not be called. A single accumulator instance is created and used to
combine all the values associated with a single key.
Since Combine.GroupedValues uses CombineFn.apply(), the parallelism for
combining the values of a key is limited to one.
And since Combine.perKey() uses Combine.GroupedValues to combine the values
of a key, the parallelism of that combine is also limited to one.
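
The sequential path described above can be sketched roughly as follows, in
plain Java with no Beam dependency. The class SequentialApplySketch and the
MaxAccum accumulator are hypothetical names used only for illustration. The
shape of apply() (create one accumulator, add every input, extract the
output) is meant to mirror what is described above, not to reproduce the
SDK source.

```java
import java.util.List;

/** Plain-Java stand-in for the single-accumulator path; not Beam code. */
final class SequentialApplySketch {

  /** Hypothetical accumulator that tracks the largest value seen so far. */
  static final class MaxAccum {
    long max = Long.MIN_VALUE;
  }

  static MaxAccum createAccumulator() {
    return new MaxAccum();
  }

  static MaxAccum addInput(MaxAccum acc, long input) {
    acc.max = Math.max(acc.max, input);
    return acc;
  }

  static long extractOutput(MaxAccum acc) {
    return acc.max;
  }

  /** One accumulator, one loop over all values of the key, and no merge step. */
  static long apply(Iterable<Long> inputs) {
    MaxAccum acc = createAccumulator();
    for (long input : inputs) {
      acc = addInput(acc, input);
    }
    return extractOutput(acc);
  }

  public static void main(String[] args) {
    System.out.println(apply(List.of(3L, 9L, 4L))); // prints 9
  }
}
```

Because every value flows through the same accumulator in a single loop,
this path on its own leaves no room for parallelism within a key.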

Do you mean that these types of Combine will be translated to a lifted
Combine (assuming no side inputs are provided), depending on the runner?

Regards,
Rahul

[4]:
https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L441

On Tue, May 12, 2020 at 8:35 PM Luke Cwik <lc...@google.com> wrote:

> More than one accumulator instance is used, and those accumulators are
> then merged using the mergeAccumulators() method.
>
> Two examples of when combining happens in parallel are when the
> withFewKeys hint is used on the combiner and when partial combining[1]
> happens on the mapper side before the grouping operation.
>
> 1: https://s.apache.org/beam-runner-api-combine-model

Re: Parallelism in Combine.GroupedValues

Posted by Luke Cwik <lc...@google.com>.
More than one accumulator instance is used, and those accumulators are
then merged using the mergeAccumulators() method.

Two examples of when combining happens in parallel are when the withFewKeys
hint is used on the combiner and when partial combining[1] happens on the
mapper side before the grouping operation.

1: https://s.apache.org/beam-runner-api-combine-model
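
As a rough illustration of several accumulator instances being used for one
key and then merged, here is a plain-Java sketch that computes a mean. It
does not use the Beam SDK, and it assumes the values of the key have
already been split into shards. ParallelAccumulatorsSketch, MeanAccum,
combineShard and combineInParallel are hypothetical names. Only the four
CombineFn-style method names come from Beam.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Plain-Java sketch of merging per-shard accumulators; not Beam code. */
final class ParallelAccumulatorsSketch {

  /** Hypothetical mean accumulator: running sum and count. */
  static final class MeanAccum {
    double sum;
    long count;
  }

  static MeanAccum createAccumulator() {
    return new MeanAccum();
  }

  static MeanAccum addInput(MeanAccum acc, double input) {
    acc.sum += input;
    acc.count++;
    return acc;
  }

  static MeanAccum mergeAccumulators(Iterable<MeanAccum> accs) {
    MeanAccum merged = createAccumulator();
    for (MeanAccum acc : accs) {
      merged.sum += acc.sum;
      merged.count += acc.count;
    }
    return merged;
  }

  static double extractOutput(MeanAccum acc) {
    return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
  }

  /** Folds one shard of the key's values into its own accumulator instance. */
  static MeanAccum combineShard(List<Double> shard) {
    MeanAccum acc = createAccumulator();
    for (double value : shard) {
      acc = addInput(acc, value);
    }
    return acc;
  }

  /** Combines every shard on the common pool, then merges the partial accumulators. */
  static double combineInParallel(List<List<Double>> shards) {
    List<CompletableFuture<MeanAccum>> futures = new ArrayList<>();
    for (List<Double> shard : shards) {
      futures.add(CompletableFuture.supplyAsync(() -> combineShard(shard)));
    }
    List<MeanAccum> partials = new ArrayList<>();
    for (CompletableFuture<MeanAccum> future : futures) {
      partials.add(future.join());
    }
    return extractOutput(mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    // Three shards holding the values of a single key.
    List<List<Double>> shards = List.of(List.of(1.0, 2.0), List.of(3.0), List.of(6.0));
    System.out.println(combineInParallel(shards)); // prints 3.0
  }
}
```

No accumulator is shared between threads here. Parallelism comes from
having one accumulator per shard, and mergeAccumulators is what makes the
partial results composable.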
