Posted to user@flink.apache.org by Luis Mariano Guerra <ma...@event-fabric.com> on 2016/09/02 15:37:00 UTC

emit a single Map per window

hi!

I'm trying to collect some metrics by key per window and emit the full
result to Kafka at the end of the window. I started with a simple count by
key to test it, but my requirements are a little more complex than that.

What I want to do is fold the stream events as they arrive and then, at the
end of the window, merge them together and emit a single result. I don't
want to accumulate all the events and compute the result at the end of the
window. From my understanding of fold in other languages/libraries, this is
what I need, using it via apply(stateIn, foldFun, windowFun), but it's not
working:

The basic pipeline is:

    input
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .apply(new HashMap<String, Integer>(), foldFunction, winFunction);

where foldFunction accumulates counts by key, and winFunction iterates over
the hashmaps, merges them into a single result hashmap and emits that one at
the end.

This emits many one-key hash maps instead of a single map with all the keys.
I tried setting setParallelism(1) in multiple places, but it still doesn't
work. More confusingly, one run emitted a single map, but when I ran it
again it went back to the previous behavior.

What am I doing wrong? Is there any other approach?

I can provide the implementation of foldFunction and winFunction if
required, but I don't think it changes much.

Reading the source code, I see:

    Applies the given window function to each window. The window function
    is called for each evaluation of the window for each key individually.
    The output of the window function is interpreted as a regular
    non-windowed stream.

Note the emphasis on "for each key individually". The return type of apply
is SingleOutputStreamOperator, which doesn't provide many operations to
group the emitted values.
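
The quoted Javadoc explains the observed behaviour. A plain-Java model of one keyed window evaluation (not Flink code; the Event class and evaluateKeyedWindow method are hypothetical) shows why a keyBy/timeWindow/apply pipeline emits one single-key map per key rather than one map per window: the fold state starts from a fresh copy of the initial value for each key, and the window function fires once per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink code) of a keyed window: state is kept per key,
// so the fold and the window function each only ever see one key's data.
public class PerKeyWindowModel {

    // Hypothetical event type: a (word, count) pair.
    static final class Event {
        final String word;
        final int count;

        Event(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Models one window evaluation: one fresh accumulator per key,
    // and one window-function output per key.
    static List<Map<String, Integer>> evaluateKeyedWindow(List<Event> windowEvents) {
        Map<String, Map<String, Integer>> stateByKey = new LinkedHashMap<>();
        for (Event e : windowEvents) {
            Map<String, Integer> acc = stateByKey.computeIfAbsent(e.word, k -> new HashMap<>());
            acc.merge(e.word, e.count, Integer::sum);
        }
        // The window function is invoked once per key, emitting that key's accumulator.
        return new ArrayList<>(stateByKey.values());
    }

    public static void main(String[] args) {
        List<Event> window = List.of(new Event("a", 1), new Event("b", 1), new Event("a", 1));
        System.out.println(evaluateKeyedWindow(window)); // [{a=2}, {b=1}]
    }
}
```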

thanks in advance.

Re: emit a single Map per window

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
what's the number of unique keys and the parallelism of your job? If the
former is larger than the latter, you should indeed have one
"timeWindowFold" be responsible for several keys. How are you determining
whether one of these is only accumulating for a single key?

Cheers,
Aljoscha

On Mon, 5 Sep 2016 at 17:35 Luis Mariano Guerra <ma...@event-fabric.com>
wrote:

> It seems to work, but each timeWindowFold seems to accumulate a single
> key. I was expecting one or more keys per fold, depending on which
> processing node handles the computation. I don't mind emitting one event
> per key, but I want to know whether this is OK and whether I'm missing
> something (maybe I have to add a line to specify partitioning?).

Re: emit a single Map per window

Posted by Luis Mariano Guerra <ma...@event-fabric.com>.
On Mon, Sep 5, 2016 at 12:30 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> for this you would have to use a non-parallel window, i.e. something like
> stream.windowAll(<my window>).apply(...). This does not compute per key,
> but it has the drawback that the computation does not happen in parallel.
> If you only use it to combine the pre-aggregated maps, it could be OK,
> though.
>
> Cheers,
> Aljoscha
>

hi,

thanks for the tip, it works. I was aware of the non-parallel nature of
what I want to do. After seeing it work, I tried this:

    input.flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .apply(new HashMap<String, Integer>(), timeWindowFold, timeWindowMerge)
        .windowAll(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
        .apply(new HashMap<String, Integer>(), windowAllFold, windowAllMerge);

It seems to work, but each timeWindowFold seems to accumulate a single key.
I was expecting one or more keys per fold, depending on which processing
node handles the computation. I don't mind emitting one event per key, but
I want to know whether this is OK and whether I'm missing something (maybe
I have to add a line to specify partitioning?).
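
Aljoscha's point about keys versus parallelism, and the question of how many keys a single fold sees, can be modelled in plain Java. This is a sketch, not Flink code: it assumes a hypothetical subtaskFor that assigns keys by hash modulo parallelism (Flink's real key assignment differs in detail). It shows that one parallel instance can host several keys while each key still keeps its own fold accumulator. With parallelism 1 every key lands on the same instance, yet the accumulators stay separate, which would also explain why setParallelism(1) did not produce a single map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model, NOT Flink code. ASSUMPTION: keys are assigned to parallel
// instances with a simple hash modulo; Flink's actual assignment differs in detail.
public class KeyAssignmentModel {

    // Hypothetical key-to-instance assignment.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // For one window: subtask index -> (key -> that key's own fold accumulator).
    static Map<Integer, Map<String, Map<String, Integer>>> run(List<String> words, int parallelism) {
        Map<Integer, Map<String, Map<String, Integer>>> subtasks = new TreeMap<>();
        for (String w : words) {
            int subtask = subtaskFor(w, parallelism);
            // One subtask may host many keys...
            Map<String, Map<String, Integer>> keyed = subtasks.computeIfAbsent(subtask, s -> new TreeMap<>());
            // ...but each key still gets a separate accumulator.
            keyed.computeIfAbsent(w, k -> new HashMap<>()).merge(w, 1, Integer::sum);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "c", "a");
        System.out.println(run(words, 1)); // {0={a={a=2}, b={b=1}, c={c=1}}}
        System.out.println(run(words, 2)); // {0={b={b=1}}, 1={a={a=2}, c={c=1}}}
    }
}
```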

Re: emit a single Map per window

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
for this you would have to use a non-parallel window, i.e. something like
stream.windowAll(<my window>).apply(...). This does not compute per key,
but it has the drawback that the computation does not happen in parallel.
If you only use it to combine the pre-aggregated maps, it could be OK,
though.

Cheers,
Aljoscha
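
The two-stage pattern described here (parallel per-key pre-aggregation, then a non-parallel windowAll that only merges the pre-aggregated maps) can be modelled in plain Java. This is a sketch, not Flink code: Partial and mergePerWindow are hypothetical, and it assumes each per-key result is tagged with the end of the window it belongs to. The non-parallel stage then sees at most one map per key per window instead of every raw event.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (NOT Flink code) of the two-stage pattern:
// stage 1 = per-key partial maps (parallel), stage 2 = one merge per window.
public class TwoStageMergeModel {

    // Hypothetical per-key result: the window it belongs to plus its partial counts.
    static final class Partial {
        final long windowEnd;
        final Map<String, Integer> counts;

        Partial(long windowEnd, Map<String, Integer> counts) {
            this.windowEnd = windowEnd;
            this.counts = counts;
        }
    }

    // Stage 2: all partials with the same window end are merged into a single map.
    static Map<Long, Map<String, Integer>> mergePerWindow(List<Partial> partials) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Partial p : partials) {
            Map<String, Integer> merged = result.computeIfAbsent(p.windowEnd, w -> new HashMap<>());
            p.counts.forEach((word, count) -> merged.merge(word, count, Integer::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Partial> partials = List.of(
                new Partial(5000L, Map.of("a", 2)),
                new Partial(5000L, Map.of("b", 1)),
                new Partial(10000L, Map.of("a", 1)));
        Map<Long, Map<String, Integer>> perWindow = mergePerWindow(partials);
        System.out.println(perWindow.size()); // 2
        System.out.println(new TreeMap<>(perWindow.get(5000L))); // {a=2, b=1}
    }
}
```

In the real job, the two stages only line up if both use the same window size and the same time semantics. As far as I know, timeWindow(...) on a keyed stream follows the environment's time characteristic, whereas TumblingEventTimeWindows always uses event time, so a pipeline that mixes the two should be run with event time enabled.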

On Fri, 2 Sep 2016 at 18:26 Luis Mariano Guerra <ma...@event-fabric.com>
wrote:

> Yes, it's the behaviour I'm seeing. I'm looking for a way to merge those
> HashMaps from the same window into a single one, but I can't find how.

Re: emit a single Map per window

Posted by Luis Mariano Guerra <ma...@event-fabric.com>.
On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> from this I would expect to get as many HashMaps as you have keys. The
> winFunction is also executed per key, so it cannot combine the HashMaps of
> all keys.
>
> Does this describe the behavior that you're seeing?
>

Yes, it's the behaviour I'm seeing. I'm looking for a way to merge those
HashMaps from the same window into a single one, but I can't find how.

Re: emit a single Map per window

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
from this I would expect to get as many HashMaps as you have keys. The
winFunction is also executed per key, so it cannot combine the HashMaps of
all keys.

Does this describe the behavior that you're seeing?

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <ma...@event-fabric.com>
wrote:

>     input
>         .flatMap(new LineSplitter())
>         .keyBy(0)
>         .timeWindow(Time.of(5, TimeUnit.SECONDS))
>         .apply(new HashMap<String, Integer>(), foldFunction, winFunction);