Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2017/01/19 16:57:26 UTC

[jira] [Created] (FLINK-5582) Add a general distributive aggregate function

Stephan Ewen created FLINK-5582:
-----------------------------------

             Summary: Add a general distributive aggregate function
                 Key: FLINK-5582
                 URL: https://issues.apache.org/jira/browse/FLINK-5582
             Project: Flink
          Issue Type: New Feature
          Components: Streaming
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.3.0


The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:

  - {{ReduceFunction}} only supports a single type, which is used both for the values that are added and for the aggregated/returned result.
  - {{FoldFunction}} supports different types for the added values and the result, but it is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into a pre-aggregation and a final aggregation.
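A minimal sketch of both limitations, using simplified local stand-ins ({{SimpleReduce}} and {{SimpleFold}}, both hypothetical) rather than Flink's actual {{ReduceFunction}} and {{FoldFunction}}. Averaging with a reduce forces every value into a (sum, count) pair and leaves the final division outside the function; the fold can change types, but its interface offers no way to combine two partially folded results.

{code:java}
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for ReduceFunction<T>: input and result share one type T.
interface SimpleReduce<T> {
    T reduce(T a, T b);
}

// Simplified stand-in for FoldFunction<T, O>: folds a value into an accumulator,
// but has no method for combining two partially folded accumulators.
interface SimpleFold<T, O> {
    O fold(O accumulator, T value);
}

class ReduceFoldLimits {

    // With a reduce, each Integer must first be mapped to a {sum, count} pair,
    // and the division to obtain the average happens outside the function.
    static double averageViaReduce(List<Integer> values) {
        SimpleReduce<long[]> sumAndCount = (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]};
        long[] acc = new long[] {0L, 0L};
        for (int v : values) {
            acc = sumAndCount.reduce(acc, new long[] {v, 1L});
        }
        return acc[0] / (double) acc[1];
    }

    // A fold can change types (Integer in, long[] out), but it can only run
    // sequentially: there is no operation to merge two partial long[] results.
    static double averageViaFold(List<Integer> values) {
        SimpleFold<Integer, long[]> fold = (acc, v) -> new long[] {acc[0] + v, acc[1] + 1};
        long[] acc = new long[] {0L, 0L};
        for (int v : values) {
            acc = fold.fold(acc, v);
        }
        return acc[0] / (double) acc[1];
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(averageViaReduce(values)); // 2.5
        System.out.println(averageViaFold(values));   // 2.5
    }
}
{code}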

I suggest adding a generic and powerful aggregation function that supports:
  - Different types to add, accumulate, and return
  - The ability to merge partial aggregates by merging their accumulators.

The proposed interface is below. This style of interface is found in many APIs, such as those of various databases, and also in Apache Beam:

  - The accumulator is the state of the running aggregate
  - Accumulators can be merged
  - Values are added to the accumulator
  - Getting the result from the accumulator performs an optional finalizing operation
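The lifecycle described above can be sketched as a two-phase (pre- and final) aggregation. This assumes a local copy of the proposed interface, named {{ProposedAggregate}} here so the sketch does not depend on Flink, and a hypothetical {{DistinctCount}} function whose input, accumulator, and result types all differ. Each partition is pre-aggregated into its own accumulator, the partial accumulators are merged, and the result is finalized exactly once.

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Local copy of the proposed interface (without Flink's Function marker).
interface ProposedAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical example: counts distinct strings (IN = String, ACC = Set, OUT = Integer).
class DistinctCount implements ProposedAggregate<String, Set<String>, Integer> {
    public Set<String> createAccumulator() { return new HashSet<>(); }
    public void add(String value, Set<String> acc) { acc.add(value); }
    public Integer getResult(Set<String> acc) { return acc.size(); }
    public Set<String> merge(Set<String> a, Set<String> b) { a.addAll(b); return a; }
}

class TwoPhaseAggregation {

    // Pre-aggregation: each partition builds its own accumulator independently.
    // Final aggregation: partial accumulators are merged, then finalized once.
    static <IN, ACC, OUT> OUT aggregate(ProposedAggregate<IN, ACC, OUT> fn, List<List<IN>> partitions) {
        ACC total = fn.createAccumulator();
        for (List<IN> partition : partitions) {
            ACC partial = fn.createAccumulator();
            for (IN value : partition) {
                fn.add(value, partial);
            }
            total = fn.merge(total, partial);
        }
        return fn.getResult(total);
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "c"));
        System.out.println(aggregate(new DistinctCount(), partitions)); // 3
    }
}
{code}

Because {{merge}} is part of the contract, the partial accumulators could just as well be produced on different machines; a fold has no equivalent step.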

{code}
public interface AggregateFunction<IN, ACC, OUT> extends Function {

	ACC createAccumulator();

	void add(IN value, ACC accumulator);

	OUT getResult(ACC accumulator);

	ACC merge(ACC a, ACC b);
}
{code}
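One consequence of {{add}} returning {{void}} is that the accumulator must be a mutable object updated in place. A hypothetical sketch of how such a function could back per-window state (all names are illustrative; this is not Flink's window operator): one accumulator is kept per window, updated incrementally as elements arrive, and {{merge}} is used when two windows are combined, as merging windows such as session windows would require.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Local copy of the proposed interface (without Flink's Function marker).
interface ProposedAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical sum: a one-element long[] is a mutable accumulator, since add()
// returns void and therefore cannot hand back a new (immutable) Long.
class SumAggregate implements ProposedAggregate<Integer, long[], Long> {
    public long[] createAccumulator() { return new long[1]; }
    public void add(Integer value, long[] acc) { acc[0] += value; }
    public Long getResult(long[] acc) { return acc[0]; }
    public long[] merge(long[] a, long[] b) { a[0] += b[0]; return a; }
}

// Hypothetical per-window state: one accumulator per window id, so the raw
// elements never need to be buffered.
class WindowedAggregateState<W, IN, ACC, OUT> {
    private final ProposedAggregate<IN, ACC, OUT> fn;
    private final Map<W, ACC> state = new HashMap<>();

    WindowedAggregateState(ProposedAggregate<IN, ACC, OUT> fn) {
        this.fn = fn;
    }

    // Incremental aggregation: create the accumulator lazily, then add in place.
    void add(W window, IN value) {
        ACC acc = state.computeIfAbsent(window, w -> fn.createAccumulator());
        fn.add(value, acc);
    }

    // Combine 'source' into 'target' (e.g. two sessions that now overlap).
    void mergeWindows(W target, W source) {
        ACC sourceAcc = state.remove(source);
        if (sourceAcc == null) {
            return;
        }
        ACC targetAcc = state.get(target);
        state.put(target, targetAcc == null ? sourceAcc : fn.merge(targetAcc, sourceAcc));
    }

    // Finalize the result for a window; an unknown window yields the empty result.
    OUT result(W window) {
        ACC acc = state.get(window);
        return fn.getResult(acc != null ? acc : fn.createAccumulator());
    }
}

class WindowedAggregateDemo {
    public static void main(String[] args) {
        WindowedAggregateState<String, Integer, long[], Long> windows =
                new WindowedAggregateState<>(new SumAggregate());
        windows.add("session-1", 5);
        windows.add("session-2", 7);
        windows.add("session-2", 3);
        windows.mergeWindows("session-1", "session-2");
        System.out.println(windows.result("session-1")); // 15
    }
}
{code}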

Example use:

{code}
public class AverageAccumulator {
    long count;
    long sum;
}

// implementation of a simple average
public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public void add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

// implementation of a weighted average
// this reuses the same accumulator type as the aggregate function for 'average'
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public void add(Datum value, AverageAccumulator acc) {
        // the weight contributes to the count, value * weight to the sum
        acc.count += value.getWeight();
        acc.sum += value.getValue() * value.getWeight();
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}
{code}
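A small usage sketch of the weighted average, assuming a hypothetical {{Datum}} with {{long}} value and weight fields and a local copy of the interface named {{ProposedAggregate}}. It computes sum(value * weight) / sum(weight) and checks that aggregating two halves separately and merging the partial accumulators gives the same result as a single pass.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local copy of the proposed interface (without Flink's Function marker).
interface ProposedAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

class AverageAccumulator {
    long count;
    long sum;
}

// Hypothetical input type: a value together with its weight.
class Datum {
    private final long value;
    private final long weight;

    Datum(long value, long weight) {
        this.value = value;
        this.weight = weight;
    }

    long getValue() { return value; }
    long getWeight() { return weight; }
}

// Weighted average: sum(value * weight) / sum(weight).
class WeightedAverage implements ProposedAggregate<Datum, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }

    public void add(Datum d, AverageAccumulator acc) {
        acc.count += d.getWeight();
        acc.sum += d.getValue() * d.getWeight();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

class WeightedAverageDemo {
    // Single pass over all data.
    static double aggregateAll(List<Datum> data) {
        WeightedAverage fn = new WeightedAverage();
        AverageAccumulator acc = fn.createAccumulator();
        for (Datum d : data) {
            fn.add(d, acc);
        }
        return fn.getResult(acc);
    }

    // Pre-aggregate each half separately, then merge the partial accumulators.
    static double splitAndMerge(List<Datum> first, List<Datum> second) {
        WeightedAverage fn = new WeightedAverage();
        AverageAccumulator left = fn.createAccumulator();
        for (Datum d : first) {
            fn.add(d, left);
        }
        AverageAccumulator right = fn.createAccumulator();
        for (Datum d : second) {
            fn.add(d, right);
        }
        return fn.getResult(fn.merge(left, right));
    }

    public static void main(String[] args) {
        List<Datum> first = Arrays.asList(new Datum(10, 1), new Datum(20, 1));
        List<Datum> second = Arrays.asList(new Datum(40, 2));
        List<Datum> all = new ArrayList<>(first);
        all.addAll(second);
        System.out.println(splitAndMerge(first, second)); // 27.5
        System.out.println(aggregateAll(all));            // 27.5
    }
}
{code}

With these inputs the unweighted mean would be 70 / 3, so the result of 110 / 4 = 27.5 reflects the weight of 2 on the last value.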
