Posted to user@flink.apache.org by Frank Wilson <fa...@gmail.com> on 2019/10/08 10:09:21 UTC

Computing two aggregate functions on the same window

Hi,

In the DataStream API, is there a way to take two aggregate functions and
apply them to the same window? The output would be a stream of 2-tuples
containing the result of each aggregate function.

I feel it should be possible to combine previously written functions rather
than writing a bespoke ‘god’ aggregate function for each pipeline.

Thanks,

Frank

Re: Computing two aggregate functions on the same window

Posted by Chesnay Schepler <ch...@apache.org>.
There doesn't seem to be a built-in way to apply multiple aggregations 
to a window.

You could use an aggregate function that combines other aggregate 
functions, but admittedly this will get unwieldy as the number of 
functions increases:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Runs two existing AggregateFunctions over the same input; field f0 belongs to f1, field f1 to f2.
public class MultiAggregateFunction<IN,
        ACC1, OUT1, F1 extends AggregateFunction<IN, ACC1, OUT1>,
        ACC2, OUT2, F2 extends AggregateFunction<IN, ACC2, OUT2>>
    implements AggregateFunction<IN, Tuple2<ACC1, ACC2>, Tuple2<OUT1, OUT2>> {

    private final F1 f1;
    private final F2 f2;

    public MultiAggregateFunction(F1 f1, F2 f2) {
        this.f1 = f1;
        this.f2 = f2;
    }

    @Override public Tuple2<ACC1, ACC2> createAccumulator() {
        return Tuple2.of(f1.createAccumulator(), f2.createAccumulator());
    }

    @Override public Tuple2<ACC1, ACC2> add(IN value, Tuple2<ACC1, ACC2> accumulator) {
        // Store what add() returns: it may be a new object (e.g. an immutable Long).
        accumulator.f0 = f1.add(value, accumulator.f0);
        accumulator.f1 = f2.add(value, accumulator.f1);
        return accumulator;
    }

    @Override public Tuple2<OUT1, OUT2> getResult(Tuple2<ACC1, ACC2> accumulator) {
        return Tuple2.of(f1.getResult(accumulator.f0), f2.getResult(accumulator.f1));
    }

    @Override public Tuple2<ACC1, ACC2> merge(Tuple2<ACC1, ACC2> a, Tuple2<ACC1, ACC2> b) {
        return Tuple2.of(f1.merge(a.f0, b.f0), f2.merge(a.f1, b.f1));
    }
}
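To check the combinator's behaviour without a Flink setup, here is a minimal stdlib-only Java sketch of the same idea. It assumes a hypothetical AggFn interface that mirrors the four methods of Flink's AggregateFunction and a hypothetical Pair class standing in for Tuple2; CountFn, SumFn, MaxFn, MultiAgg and MultiAggDemo are illustrative names, not Flink classes. The helpers only simulate what a window does with the function (accumulate, optionally merge partial accumulators, then read the result); they are not how Flink's runtime actually drives it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flink's AggregateFunction (same four methods), so this runs without Flink.
interface AggFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical stand-in for Flink's Tuple2: two mutable fields f0 and f1.
final class Pair<A, B> {
    A f0;
    B f1;
    Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
}

// Counts elements. Long is immutable, so add() returns a new accumulator object.
final class CountFn implements AggFn<Integer, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(Integer value, Long acc) { return acc + 1; }
    public Long getResult(Long acc) { return acc; }
    public Long merge(Long a, Long b) { return a + b; }
}

// Sums elements, again with an immutable Long accumulator.
final class SumFn implements AggFn<Integer, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(Integer value, Long acc) { return acc + value; }
    public Long getResult(Long acc) { return acc; }
    public Long merge(Long a, Long b) { return a + b; }
}

// Tracks the maximum element.
final class MaxFn implements AggFn<Integer, Integer, Integer> {
    public Integer createAccumulator() { return Integer.MIN_VALUE; }
    public Integer add(Integer value, Integer acc) { return Math.max(acc, value); }
    public Integer getResult(Integer acc) { return acc; }
    public Integer merge(Integer a, Integer b) { return Math.max(a, b); }
}

// The combinator from the reply, written against the stand-in types.
final class MultiAgg<IN, A1, O1, A2, O2> implements AggFn<IN, Pair<A1, A2>, Pair<O1, O2>> {
    private final AggFn<IN, A1, O1> f1;
    private final AggFn<IN, A2, O2> f2;

    MultiAgg(AggFn<IN, A1, O1> f1, AggFn<IN, A2, O2> f2) { this.f1 = f1; this.f2 = f2; }

    public Pair<A1, A2> createAccumulator() {
        return new Pair<>(f1.createAccumulator(), f2.createAccumulator());
    }
    public Pair<A1, A2> add(IN value, Pair<A1, A2> acc) {
        acc.f0 = f1.add(value, acc.f0); // keep the returned accumulator
        acc.f1 = f2.add(value, acc.f1);
        return acc;
    }
    public Pair<O1, O2> getResult(Pair<A1, A2> acc) {
        return new Pair<>(f1.getResult(acc.f0), f2.getResult(acc.f1));
    }
    public Pair<A1, A2> merge(Pair<A1, A2> a, Pair<A1, A2> b) {
        return new Pair<>(f1.merge(a.f0, b.f0), f2.merge(a.f1, b.f1));
    }
}

class MultiAggDemo {
    // Feeds all values of one simulated window into a fresh accumulator.
    static <IN, ACC> ACC accumulate(AggFn<IN, ACC, ?> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN v : values) acc = fn.add(v, acc);
        return acc;
    }

    static <IN, ACC, OUT> OUT aggregate(AggFn<IN, ACC, OUT> fn, List<IN> window) {
        return fn.getResult(accumulate(fn, window));
    }

    // Simulates a merging window (e.g. sessions): two partial accumulators combined by merge().
    static <IN, ACC, OUT> OUT aggregateMerged(AggFn<IN, ACC, OUT> fn, List<IN> part1, List<IN> part2) {
        return fn.getResult(fn.merge(accumulate(fn, part1), accumulate(fn, part2)));
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(3, 1, 4, 1, 5);
        MultiAgg<Integer, Long, Long, Long, Long> countAndSum = new MultiAgg<>(new CountFn(), new SumFn());
        // A third function is handled by nesting: ((count, sum), max).
        MultiAgg<Integer, Pair<Long, Long>, Pair<Long, Long>, Integer, Integer> withMax =
                new MultiAgg<>(countAndSum, new MaxFn());
        System.out.println(aggregate(countAndSum, window)); // (5,14)
        System.out.println(aggregate(withMax, window)); // ((5,14),5)
        System.out.println(aggregateMerged(withMax, Arrays.asList(3, 1), Arrays.asList(4, 1, 5))); // ((5,14),5)
    }
}
```

In an actual job you would keep the Flink version above and hand an instance to the window, e.g. .window(...).aggregate(new MultiAggregateFunction<>(first, second)), where first and second are placeholders for your existing functions. Nesting works there too, but the result becomes a nested Tuple2, which is where the approach gets unwieldy. Depending on the Flink version, type extraction may not be able to infer the generic accumulator type; if so, the aggregate overload that takes explicit TypeInformation for the accumulator and result types may help.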
