Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2017/01/22 22:14:26 UTC
[jira] [Resolved] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Ewen resolved FLINK-5582.
---------------------------------
Resolution: Implemented
Implemented in 09380e49256bff924734b9a932808e0f4daa7e5c
> Add a general distributive aggregate function
> ---------------------------------------------
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:
> - {{ReduceFunction}} supports only a single type, which serves as both the type that is added and the type that is aggregated/returned.
> - {{FoldFunction}} supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into a pre-aggregation and a final aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
> - Different types to add, accumulate, and return
> - The ability to merge partial aggregates by merging their accumulators.
> The proposed interface is below. This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         // 'count' holds the total weight, 'sum' the weighted sum of values
>         acc.count += value.getWeight();
>         acc.sum += value.getValue() * value.getWeight();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
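The hierarchical aggregation that the description mentions (splitting the work into a pre-aggregation and a final aggregation) can be illustrated with a small standalone sketch. It is not taken from the commit: the interface is copied locally without "extends Function" so that it compiles without Flink on the classpath, and the class HierarchicalAggregationSketch with its preAggregate and finalAggregate helpers is a hypothetical driver that exists only for this illustration.

```java
import java.util.Arrays;
import java.util.List;

// Local copy of the interface proposed in the issue ("extends Function" is
// dropped so the sketch compiles without Flink on the classpath).
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

class AverageAccumulator {
    long count;
    long sum;
}

// Simple average, as in the example from the issue description.
class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
    public void add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }
    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

// Hypothetical driver (not part of Flink) that mimics a two-stage aggregation.
class HierarchicalAggregationSketch {

    // Pre-aggregation: add all values of one partition to a fresh accumulator.
    static <IN, ACC, OUT> ACC preAggregate(AggregateFunction<IN, ACC, OUT> fn, List<IN> partition) {
        ACC acc = fn.createAccumulator();
        for (IN value : partition) {
            fn.add(value, acc);
        }
        return acc;
    }

    // Final aggregation: merge the partial accumulators, then finalize once.
    static <IN, ACC, OUT> OUT finalAggregate(AggregateFunction<IN, ACC, OUT> fn, List<ACC> partials) {
        ACC merged = fn.createAccumulator();
        for (ACC partial : partials) {
            merged = fn.merge(merged, partial);
        }
        return fn.getResult(merged);
    }

    public static void main(String[] args) {
        Average avg = new Average();
        AverageAccumulator left = preAggregate(avg, Arrays.asList(1, 2, 3));
        AverageAccumulator right = preAggregate(avg, Arrays.asList(4, 5));
        double result = finalAggregate(avg, Arrays.asList(left, right));
        System.out.println(result);
    }
}
```

Because only accumulators cross the boundary between the two stages, the merged result matches what a single pass over all five values would produce. A FoldFunction offers no such merge step, which is the limitation the issue describes.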
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)