Posted to user@flink.apache.org by Philipp Goetze <ph...@tu-ilmenau.de> on 2015/08/21 14:51:11 UTC

Custom Aggregate - Example

Hello community,

How do I define a custom aggregate function in Flink Streaming (Scala)?
Could you please provide an example of how to do that?

Thank you and best regards,
Philipp

Re: Custom Aggregate - Example

Posted by Gyula Fóra <gy...@apache.org>.
Hi,

Alternatively, if you would like to create continuous aggregates per key, you
can use ds.groupBy().reduce(..), or use one of the stateful functions in
the Scala API such as mapWithState.

For a rolling average per key you can check this example:
https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/FlinkKafkaExample.scala
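
As a rough illustration of the state handling behind a per-key rolling average (not taken from the linked example), the sketch below models it in plain Scala without any Flink dependency. The names RollingAverage, State, update and run are hypothetical. The update function is shaped like an (input, Option[state]) => (output, Option[state]) function, which is assumed here to be the form a mapWithState-style call takes.

```scala
// Plain-Scala model of a per-key rolling average; no Flink on the classpath.
// All names here are hypothetical and only for illustration.
object RollingAverage {
  // Per-key state: running sum and number of elements seen so far.
  final case class State(sum: Double, count: Long)

  // (input, previous state) => (output, new state)
  def update(value: Double, state: Option[State]): (Double, Option[State]) = {
    val prev = state.getOrElse(State(0.0, 0L))
    val next = State(prev.sum + value, prev.count + 1)
    (next.sum / next.count, Some(next))
  }

  // Simulates a keyed stream: emits one (key, average) per input record,
  // keeping a separate State for every key.
  def run(records: Seq[(String, Double)]): Seq[(String, Double)] = {
    val states = scala.collection.mutable.Map.empty[String, State]
    records.map { case (key, value) =>
      val (avg, newState) = update(value, states.get(key))
      newState.foreach(s => states.update(key, s))
      (key, avg)
    }
  }
}
```

Calling RollingAverage.run(Seq(("a", 1.0), ("b", 10.0), ("a", 3.0))) yields one running average per record, computed independently for each key.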

Cheers,
Gyula

On Fri, Aug 21, 2015 at 3:28 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> with the current API this should do what you are after:
>
> val input = ...
>
> val result = input
>   .window(...)
>   .groupBy(...)
>   .reduceWindow( /* your reduce function */ )
>
> With the reduce function you should be able to implement any custom
> aggregations. You can also use foldWindow() if you want to do a functional
> fold over the window.
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 21 Aug 2015 at 14:51 Philipp Goetze <ph...@tu-ilmenau.de>
> wrote:
>
>> Hello community,
>>
>> how do I define a custom aggregate function in Flink Streaming (Scala)?
>> Could you please provide an example on how to do that?
>>
>> Thank you and best regards,
>> Philipp
>>
>

Re: Custom Aggregate - Example

Posted by Philipp Goetze <ph...@tu-ilmenau.de>.
Thank you Aljoscha,

I guessed that I should use the reduce method. However, I am not looking
for window aggregations. I want to do this on a grouped stream.

The problem is that we work with Lists instead of tuples, and thus we cannot
use the pre-implemented aggregates.

So the idea is to call it like this:

    val aggr = source.groupBy(_(0)).reduce(new customReducer(1))

And this is the signature of the class:

    class customReducer(field: Int) extends RichReduceFunction[List[Any]]


How do I have to implement this class so that it works
correctly even with parallelism > 1?
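
One possible shape for such a reducer, as a minimal sketch under stated assumptions: it sums the numeric value at position `field` and keeps every other position from the first argument. SumReducer and toDouble are hypothetical names, and summing is only one example of an aggregate. In Flink the same logic would sit in the reduce(value1, value2) method of a class extending RichReduceFunction[List[Any]]; here it is a plain class so the snippet runs without Flink. The function is pure (no mutated fields), so nothing in it depends on which parallel instance executes it, assuming the grouping routes all records with the same key to the same instance.

```scala
// Hypothetical sketch: sums the numeric value at index `field`.
// All other positions are taken from the first argument.
class SumReducer(field: Int) extends Serializable {
  // Lists hold Any, so the value has to be narrowed at runtime.
  private def toDouble(x: Any): Double = x match {
    case n: Int    => n.toDouble
    case n: Long   => n.toDouble
    case n: Float  => n.toDouble
    case n: Double => n
    case other     => throw new IllegalArgumentException(s"Not numeric: $other")
  }

  // Same shape as ReduceFunction.reduce(value1, value2): two records of the
  // same key go in, one combined record comes out. No state is kept here.
  def reduce(a: List[Any], b: List[Any]): List[Any] =
    a.updated(field, toDouble(a(field)) + toDouble(b(field)))
}
```

For instance, new SumReducer(1).reduce(List[Any]("a", 1), List[Any]("a", 2)) combines two records of key "a" into a single list whose second field holds the sum as a Double.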

I hope you understand what I am trying to do. =)

Kind Regards,
Philipp



Re: Custom Aggregate - Example

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
with the current API this should do what you are after:

val input = ...

val result = input
  .window(...)
  .groupBy(...)
  .reduceWindow( /* your reduce function */ )

With the reduce function you should be able to implement any custom
aggregations. You can also use foldWindow() if you want to do a functional
fold over the window.
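
To illustrate the difference between the two styles, here is a small sketch that models a window as a plain Scala Seq, without using the Flink API at all. WindowAggregates, maxReading and average are hypothetical names. A reduce combines two elements into one of the same type, while a fold starts from an initial accumulator whose type may differ from the element type.

```scala
// Plain-Scala model: a "window" is just the Seq of elements it contains.
// All names here are hypothetical and only for illustration.
object WindowAggregates {
  // reduce: combines two elements into one element of the SAME type.
  // Throws on an empty window, because there is no initial value.
  def maxReading(window: Seq[Double]): Double =
    window.reduce((a, b) => if (a >= b) a else b)

  // fold: starts from an initial accumulator of a DIFFERENT type,
  // here a (sum, count) pair that is turned into an average at the end.
  def average(window: Seq[Double]): Double = {
    val (sum, count) = window.foldLeft((0.0, 0)) {
      case ((s, c), x) => (s + x, c + 1)
    }
    if (count == 0) 0.0 else sum / count
  }
}
```

The average needs the fold form because its intermediate result (a sum together with a count) is not itself an element of the window.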

I hope this helps.

Cheers,
Aljoscha
