Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/11/28 15:47:12 UTC

[jira] [Commented] (FLINK-1293) Add support for out-of-place aggregations

    [ https://issues.apache.org/jira/browse/FLINK-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228313#comment-14228313 ] 

ASF GitHub Bot commented on FLINK-1293:
---------------------------------------

GitHub user he-sk opened a pull request:

    https://github.com/apache/incubator-flink/pull/243

    [FLINK-1293] Add support for out-of-place aggregations

    This patch adds support for multiple aggregations on the same field and aggregations which change the output type, e.g., count and average.
    
    It adds the following aggregation syntax to the Java API:
    
        // A string key and a long value
        DataSet<Tuple2<String, Long>> ds = ...
    
        // count all the elements
        DataSet<Tuple1<Long>> result = ds.aggregate(count()); 
    
        //  count elements in each group
        DataSet<Tuple2<String, Long>> result = ds.groupBy(0).aggregate(allKeys(), count()); 
    
        // same as above but explicitly state the returned key(s)
        DataSet<Tuple2<String, Long>> result = ds.groupBy(0).aggregate(key(0), count());
    
        // only return counts, drop keys
        DataSet<Tuple1<Long>> result = ds.groupBy(0).aggregate(count());
    
        // average reuses count and sum
        DataSet<Tuple4<String, Long, Long, Double>> result = 
            ds.groupBy(0).aggregate(allKeys(), count(), sum(1), average(1));
    
    Five aggregation functions are supported: min, max, sum, count and average. Notice that count does not take a field reference in the example above.
    
    Internally, the aggregation is implemented by the following operator chain:
    
        Input -> Map1 -> Reduce -> Map2 -> Output
    
    - Map1 constructs an intermediate tuple on which the aggregation is performed. Its main task is to copy tuple fields that are used in multiple aggregation functions and to initialize the fields required for the internal implementation of aggregation functions. For example, average requires a field to count the tuples.
    
    - Reduce performs the actual aggregation.
    
    - Map2 computes the final result of the aggregation function, e.g., computing the average from the sum and the count, and drops the fields used in groupings that are not requested in the output.
    
    Average is implemented to reuse an existing sum and/or count function. In the last example above, the intermediate tuple contains only 3 fields: the key, a field to compute the sum, and a field to compute the count. The field holding the average is added after the Reduce operator in the Map2 operator.
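    
    The Map1 -> Reduce -> Map2 chain for the last example can be sketched in plain Java roughly as follows. This is only an illustration and not the code of the patch: the names AggregationSketch, Input, Intermediate, Result, map1, reduce and map2 are hypothetical, and ordinary Java collections stand in for the Flink operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the operator chain; not part of the Flink API.
class AggregationSketch {

    /** Input element: a string key and a long value (like Tuple2<String, Long>). */
    static final class Input {
        final String key;
        final long value;
        Input(String key, long value) { this.key = key; this.value = value; }
    }

    /** Intermediate tuple with only 3 fields: key, count, sum. */
    static final class Intermediate {
        final String key;
        final long count;
        final long sum;
        Intermediate(String key, long count, long sum) {
            this.key = key; this.count = count; this.sum = sum;
        }
    }

    /** Output element (like Tuple4<String, Long, Long, Double>). */
    static final class Result {
        final String key;
        final long count;
        final long sum;
        final double average;
        Result(String key, long count, long sum, double average) {
            this.key = key; this.count = count; this.sum = sum; this.average = average;
        }
        @Override
        public String toString() {
            return "(" + key + ", " + count + ", " + sum + ", " + average + ")";
        }
    }

    /** Map1: initialize the count field to 1 and copy the value into the sum field. */
    static Intermediate map1(Input in) {
        return new Intermediate(in.key, 1L, in.value);
    }

    /** Reduce: combine two intermediate tuples of the same group. */
    static Intermediate reduce(Intermediate a, Intermediate b) {
        return new Intermediate(a.key, a.count + b.count, a.sum + b.sum);
    }

    /** Map2: derive the average from the reduced sum and count. */
    static Result map2(Intermediate t) {
        return new Result(t.key, t.count, t.sum, (double) t.sum / t.count);
    }

    /** Runs the three steps, grouping by key; a map stands in for groupBy(0). */
    static List<Result> aggregate(List<Input> inputs) {
        Map<String, Intermediate> groups = new LinkedHashMap<>();
        for (Input in : inputs) {
            groups.merge(in.key, map1(in), AggregationSketch::reduce);
        }
        List<Result> out = new ArrayList<>();
        for (Intermediate t : groups.values()) {
            out.add(map2(t));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Input> data = Arrays.asList(
                new Input("a", 2), new Input("b", 10), new Input("a", 4));
        for (Result r : aggregate(data)) {
            System.out.println(r);
        }
    }
}
```

    In this sketch the average never appears in the tuple that is reduced; it is derived once per group in map2, which mirrors the reuse of sum and count described above.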
    
    Currently, only the Java API is implemented. My Scala knowledge is fairly limited, so it would be great if somebody else could pick that up.
    
    Also, the result type of aggregate is simply <T extends Tuple>. To support type inference at compile time, it is necessary to integrate the work by Chen (https://github.com/apache/incubator-flink/pull/194).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/he-sk/incubator-flink aggregation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/243.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #243
    
----
commit f571e6a54b6d3ea6858bc583cf8885c5b5f8f426
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-13T16:20:33Z

    deprecated current aggregation code

commit da928501ad84d7e1c3a90ce7645d7cfccf4ae920
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-14T10:32:22Z

    Test cases and API for new aggregations

commit d70407ce976206082af580c35a420d094a23852f
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-14T15:21:29Z

    ungrouped aggregations

commit 525b54bdf6163fbcf5f0ba6d9c223e7d3f82fdfc
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-17T12:17:45Z

    grouped aggregations

commit a3290a6da175b3bc1f6a55d9316396252d9d4787
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-18T12:59:29Z

    missing license headers

commit 4110ac74fb137384989ba0ac09bb7310228398d1
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-18T14:28:28Z

    moved AggregationUdf into correct package

commit 9ad97ced38be651d98af29c6791fd40aebcb4354
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-18T14:30:18Z

    name the aggregation operator in the DAG

commit 79eb9ebb627823e6c4d1d599c1c73871eb30a068
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-18T14:31:11Z

    additional tests for aggregation

commit 56b961fa83334c4dff4dd677300b3587f7502c71
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-18T16:38:50Z

    CountAggregationFunction does not need an input type

commit 8abbb258fbf261dc54f6ce3370dce4613804766f
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-18T16:48:31Z

    documentation updates

commit 5627c92b7524c54218afe5018fe2dcc93f32ec87
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-18T17:13:27Z

    fixed test case

commit 156abcd00d9f54998fb0510d1a9782761aadbc6b
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-19T02:09:32Z

    fixed CI build

commit 3c148a47da55f6160433ea10d08f04f909a1c244
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-19T14:21:55Z

    renamed old aggregation convenience functions

commit 6df19c99356a6fdb7904fa2d73f23abe6e407d18
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-19T14:30:23Z

    alternative aggregation API

commit 41136d683560d391df9aee88583ff9e984319d8c
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-24T09:17:06Z

    removed author comments

commit da6b25a9e25658ebb395c2f49fbbf1aa81941b3c
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-24T13:10:29Z

    implemented aggregation as Map -> Reduce -> Map

commit a206282a39d757fb1a0c79636498fd3225ef5051
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T13:24:42Z

    removed aggregation API based on GroupReduce

commit 8cf6c064f9b8ece039e6130d91323fa1d96d58e6
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T13:34:27Z

    documentation and test updates

commit b205df3b1ab640583c82cf98c05357e7cbe68719
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T16:11:27Z

    fix Java 8 examples that use old API

commit 577980ab54301d4520411116ddc55cc55a3b59fc
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T16:43:11Z

    move 2nd approach to different branch

commit 15c87bafac62b64f83be596e9eab22dd68cb23d1
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T16:50:39Z

    make getters/setters package private

commit 11849c0bdcbc5c17a913f1c4df62205f79e024c3
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T17:11:58Z

    check that tuple field is not out of bounds

commit 48ee3fcf8f99b0f2f9ace696d3409b343e324f4d
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T18:52:31Z

    allKeys() marker

commit 1ec4c4c1837c3c25a64ac128fdd67a37b35a424f
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T20:09:54Z

    reuse sum() and count() in average()

commit 78359c09f06b1f8382f7d3510a7d4c6969a993bd
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date:   2014-11-26T20:53:37Z

    integration tests

----


> Add support for out-of-place aggregations
> -----------------------------------------
>
>                 Key: FLINK-1293
>                 URL: https://issues.apache.org/jira/browse/FLINK-1293
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API, Scala API
>    Affects Versions: 0.7.0-incubating
>            Reporter: Viktor Rosenfeld
>            Assignee: Viktor Rosenfeld
>            Priority: Minor
>
> Currently, the output of an aggregation is of the same type as the input. This restriction has two major drawbacks:
> 1. Every tuple field can only be used in one aggregation because the aggregation's result is stored in the field.
> 2. Aggregations having a return type that is different from the input type, e.g., count or average, cannot be implemented.
> It would be nice to have the aggregation return any kind of tuple as a result, so the restrictions above no longer apply.
> See also:
> - https://github.com/stratosphere/stratosphere/wiki/Design-of-Aggregate-Operator
> - http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-td2311.html


