Posted to user@beam.apache.org by Julien Phalip <jp...@gmail.com> on 2020/06/30 17:55:54 UTC

Understanding combiner's distribution logic

Hi,

I had a question about how combiners work, particularly on how the combined
PCollection's subsets are initially formed.

I understand that, according to the documentation
<https://beam.apache.org/documentation/programming-guide/#combine>, a
combiner allows parallelizing the computation to multiple workers by
breaking up the PCollection into subsets. I like the database analogy given
in this post
<https://cloud.google.com/blog/products/gcp/writing-dataflow-pipelines-with-scalability-in-mind>,
which says that it is similar to pushing down a predicate.

I also understand that it is possible to use withFanout or withHotKeyFanout
to give the runner an explicit hint on how to manage the distribution.
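
For concreteness, here is the kind of usage I mean (a minimal Java sketch;
the sums and fanout numbers are just examples I picked):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline pipeline = Pipeline.create();

    // Global combine: withFanout(n) requests n-way intermediate combining.
    PCollection<Integer> values = pipeline.apply(Create.of(1, 2, 3, 4));
    PCollection<Integer> total =
        values.apply(Combine.globally(Sum.ofIntegers()).withFanout(10));

    // Per-key combine: withHotKeyFanout spreads known-hot keys wider.
    PCollection<KV<String, Integer>> keyed =
        pipeline.apply(Create.of(KV.of("hot", 1), KV.of("cold", 2)));
    PCollection<KV<String, Integer>> perKeyTotals =
        keyed.apply(
            Combine.<String, Integer, Integer>perKey(Sum.ofIntegers())
                .withHotKeyFanout(key -> key.equals("hot") ? 100 : 1));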

What is unclear to me, however, is whether by default the runner already
plans the distribution of the computation, even when no explicit hints are
provided. I'm guessing perhaps it always breaks up the PCollection into
bundles
<https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence>
(similar to DoFns), then the combiner runs the combination on each bundle,
saves the result into intermediary accumulators, and those results then
bubble up recursively to the top? If that's the case, then I assume that
the purpose of withFanout and withHotKeyFanout is to further break up those
initially pre-created bundles into even smaller subsets? Or am I guessing
this wrong? :)

I couldn't find a clear description in the documentation on how the
PCollection subsets are initially formed. Please let me know if you have
some details on that, or if it is already documented somewhere.

Thank you!

Julien

Re: Understanding combiner's distribution logic

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Jun 30, 2020 at 3:32 PM Julien Phalip <jp...@gmail.com> wrote:

> Thanks Luke!
>
> One part I'm still a bit unclear about is how exactly the PreCombine stage
> works. In particular, I'm wondering how it can perform the combination
> before the GBK. Is it because it can already compute the combination on
> adjacent elements that happen to share the same key?
>

Yes, exactly. It also works for non-adjacent elements within the same bundle
(e.g. a runner might instruct one worker to process the first half of a file,
and another worker the second half; these two halves would each be a "bundle").

Basically what happens is that a "DoFn" gets inserted right before the GBK.
Upon receipt of a KV, it buffers elements by key in a hashtable so that if it
sees the same key again it can do some pre-emptive combining, and then it
emits everything it has buffered when the bundle finishes. Essentially:

    // One partial accumulator per key seen so far in this bundle.
    private final Map<K, AccumT> buffer = new HashMap<>();

    public void processElement(KV<K, V> element) {
      AccumT accum = this.buffer.containsKey(element.getKey())
          ? this.buffer.get(element.getKey())
          // First time we see this key: start a fresh accumulator.
          : combineFn.createAccumulator();
      // Fold the value into the accumulator for its key.
      this.buffer.put(element.getKey(),
          combineFn.addInput(accum, element.getValue()));
    }

    public void finishBundle(Context context) {
      // Emit one partial accumulator per key; the GBK groups these with
      // partials from other bundles for the same key.
      for (Map.Entry<K, AccumT> kv : this.buffer.entrySet()) {
        context.output(KV.of(kv.getKey(), kv.getValue()));
      }
      this.buffer.clear();
    }

(The actual logic is a bit more complicated to avoid unbounded growth in
the table, etc.)
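
For instance, one simple way to bound it (just a sketch of the idea, not the
actual runner code) is to flush early, say at the end of processElement,
whenever the table gets too big:

    private static final int MAX_BUFFERED_KEYS = 10_000;  // illustrative cap

    private void flushIfFull(Context context) {
      if (this.buffer.size() >= MAX_BUFFERED_KEYS) {
        // Emitting partial accumulators early is safe: the GBK routes them
        // to the same place as any later accumulators for the same key, and
        // the post-GBK combine merges them all.
        for (Map.Entry<K, AccumT> kv : this.buffer.entrySet()) {
          context.output(KV.of(kv.getKey(), kv.getValue()));
        }
        this.buffer.clear();
      }
    }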

> Could you also clarify the term "lifting" in this context? Does that refer
> to the act of pushing a partial combination before the GBK?
>

Lifting is just the terminology for this rewrite: when the graph looks like

    ... -> (GroupByKey -> Combine) -> ...

the runner may turn it into

    ... -> PartialCombine -> GroupByKey -> FinishCombine -> ...
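
In CombineFn terms, PartialCombine runs createAccumulator and addInput, and
FinishCombine runs mergeAccumulators and extractOutput. Computing a mean
makes this concrete (my own illustration, not code from the Beam codebase):

    public static class MeanFn extends Combine.CombineFn<Double, double[], Double> {
      @Override
      public double[] createAccumulator() {
        return new double[] {0.0, 0.0};  // {sum, count}
      }

      @Override
      public double[] addInput(double[] acc, Double input) {
        acc[0] += input;  // PartialCombine runs createAccumulator + addInput
        acc[1] += 1;
        return acc;
      }

      @Override
      public double[] mergeAccumulators(Iterable<double[]> accs) {
        double[] merged = new double[] {0.0, 0.0};
        for (double[] acc : accs) {  // FinishCombine merges the partials...
          merged[0] += acc[0];
          merged[1] += acc[1];
        }
        return merged;
      }

      @Override
      public Double extractOutput(double[] acc) {
        // ...and extracts the final result.
        return acc[1] == 0 ? Double.NaN : acc[0] / acc[1];
      }
    }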


Re: Understanding combiner's distribution logic

Posted by Julien Phalip <jp...@gmail.com>.
Thanks Luke!

One part I'm still a bit unclear about is how exactly the PreCombine stage
works. In particular, I'm wondering how it can perform the combination
before the GBK. Is it because it can already compute the combination on
adjacent elements that happen to share the same key?

Could you also clarify the term "lifting" in this context? Does that refer
to the act of pushing a partial combination before the GBK?

Re: Understanding combiner's distribution logic

Posted by Luke Cwik <lc...@google.com>.
Your reasoning is correct about the withHotKeyFanout hint: it is there to
help runners know that one or more keys are likely to have significantly
more data than the others. How the data is broken up is runner dependent,
though, and whether runners rely on the hint at all is also runner
dependent. If a runner were smart enough, it wouldn't need the hint and
could automatically detect hot keys and do the right thing. I would take a
look at this doc[1] to learn how the optimization can work from a runner's
perspective. Some runners never perform the PreCombine, while others may
run multiple rounds of it, but the most common case is a single PreCombine
(assuming it is allowed).

1: https://s.apache.org/beam-runner-api-combine-model
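
Mechanically, you can think of the hot key fanout as the classic sharding
trick. Here is a hand-rolled sketch for a per-key sum (my own illustration;
Beam's actual rewrite lives inside Combine.PerKey and passes accumulators
between the rounds rather than re-running the whole combine):

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline p = Pipeline.create();
    PCollection<KV<String, Long>> input = p.apply(
        Create.of(KV.of("hot", 1L), KV.of("hot", 2L), KV.of("cold", 3L)));

    final int fanout = 10;
    PCollection<KV<String, Long>> totals =
        input
            // 1. Spread each key across `fanout` random sub-keys.
            .apply(MapElements.via(
                new SimpleFunction<KV<String, Long>, KV<String, Long>>() {
                  @Override
                  public KV<String, Long> apply(KV<String, Long> kv) {
                    int shard = ThreadLocalRandom.current().nextInt(fanout);
                    return KV.of(kv.getKey() + "#" + shard, kv.getValue());
                  }
                }))
            // 2. First round: combine per sub-key, in parallel.
            .apply(Sum.longsPerKey())
            // 3. Strip the shard suffix...
            .apply(MapElements.via(
                new SimpleFunction<KV<String, Long>, KV<String, Long>>() {
                  @Override
                  public KV<String, Long> apply(KV<String, Long> kv) {
                    String key = kv.getKey();
                    return KV.of(key.substring(0, key.lastIndexOf('#')),
                        kv.getValue());
                  }
                }))
            // 4. ...and combine once more per original key.
            .apply(Sum.longsPerKey());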
