Posted to user@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2020/07/01 16:09:15 UTC

Re: Understanding combiner's distribution logic

On Tue, Jun 30, 2020 at 3:32 PM Julien Phalip <jp...@gmail.com> wrote:

> Thanks Luke!
>
> One part I'm still a bit unclear about is how exactly the PreCombine stage
> works. In particular, I'm wondering how it can perform the combination
> before the GBK. Is it because it can already compute the combination on
> adjacent elements that happen to share the same key?
>

Yes, exactly. It can also combine non-adjacent elements, as long as they
fall within the same bundle (e.g. a runner might instruct one worker to
process the first half of a file, and another worker the second half; each
of these two halves would be a "bundle").

Basically what happens is a "DoFn" gets inserted right before the GBK.
Upon receipt of a KV, it buffers elements by key in a hashtable so that,
if it sees the same key again, it can do some pre-emptive combining; when
the bundle finishes, it emits everything in the buffer. Essentially:

    public void processElement(KV<K, V> element) {
      if (buffer.containsKey(element.getKey())) {
        // Merge element.getValue() into the accumulator already
        // buffered under this key.
      } else {
        // Well, really this creates a new accumulator from the value.
        buffer.put(element.getKey(), element.getValue());
      }
    }

    public void finishBundle(Context context) {
      for (Map.Entry<K, V> entry : buffer.entrySet()) {
        context.output(KV.of(entry.getKey(), entry.getValue()));
      }
    }

(The actual logic is a bit more complicated to avoid unbounded growth in
the table, etc.)
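
To make that concrete, here is a minimal, self-contained sketch (plain
Java, no Beam dependencies; the class and method names are invented for
illustration) of the same per-bundle buffering idea, using integer
summation as the combine function:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for the pre-combining DoFn described above:
// it keeps a running sum per key and emits one pair per key when the
// bundle finishes.
public class PreCombineSketch {
  private final Map<String, Integer> buffer = new HashMap<>();

  // Called once per element in the bundle.
  public void processElement(String key, int value) {
    // merge() either inserts the value or combines it with the
    // accumulator already stored for this key.
    buffer.merge(key, value, Integer::sum);
  }

  // Called when the bundle finishes; emits the partial sums.
  public List<Map.Entry<String, Integer>> finishBundle() {
    return new ArrayList<>(buffer.entrySet());
  }

  public static void main(String[] args) {
    PreCombineSketch fn = new PreCombineSketch();
    fn.processElement("a", 1);
    fn.processElement("b", 10);
    fn.processElement("a", 2);  // same key again: combined eagerly
    // Only two elements cross the (simulated) GBK instead of three.
    for (Map.Entry<String, Integer> kv : fn.finishBundle()) {
      System.out.println(kv.getKey() + "=" + kv.getValue());
    }
  }
}
```

In the real SDK the merging goes through the CombineFn's accumulator
methods rather than a raw value merge, but the shape is the same.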

> Could you also clarify the term "lifting" in this context? Does that refer
> to the act of pushing a partial combination before the GBK?
>

Lifting is just the terminology for the optimization whereby, when the
graph looks like

    ... -> (GroupByKey -> Combine) -> ...

the runner may turn it into

    ... -> PartialCombine -> GroupByKey -> FinishCombine -> ...

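
As a sanity check on why this rewrite is safe: for a combine whose
accumulators can be merged, pre-combining each bundle and then merging
the partial results gives the same answer as combining everything after
the group. A small self-contained sketch (plain Java; the names are
invented for illustration), using a (sum, count) accumulator to compute
a mean:

```java
import java.util.Arrays;
import java.util.List;

public class LiftingSketch {
  // Accumulator for a mean: running sum and count.
  static class MeanAccum {
    long sum;
    long count;
    MeanAccum add(int v) { sum += v; count += 1; return this; }
    MeanAccum merge(MeanAccum other) {
      sum += other.sum;
      count += other.count;
      return this;
    }
    double extract() { return (double) sum / count; }
  }

  // PartialCombine: each bundle independently folds its values into
  // one accumulator (this is what runs before the GBK).
  static MeanAccum partialCombine(List<Integer> bundle) {
    MeanAccum acc = new MeanAccum();
    for (int v : bundle) acc.add(v);
    return acc;
  }

  public static void main(String[] args) {
    List<Integer> bundle1 = Arrays.asList(1, 2, 3);
    List<Integer> bundle2 = Arrays.asList(4, 5);

    // Lifted path: PartialCombine per bundle, then FinishCombine
    // merges the accumulators that arrive grouped under the key.
    double lifted =
        partialCombine(bundle1).merge(partialCombine(bundle2)).extract();

    // Unlifted path: all values grouped first, then one Combine.
    MeanAccum all = new MeanAccum();
    for (int v : bundle1) all.add(v);
    for (int v : bundle2) all.add(v);

    System.out.println(lifted == all.extract());  // prints "true"
  }
}
```

Note that what flows into the GBK on the lifted path is accumulators,
not raw values, which is why the downstream step merges rather than
re-adds.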

> On Tue, Jun 30, 2020 at 12:34 PM Luke Cwik <lc...@google.com> wrote:
>
>> Your reasoning is correct around the withHotKeyFanout hint: it is there
>> to help runners know that one or more keys will likely have significantly
>> more data than the others. However, the logic around how the work is
>> broken up is runner dependent, as is whether a runner relies on the hint
>> at all. If a runner were smart enough, it wouldn't need the hint and
>> could automatically detect hot keys and do the right thing. I would take
>> a look at this doc[1] to learn how the optimization can work from a
>> runner's perspective. Some runners never perform the PreCombine, while
>> others may have multiple rounds of it, but the most common case is a
>> single PreCombine (assuming it is allowed).
>>
>> 1: https://s.apache.org/beam-runner-api-combine-model
>>
>> On Tue, Jun 30, 2020 at 10:56 AM Julien Phalip <jp...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I had a question about how combiners work, particularly on how the
>>> combined PCollection's subsets are initially formed.
>>>
>>> I understand that, according to the documentation
>>> <https://beam.apache.org/documentation/programming-guide/#combine>, a
>>> combiner allows parallelizing the computation to multiple workers by
>>> breaking up the PCollection into subsets. I like the database analogy given
>>> in this post
>>> <https://cloud.google.com/blog/products/gcp/writing-dataflow-pipelines-with-scalability-in-mind>,
>>> which says that it is similar to pushing down a predicate.
>>>
>>> I also understand that it is possible to use withFanout or
>>> withHotKeyFanout to provide some explicit logic as a hint on how to
>>> manage the distribution.
>>>
>>> What is unclear to me, however, is whether by default the runner already
>>> plans the distribution of the computation, even when no explicit hints are
>>> provided. I'm guessing perhaps it always breaks up the PCollection into
>>> bundles
>>> <https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence>
>>> (similar to DoFns), then the combiner runs the combination on each bundle,
>>> saves the result into intermediary accumulators, and those results then
>>> bubble up recursively to the top? If that's the case, then I assume that
>>> the purpose of withFanout and withHotKeyFanout is to further break up
>>> those initially pre-created bundles into even smaller subsets? Or am I
>>> guessing this wrong? :)
>>>
>>> I couldn't find a clear description in the documentation on how the
>>> PCollection subsets are initially formed. Please let me know if you have
>>> some details on that, or if it is already documented somewhere.
>>>
>>> Thank you!
>>>
>>> Julien
>>>
>>