Posted to user@beam.apache.org by Josh <jo...@gmail.com> on 2017/06/20 13:32:10 UTC

What state is buffered when using Combine.perKey with an accumulator?

Hi all,

I have a question about how much state is buffered when using
Combine.perKey with a custom accumulator. For example, I have:

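PCollection<KV<String, String>> elements = ...;

PCollection<KV<String, List<String>>> topValuesPerKey = elements
    // Global window, re-firing 10 seconds after the first element in each pane.
    .apply(Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        // Each pane reports everything combined so far, not just the new elements.
        .accumulatingFiredPanes())
    .apply(Combine.perKey(new MyCombineFunction()));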


Here MyCombineFunction counts, for each key, the occurrences of each
value. Its output for each key is a List<String> of the values that occur
most frequently. In this case the accumulator for each key just stores a
Map<String, Long> of values and their associated counts.
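
For readers following along, here is a rough plain-Java sketch of the combine logic described above. It is not the actual MyCombineFunction, which is not shown in this thread. The class name TopValuesSketch is hypothetical, and the four static methods only mirror the createAccumulator / addInput / mergeAccumulators / extractOutput steps of a Beam CombineFn without depending on Beam. It also assumes that "most frequently" means every value tied for the highest count.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java mirror of the four CombineFn steps; no Beam dependency. */
final class TopValuesSketch {

  /** The accumulator for one key: a count per distinct value seen so far. */
  static Map<String, Long> createAccumulator() {
    return new HashMap<>();
  }

  /** Folds one input value into the accumulator by bumping its count. */
  static Map<String, Long> addInput(Map<String, Long> accum, String value) {
    accum.merge(value, 1L, Long::sum);
    return accum;
  }

  /** Merges partial accumulators for the same key by summing their counts. */
  static Map<String, Long> mergeAccumulators(Iterable<Map<String, Long>> accums) {
    Map<String, Long> merged = createAccumulator();
    for (Map<String, Long> accum : accums) {
      accum.forEach((value, count) -> merged.merge(value, count, Long::sum));
    }
    return merged;
  }

  /** Assumption: returns every value tied for the highest count, sorted. */
  static List<String> extractOutput(Map<String, Long> accum) {
    long max = 0L;
    for (long count : accum.values()) {
      max = Math.max(max, count);
    }
    List<String> top = new ArrayList<>();
    for (Map.Entry<String, Long> entry : accum.entrySet()) {
      if (entry.getValue() == max) {
        top.add(entry.getKey());
      }
    }
    Collections.sort(top);
    return top;
  }
}
```

In a real pipeline these four methods would be overrides on a class extending Combine.CombineFn<String, Map<String, Long>, List<String>>, and the accumulator type would need a coder that Beam can infer or that is supplied explicitly.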


My question is: since I am using accumulatingFiredPanes forever on the
global window, is every element going to be buffered forever (i.e. will the
amount of space needed constantly increase)? Or is the amount of state
buffered determined by my accumulator (i.e. by the number of unique values
across all keys)? If the former is the case, how can I optimise my job so
that the accumulator is the only state stored across panes?


Thanks for any advice,

Josh

Re: What state is buffered when using Combine.perKey with an accumulator?

Posted by Josh <jo...@gmail.com>.
Hi Kenn,

Thanks for the reply, that makes sense.
As far as I can tell, the DirectPipelineRunner doesn't do this optimisation
(when I test the pipeline locally) but I guess the DataflowRunner will.

Josh

On Tue, Jun 20, 2017 at 4:26 PM, Kenneth Knowles <kl...@google.com> wrote:

> Hi Josh,
>
> Exactly what is stored technically depends on optimization decisions by
> the runner. But you can generally expect that only the accumulator is
> stored across trigger firings, not the input elements.
>
> Kenn

Re: What state is buffered when using Combine.perKey with an accumulator?

Posted by Kenneth Knowles <kl...@google.com>.
Hi Josh,

Exactly what is stored technically depends on optimization decisions by the
runner. But you can generally expect that only the accumulator is stored
across trigger firings, not the input elements.

Kenn
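
As a rough illustration of the answer above, the plain-Java sketch below models a single key whose accumulator is kept across simulated trigger firings. It is only a model of the expected behaviour, not how any runner is implemented, and a runner that does not apply this optimisation may hold more. The class AccumulatorSizeSketch and its method are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of per-key state when only the accumulator is retained. */
final class AccumulatorSizeSketch {

  /**
   * Feeds elementCount inputs (cycling through the given values) into one
   * count map and records the map's entry count at each simulated firing.
   */
  static List<Integer> sizesAtFirings(
      int elementCount, int elementsPerFiring, String[] values) {
    Map<String, Long> accum = new HashMap<>();
    List<Integer> sizes = new ArrayList<>();
    for (int i = 0; i < elementCount; i++) {
      // The input is folded into a count; the element itself is not kept.
      accum.merge(values[i % values.length], 1L, Long::sum);
      if ((i + 1) % elementsPerFiring == 0) {
        // Accumulating mode: output is produced here, the accumulator is NOT cleared.
        sizes.add(accum.size());
      }
    }
    return sizes;
  }
}
```

Under these assumptions the recorded size tracks the number of distinct values for the key rather than the number of elements seen, which is the property the question was asking about.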
