Posted to user@flink.apache.org by Philipp Goetze <ph...@tu-ilmenau.de> on 2015/08/21 14:51:11 UTC
Custom Aggregate - Example
Hello community,
How do I define a custom aggregate function in Flink Streaming (Scala)?
Could you please provide an example of how to do that?
Thank you and best regards,
Philipp
Re: Custom Aggregate - Example
Posted by Gyula Fóra <gy...@apache.org>.
Hi,
Alternatively, if you would like to create continuous aggregates per key, you
can use ds.groupBy(...).reduce(...), or use one of the stateful functions in
the Scala API, such as mapWithState.
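A minimal plain-Scala sketch of what a continuous per-key reduce computes: each incoming record is combined with the running aggregate of its key, and the updated aggregate is emitted immediately, one output per input. It does not use Flink; the helper `rollingReduce` and the in-memory map standing in for per-key state are hypothetical.

```scala
import scala.collection.mutable

// Plain-Scala model of a continuous per-key reduce, similar in spirit to
// ds.groupBy(...).reduce(...). No Flink API is used here.
object RollingReduceSketch {
  // Hypothetical helper: `key` extracts the grouping key, `reduce` combines
  // the previous aggregate of a key with a new record of the same key.
  def rollingReduce[K, T](records: Seq[T])(key: T => K)(reduce: (T, T) => T): Seq[T] = {
    val state = mutable.Map.empty[K, T] // stands in for Flink's per-key state
    records.map { rec =>
      val k = key(rec)
      val updated = state.get(k) match {
        case Some(prev) => reduce(prev, rec)
        case None       => rec // first record of a key is emitted unchanged
      }
      state(k) = updated
      updated // one output per input record
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(("a", 1), ("b", 5), ("a", 2), ("a", 3), ("b", 1))
    val sums = rollingReduce(events)(_._1)((x, y) => (x._1, x._2 + y._2))
    sums.foreach(println) // (a,1) (b,5) (a,3) (a,6) (b,6), one per line
  }
}
```

Unlike a window reduce, nothing is ever "closed" here: every record produces an updated running sum for its key.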
For a rolling average per key, you can check this example:
https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/FlinkKafkaExample.scala
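A rolling average needs more than a plain reduce, because the state (sum and count) differs from the output (the average). The plain-Scala sketch below assumes a step function of the shape `(value, Option[state]) => (output, Option[state])`, which is the shape `mapWithState` takes in the Flink Scala API (check the signature in the Flink version you use). The names `step` and `run` are hypothetical, the mutable map stands in for Flink's keyed state, and this is not necessarily how the linked example is written.

```scala
import scala.collection.mutable

object RollingAverageSketch {
  // Per-key state: (running sum, number of records seen so far)
  type AvgState = (Double, Long)

  // Hypothetical step function in the assumed mapWithState shape:
  // (value, Option[state]) => (output, Option[new state])
  def step(value: Double, state: Option[AvgState]): (Double, Option[AvgState]) = {
    val (sum, count) = state.getOrElse((0.0, 0L)) // no state yet: start from zero
    val newSum = sum + value
    val newCount = count + 1
    (newSum / newCount, Some((newSum, newCount)))
  }

  // Drives `step` per key; the mutable map stands in for Flink's keyed state.
  def run(records: Seq[(String, Double)]): Seq[(String, Double)] = {
    val states = mutable.Map.empty[String, AvgState]
    records.map { case (key, value) =>
      val (avg, newState) = step(value, states.get(key))
      newState.foreach(s => states.update(key, s)) // step always returns Some here
      (key, avg)
    }
  }

  def main(args: Array[String]): Unit =
    run(Seq(("a", 2.0), ("a", 4.0), ("b", 10.0), ("a", 9.0))).foreach(println)
}
```

For the input above, key "a" yields 2.0, 3.0 and 5.0, while key "b" yields 10.0; each key keeps its own sum and count.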
Cheers,
Gyula
On Fri, Aug 21, 2015 at 3:28 PM Aljoscha Krettek <al...@apache.org>
wrote:
> Hi,
> with the current API this should do what you are after:
>
> val input = ...
>
> val result = input
>   .window(...)
>   .groupBy(...)
>   .reduceWindow( /* your reduce function */ )
>
> With the reduce function you should be able to implement any custom
> aggregations. You can also use foldWindow() if you want to do a functional
> fold over the window.
>
> I hope this helps.
>
> Cheers,
> Aljoscha
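The difference between `reduceWindow()` and `foldWindow()` described above can be illustrated without Flink: a reduce combines two elements of the same type, while a fold starts from an initial value and can return a different type. In this plain-Scala sketch a `Seq` stands in for the contents of one window of one group; `Reading`, `maxReading` and `countAndSum` are hypothetical names.

```scala
// Plain-Scala illustration of reduce vs. fold over the contents of one window.
// No Flink API is used; the Seq stands in for one window of one group.
object WindowAggregationSketch {
  case class Reading(sensor: String, value: Int)

  // Reduce: (Reading, Reading) => Reading, keeps the reading with the larger value.
  def maxReading(window: Seq[Reading]): Reading =
    window.reduce((a, b) => if (b.value > a.value) b else a)

  // Fold: starts from (0, 0L) and builds a (count, sum) pair,
  // a type different from the input elements.
  def countAndSum(window: Seq[Reading]): (Int, Long) =
    window.foldLeft((0, 0L)) { case ((count, sum), r) => (count + 1, sum + r.value) }

  def main(args: Array[String]): Unit = {
    val window = Seq(Reading("s1", 3), Reading("s1", 7), Reading("s1", 5))
    println(maxReading(window))  // Reading(s1,7)
    println(countAndSum(window)) // (3,15)
  }
}
```

In plain Scala, `reduce` throws on an empty collection, whereas `foldLeft` simply returns its initial value.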
Re: Custom Aggregate - Example
Posted by Philipp Goetze <ph...@tu-ilmenau.de>.
Thank you Aljoscha,
I suspected that I should use the reduce method. However, I am not looking
for window aggregations; I want to do this on a grouped stream.
The problem is that we work with Lists instead of tuples, so we cannot
use the pre-implemented aggregates.
So the idea is to call it like this:
val aggr = source.groupBy(_(0)).reduce(new customReducer(1))
And this is the signature of the class:
class customReducer(field: Int) extends RichReduceFunction[List[Any]]
How should I implement this class so that it works correctly
even with parallelism > 1?
I hope you understand what I am trying to do. =)
Kind Regards,
Philipp
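A possible shape for the reducer in the question, as a plain-Scala sketch under two assumptions: the aggregated field holds Int, Long or Double values of the same type, and the aggregate is a sum that keeps the other fields of the newer record. `ListSumReducer` and `rolling` are hypothetical names. With parallelism > 1, grouping routes all records with the same key to the same parallel instance, so the reducer only has to combine two records of one key; it should not keep anything in its own fields across calls, because one instance serves many keys.

```scala
import scala.collection.mutable

object ListReducerSketch {
  // Hypothetical reducer for records represented as List[Any]: sums the value
  // at position `field` and keeps the remaining fields of the newer record.
  // Flink ships function objects to the workers, so they must be serializable.
  class ListSumReducer(field: Int) extends Serializable {
    def reduce(acc: List[Any], next: List[Any]): List[Any] =
      next.updated(field, add(acc(field), next(field)))

    // Assumption: both values have the same numeric type (Int, Long or Double).
    private def add(a: Any, b: Any): Any = (a, b) match {
      case (x: Int, y: Int)       => x + y
      case (x: Long, y: Long)     => x + y
      case (x: Double, y: Double) => x + y
      case _ => throw new IllegalArgumentException(s"field $field: cannot add $a and $b")
    }
  }

  // Models source.groupBy(_(0)).reduce(...): one running aggregate per key,
  // with the key taken from position 0 as in the question.
  def rolling(records: Seq[List[Any]], reducer: ListSumReducer): Seq[List[Any]] = {
    val state = mutable.Map.empty[Any, List[Any]] // stands in for Flink's per-key state
    records.map { rec =>
      val key = rec(0)
      val out = state.get(key).fold(rec)(prev => reducer.reduce(prev, rec))
      state(key) = out
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(List("x", 1), List("y", 10), List("x", 2))
    rolling(input, new ListSumReducer(1)).foreach(println) // List(x,1) List(y,10) List(x,3)
  }
}
```

Inside Flink, the body of `reduce` would go into the `reduce(value1, value2)` method of the `RichReduceFunction[List[Any]]` subclass from the question; the `rolling` helper only models what `groupBy(_(0)).reduce(...)` emits.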