Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/11/28 15:47:12 UTC
[jira] [Commented] (FLINK-1293) Add support for out-of-place aggregations
[ https://issues.apache.org/jira/browse/FLINK-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228313#comment-14228313 ]
ASF GitHub Bot commented on FLINK-1293:
---------------------------------------
GitHub user he-sk opened a pull request:
https://github.com/apache/incubator-flink/pull/243
[FLINK-1293] Add support for out-of-place aggregations
This patch adds support for multiple aggregations on the same field and aggregations which change the output type, e.g., count and average.
It adds the following aggregation syntax to the Java API:
// A string key and a long value
DataSet<Tuple2<String, Long>> ds = ...
// count all the elements
DataSet<Tuple1<Long>> result = ds.aggregate(count());
// count elements in each group
DataSet<Tuple2<String, Long>> result = ds.groupBy(0).aggregate(allKeys(), count());
// same as above but explicitly state the returned key(s)
DataSet<Tuple2<String, Long>> result = ds.groupBy(0).aggregate(key(0), count());
// only return counts, drop keys
DataSet<Tuple1<Long>> result = ds.groupBy(0).aggregate(count());
// average reuses count and sum
DataSet<Tuple4<String, Long, Long, Double>> result =
    ds.groupBy(0).aggregate(allKeys(), count(), sum(1), average(1));
Five aggregation functions are supported: min, max, sum, count and average. Notice that count does not take a field reference in the example above.
Internally, the aggregation is implemented by the following operator chain:
Input -> Map1 -> Reduce -> Map2 -> Output
- Map1 constructs an intermediate tuple on which the aggregation is performed. Its main task is to copy tuple fields that are used in multiple aggregation functions and to initialize fields required by the internal implementation of the aggregation functions. For example, average requires a field to count the tuples.
- Reduce performs the actual aggregation.
- Map2 computes the final result of the aggregation function, e.g., computing the average from the sum and the count, and drops the fields used in groupings that are not requested in the output.
Average is implemented to reuse an existing sum and/or count function. In the last example above, the intermediate tuple contains only 3 fields: the key, a field to compute the sum, and a field to compute the count. The field holding the average is added after the Reduce operator in the Map2 operator.
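To make the Map1 -> Reduce -> Map2 chain concrete, here is a minimal sketch in plain Java that mimics the last example above (count, sum and average per key). It does not use the Flink API or any class from this patch. The names AggregationChainSketch, Intermediate, Result, map1, reduce, map2 and aggregate are hypothetical and chosen only for illustration, and the grouping is simulated with an in-memory map. It only shows how the intermediate tuple can carry just the key, the sum and the count, with the average derived at the end.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain Java, not the Flink API and not the patch's classes.
class AggregationChainSketch {

    /** Hypothetical intermediate tuple: key, running sum, running count. */
    static final class Intermediate {
        final String key;
        final long sum;
        final long count;

        Intermediate(String key, long sum, long count) {
            this.key = key;
            this.sum = sum;
            this.count = count;
        }
    }

    /** Hypothetical output tuple: key, count, sum, average. */
    static final class Result {
        final String key;
        final long count;
        final long sum;
        final double average;

        Result(String key, long count, long sum, double average) {
            this.key = key;
            this.count = count;
            this.sum = sum;
            this.average = average;
        }
    }

    // Map1: copy the value into the sum field and initialize the count field to 1.
    static Intermediate map1(String key, long value) {
        return new Intermediate(key, value, 1L);
    }

    // Reduce: combine two intermediate tuples of the same group field by field.
    static Intermediate reduce(Intermediate a, Intermediate b) {
        return new Intermediate(a.key, a.sum + b.sum, a.count + b.count);
    }

    // Map2: derive the average from sum and count and build the output tuple.
    static Result map2(Intermediate t) {
        return new Result(t.key, t.count, t.sum, (double) t.sum / t.count);
    }

    // Simulates groupBy(0) followed by the three-step chain on in-memory data.
    static List<Result> aggregate(String[] keys, long[] values) {
        if (keys.length != values.length) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        Map<String, Intermediate> groups = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            Intermediate t = map1(keys[i], values[i]);
            groups.merge(t.key, t, AggregationChainSketch::reduce);
        }
        List<Result> out = new ArrayList<>();
        for (Intermediate t : groups.values()) {
            out.add(map2(t));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a"};
        long[] values = {2L, 10L, 4L};
        for (Result r : aggregate(keys, values)) {
            System.out.println(r.key + " " + r.count + " " + r.sum + " " + r.average);
        }
        // prints:
        // a 2 6 3.0
        // b 1 10 10.0
    }
}
```

In this sketch the average never occupies a field during the reduce step. It is computed once per group in map2, which mirrors the description of the field being added after the Reduce operator.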
Currently, only the Java API is implemented. My Scala knowledge is fairly limited, so it would be great if somebody else could pick that up.
Also, the result type of aggregate is simply <T extends Tuple>. To support type inference at compile time, it is necessary to integrate the work by Chen (https://github.com/apache/incubator-flink/pull/194).
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/he-sk/incubator-flink aggregation
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-flink/pull/243.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #243
----
commit f571e6a54b6d3ea6858bc583cf8885c5b5f8f426
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-13T16:20:33Z
deprecated current aggregation code
commit da928501ad84d7e1c3a90ce7645d7cfccf4ae920
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-14T10:32:22Z
Test cases and API for new aggregations
commit d70407ce976206082af580c35a420d094a23852f
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-14T15:21:29Z
ungrouped aggregations
commit 525b54bdf6163fbcf5f0ba6d9c223e7d3f82fdfc
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-17T12:17:45Z
grouped aggregations
commit a3290a6da175b3bc1f6a55d9316396252d9d4787
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-18T12:59:29Z
missing license headers
commit 4110ac74fb137384989ba0ac09bb7310228398d1
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-18T14:28:28Z
moved AggregationUdf into correct package
commit 9ad97ced38be651d98af29c6791fd40aebcb4354
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-18T14:30:18Z
name the aggregation operator in the DAG
commit 79eb9ebb627823e6c4d1d599c1c73871eb30a068
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-18T14:31:11Z
additional tests for aggregation
commit 56b961fa83334c4dff4dd677300b3587f7502c71
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-18T16:38:50Z
CountAggregationFunction does not need an input type
commit 8abbb258fbf261dc54f6ce3370dce4613804766f
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-18T16:48:31Z
documentation updates
commit 5627c92b7524c54218afe5018fe2dcc93f32ec87
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-18T17:13:27Z
fixed test case
commit 156abcd00d9f54998fb0510d1a9782761aadbc6b
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-19T02:09:32Z
fixed CI build
commit 3c148a47da55f6160433ea10d08f04f909a1c244
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-19T14:21:55Z
renamed old aggregation convenience functions
commit 6df19c99356a6fdb7904fa2d73f23abe6e407d18
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-19T14:30:23Z
alternative aggregation API
commit 41136d683560d391df9aee88583ff9e984319d8c
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-24T09:17:06Z
removed author comments
commit da6b25a9e25658ebb395c2f49fbbf1aa81941b3c
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-24T13:10:29Z
implemented aggregation as Map -> Reduce -> Map
commit a206282a39d757fb1a0c79636498fd3225ef5051
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T13:24:42Z
removed aggregation API based on GroupReduce
commit 8cf6c064f9b8ece039e6130d91323fa1d96d58e6
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T13:34:27Z
documentation and test updates
commit b205df3b1ab640583c82cf98c05357e7cbe68719
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T16:11:27Z
fix Java 8 examples that use old API
commit 577980ab54301d4520411116ddc55cc55a3b59fc
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T16:43:11Z
move 2nd approach to different branch
commit 15c87bafac62b64f83be596e9eab22dd68cb23d1
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T16:50:39Z
make getters/setters package private
commit 11849c0bdcbc5c17a913f1c4df62205f79e024c3
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T17:11:58Z
check that tuple field is not out of bounds
commit 48ee3fcf8f99b0f2f9ace696d3409b343e324f4d
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T18:52:31Z
allKeys() marker
commit 1ec4c4c1837c3c25a64ac128fdd67a37b35a424f
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T20:09:54Z
reuse sum() and count() in average()
commit 78359c09f06b1f8382f7d3510a7d4c6969a993bd
Author: Viktor Rosenfeld <vi...@tu-berlin.de>
Date: 2014-11-26T20:53:37Z
integration tests
----
> Add support for out-of-place aggregations
> -----------------------------------------
>
> Key: FLINK-1293
> URL: https://issues.apache.org/jira/browse/FLINK-1293
> Project: Flink
> Issue Type: Improvement
> Components: Java API, Scala API
> Affects Versions: 0.7.0-incubating
> Reporter: Viktor Rosenfeld
> Assignee: Viktor Rosenfeld
> Priority: Minor
>
> Currently, the output of an aggregation is of the same type as the input. This restriction has two major drawbacks:
> 1. Every tuple field can only be used in one aggregation because the aggregation's result is stored in the field.
> 2. Aggregations having a return type that is different from the input type, e.g., count or average, cannot be implemented.
> It would be nice to have the aggregation return any kind of tuple as a result, so the restrictions above no longer apply.
> See also:
> - https://github.com/stratosphere/stratosphere/wiki/Design-of-Aggregate-Operator
> - http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-td2311.html
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)