Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2016/01/14 21:22:39 UTC
[jira] [Updated] (FLINK-1272) Add a "reduceWithKey" function
[ https://issues.apache.org/jira/browse/FLINK-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske updated FLINK-1272:
---------------------------------
    Component/s:     (was: Java API)
                     (was: Scala API)
                 DataSet API
> Add a "reduceWithKey" function
> ------------------------------
>
> Key: FLINK-1272
> URL: https://issues.apache.org/jira/browse/FLINK-1272
> Project: Flink
> Issue Type: New Feature
> Components: DataSet API
> Reporter: Stephan Ewen
> Assignee: Fabian Hueske
>
> Flink does not assume a key/value model for grouping/aggregating/joining. The keys are specified as positions or paths of the objects to be grouped/joined.
> Currently, we do not expose the key in the {{ReduceFunction}} and {{GroupReduceFunction}}, but give (iterators over) the objects themselves.
> Since accessing the key is a common case, I suggest adding a convenience function {{GroupReduceWithKeyFunction}} that has the following signature and can be called as follows:
> {code}
> public interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
>     void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
> }
> {code}
> Scala:
> {code}
> val data: DataSet[SomePOJO] = ...
> data
>   .groupBy("id")
>   .reduceGroup( (key, values, out: Collector[(String, Long)]) =>
>     out.collect( (key, values.minBy(_.timestamp).timestamp) ) )
> {code}
> Java:
> {code}
> DataSet<SomePOJO> data = ...
> data
>   .groupBy("id")
>   .reduceGroup(
>     new GroupReduceWithKeyFunction<String, SomePOJO, Tuple2<String, Long>>() {
>       ...
>     });
> {code}
> The sae
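The semantics of the proposed interface can be tried out without any Flink dependency. The following is a minimal plain-Java sketch (Java 9+), not Flink code: only the shape of GroupReduceWithKeyFunction comes from the issue. The Collector stand-in, the Event type (in place of SomePOJO), the in-memory reduceGroupWithKey helper, and earliestPerId are all hypothetical names introduced for illustration, and Map.Entry is used where the issue uses Tuple2.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch of the proposal; nothing in this class is Flink API.
final class ReduceWithKeySketch {

    /** Hypothetical stand-in for Flink's Collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** The interface shape proposed in the issue. */
    interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
        void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
    }

    /** Hypothetical record type standing in for SomePOJO. */
    static final class Event {
        final String id;
        final long timestamp;

        Event(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    /** Groups the input by key in memory, then passes each key and its group to fn. */
    static <KEY, IN, OUT> List<OUT> reduceGroupWithKey(
            List<IN> input,
            Function<IN, KEY> keySelector,
            GroupReduceWithKeyFunction<KEY, IN, OUT> fn) {
        // LinkedHashMap keeps groups in first-seen key order.
        Map<KEY, List<IN>> groups = new LinkedHashMap<>();
        for (IN element : input) {
            groups.computeIfAbsent(keySelector.apply(element), k -> new ArrayList<>()).add(element);
        }
        List<OUT> result = new ArrayList<>();
        for (Map.Entry<KEY, List<IN>> group : groups.entrySet()) {
            fn.reduceGroup(group.getKey(), group.getValue(), result::add);
        }
        return result;
    }

    /** Emits (id, earliest timestamp) per group, mirroring the example in the issue. */
    static List<Map.Entry<String, Long>> earliestPerId(List<Event> events) {
        return ReduceWithKeySketch.<String, Event, Map.Entry<String, Long>>reduceGroupWithKey(
                events,
                event -> event.id,
                (key, values, out) -> {
                    long earliest = Long.MAX_VALUE;
                    for (Event event : values) {
                        earliest = Math.min(earliest, event.timestamp);
                    }
                    // The key is handed in directly; no need to re-read it from an element.
                    out.collect(Map.entry(key, earliest));
                });
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event("a", 3L), new Event("b", 5L), new Event("a", 1L));
        System.out.println(earliestPerId(events));
    }
}
```

In this sketch the key is extracted once by the grouping helper and passed to the user function, which is the convenience the issue asks for. How an actual implementation would obtain the key inside the DataSet API is not specified here.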
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)