Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2017/01/22 22:14:26 UTC

[jira] [Closed] (FLINK-5582) Add a general distributive aggregate function

     [ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-5582.
-------------------------------

> Add a general distributive aggregate function
> ---------------------------------------------
>
>                 Key: FLINK-5582
>                 URL: https://issues.apache.org/jira/browse/FLINK-5582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
> 	ACC createAccumulator();
> 	void add(IN value, ACC accumulator);
> 	OUT getResult(ACC accumulator);
> 	ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
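>     public void add(Datum value, AverageAccumulator acc) {
>         // 'count' holds the total weight, 'sum' holds the weighted sum of the values
>         acc.count += value.getWeight();
>         acc.sum += value.getValue() * value.getWeight();
>     }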
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
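
The description above says the accumulator can be merged to split an aggregation into pre- and final-aggregation, but does not show that step. The following is a minimal, self-contained sketch of it, not Flink code: the interface is a local stand-in for the proposed one (without {{extends Function}}), and the class name {{HierarchicalAverage}} and the helpers {{preAggregate}} and {{aggregateInTwoStages}} are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class HierarchicalAverage {

    // Local stand-in for the proposed interface (assumption: the real one extends Flink's Function).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // Same logic as the 'Average' example in the issue description.
    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }
        public void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }
        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // Hypothetical helper: pre-aggregation of a single partition into a partial accumulator.
    static <IN, ACC, OUT> ACC preAggregate(AggregateFunction<IN, ACC, OUT> fn, List<IN> partition) {
        ACC acc = fn.createAccumulator();
        for (IN value : partition) {
            fn.add(value, acc);
        }
        return acc;
    }

    // Hypothetical helper: final aggregation that merges the partial accumulators
    // and applies the finalizing step only once, at the end.
    static <IN, ACC, OUT> OUT aggregateInTwoStages(AggregateFunction<IN, ACC, OUT> fn,
                                                   List<List<IN>> partitions) {
        ACC merged = fn.createAccumulator();
        for (List<IN> partition : partitions) {
            merged = fn.merge(merged, preAggregate(fn, partition));
        }
        return fn.getResult(merged);
    }

    public static void main(String[] args) {
        Average avg = new Average();
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5, 6, 7));

        double twoStage = aggregateInTwoStages(avg, partitions);
        double singlePass = avg.getResult(preAggregate(avg, Arrays.asList(1, 2, 3, 4, 5, 6, 7)));

        // Both paths yield the same average: prints "4.0 4.0"
        System.out.println(twoStage + " " + singlePass);
    }
}
```

The two-stage result matches the single-pass result because the accumulator keeps {{count}} and {{sum}} separately and the division happens only in {{getResult}}. A {{FoldFunction}} that carried just the running average would have no general way to combine two partial results.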



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)