Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2014/11/22 18:06:33 UTC
[jira] [Created] (FLINK-1272) Add a "reduceWithKey" function
Stephan Ewen created FLINK-1272:
-----------------------------------
Summary: Add a "reduceWithKey" function
Key: FLINK-1272
URL: https://issues.apache.org/jira/browse/FLINK-1272
Project: Flink
Issue Type: New Feature
Components: Java API, Scala API
Reporter: Stephan Ewen
Flink does not assume a key/value model for grouping, aggregating, or joining. Instead, keys are specified as field positions or field paths within the objects being grouped or joined.
Currently, we do not expose the key in the {{ReduceFunction}} and {{GroupReduceFunction}}, but pass the objects themselves (or iterators over them).
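To illustrate the status quo, here is a minimal Java sketch of a group reduce that needs the key: since the key is not passed in, the function has to re-read it from the first element of the group. So that it runs without Flink on the classpath, it uses simplified local stand-ins for {{Collector}} and {{GroupReduceFunction}} (not the real Flink classes); {{SomePOJO}} with fields {{id}} and {{timestamp}}, and the other class names, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for Flink's Collector and GroupReduceFunction,
// so this sketch compiles without Flink on the classpath.
interface Collector<T> {
    void collect(T record);
}

interface GroupReduceFunction<IN, OUT> {
    void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
}

// Hypothetical record type, grouped by the field path "id".
class SomePOJO {
    final String id;
    final long timestamp;

    SomePOJO(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

public class CurrentWorkaround {
    // The key is not passed in, so it is re-read from the first element of the group.
    static class MinTimestampPerId implements GroupReduceFunction<SomePOJO, String> {
        @Override
        public void reduce(Iterable<SomePOJO> values, Collector<String> out) {
            String key = null;
            long min = Long.MAX_VALUE;
            for (SomePOJO p : values) {
                if (key == null) {
                    key = p.id; // recovered from the data, not supplied by the API
                }
                min = Math.min(min, p.timestamp);
            }
            out.collect(key + "=" + min);
        }
    }

    // Runs the function on a single, already formed group.
    static List<String> reduceOneGroup(List<SomePOJO> group) {
        List<String> results = new ArrayList<>();
        new MinTimestampPerId().reduce(group, results::add);
        return results;
    }

    public static void main(String[] args) {
        List<SomePOJO> group = Arrays.asList(new SomePOJO("a", 5), new SomePOJO("a", 2));
        System.out.println(reduceOneGroup(group)); // [a=2]
    }
}
```

The extra bookkeeping for {{key}} is exactly what a key-aware function would make unnecessary.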
Since accessing the key is a common case, I suggest adding a convenience function {{GroupReduceWithKeyFunction}} with the following signature, to be used as follows:
{code}
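import org.apache.flink.util.Collector;

public interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
    // Called once per group with the group's key and all elements sharing that key.
    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}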
{code}
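One way the proposed interface could be layered on top of the existing group reduce is an adapter that extracts the key from the first element of each group and hands it to the user function. This is a minimal sketch, assuming a key extractor given as a {{java.util.function.Function}} and simplified local stand-ins for {{Collector}} and {{GroupReduceFunction}} (not the real Flink classes); {{WithKeyAdapter}} and {{SomePOJO}} are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Simplified stand-ins for Flink's Collector and GroupReduceFunction (not the real classes).
interface Collector<T> {
    void collect(T record);
}

interface GroupReduceFunction<IN, OUT> {
    void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
}

// The interface proposed in this issue.
interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}

// Hypothetical adapter: wraps a key-aware function as a plain group reduce.
class WithKeyAdapter<KEY, IN, OUT> implements GroupReduceFunction<IN, OUT> {
    private final Function<IN, KEY> keyExtractor;
    private final GroupReduceWithKeyFunction<KEY, IN, OUT> delegate;

    WithKeyAdapter(Function<IN, KEY> keyExtractor, GroupReduceWithKeyFunction<KEY, IN, OUT> delegate) {
        this.keyExtractor = keyExtractor;
        this.delegate = delegate;
    }

    @Override
    public void reduce(Iterable<IN> values, Collector<OUT> out) {
        Iterator<IN> it = values.iterator();
        if (!it.hasNext()) {
            return; // groups are expected to be non-empty; stay defensive anyway
        }
        IN first = it.next();
        KEY key = keyExtractor.apply(first);
        // Re-expose the consumed first element so the delegate sees the whole group
        // while the input is iterated only once.
        Iterable<IN> whole = () -> new Iterator<IN>() {
            private boolean firstPending = true;

            public boolean hasNext() {
                return firstPending || it.hasNext();
            }

            public IN next() {
                if (firstPending) {
                    firstPending = false;
                    return first;
                }
                return it.next();
            }
        };
        delegate.reduceGroup(key, whole, out);
    }
}

// Hypothetical record type.
class SomePOJO {
    final String id;
    final long timestamp;

    SomePOJO(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

public class WithKeyAdapterDemo {
    static List<String> reduceOneGroup(List<SomePOJO> group) {
        GroupReduceWithKeyFunction<String, SomePOJO, String> minTimestamp = (key, values, out) -> {
            long min = Long.MAX_VALUE;
            for (SomePOJO p : values) {
                min = Math.min(min, p.timestamp);
            }
            out.collect(key + "=" + min);
        };
        WithKeyAdapter<String, SomePOJO, String> adapted = new WithKeyAdapter<>(p -> p.id, minTimestamp);
        List<String> results = new ArrayList<>();
        adapted.reduce(group, results::add);
        return results;
    }

    public static void main(String[] args) {
        List<SomePOJO> group = Arrays.asList(new SomePOJO("a", 5), new SomePOJO("a", 2));
        System.out.println(reduceOneGroup(group)); // [a=2]
    }
}
```

The adapter re-exposes the element it already consumed instead of calling {{iterator()}} a second time, so it would also work if the group's {{Iterable}} can be traversed only once.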
Scala:
{code}
val data: DataSet[SomePOJO] = ...
data
  .groupBy("id")
  .reduceGroup( (key, values, out: Collector[(String, Long)]) =>
    out.collect( (key, values.minBy(_.timestamp).timestamp) ) )
{code}
Java:
{code}
DataSet<SomePOJO> data = ...;
data
    .groupBy("id")
    .reduceGroup(
        new GroupReduceWithKeyFunction<String, SomePOJO, Tuple2<String, Long>>() {
            ...
        });
{code}
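The computation from the Scala example (minimum timestamp per {{id}}) could be written with an anonymous {{GroupReduceWithKeyFunction}} roughly as follows. In this variant the key is handed over by the grouping step, which has already computed it, rather than re-extracted inside the function. It is a runnable sketch against local stand-ins: {{groupReduceWithKey}} is a hypothetical in-memory grouping driver, {{Tuple2}} is a minimal stand-in for Flink's class, and {{SomePOJO}} is assumed to have an {{id}} and a {{long}} {{timestamp}}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-in for Flink's Collector (not the real class).
interface Collector<T> {
    void collect(T record);
}

// The interface proposed in this issue.
interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}

// Minimal stand-in for Flink's Tuple2 (public fields f0 and f1).
class Tuple2<T0, T1> {
    public final T0 f0;
    public final T1 f1;

    Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    @Override
    public String toString() {
        return "(" + f0 + "," + f1 + ")";
    }
}

// Hypothetical record type.
class SomePOJO {
    final String id;
    final long timestamp;

    SomePOJO(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

public class ReduceWithKeyDemo {
    // Hypothetical in-memory driver: groups by the selected key (keeping first-seen order)
    // and passes each group's key directly to the user function.
    static <KEY, IN, OUT> List<OUT> groupReduceWithKey(
            List<IN> data, Function<IN, KEY> keySelector, GroupReduceWithKeyFunction<KEY, IN, OUT> fn) {
        Map<KEY, List<IN>> groups = new LinkedHashMap<>();
        for (IN record : data) {
            groups.computeIfAbsent(keySelector.apply(record), k -> new ArrayList<>()).add(record);
        }
        List<OUT> results = new ArrayList<>();
        for (Map.Entry<KEY, List<IN>> group : groups.entrySet()) {
            fn.reduceGroup(group.getKey(), group.getValue(), results::add);
        }
        return results;
    }

    static List<Tuple2<String, Long>> minTimestampPerId(List<SomePOJO> data) {
        return groupReduceWithKey(data, p -> p.id,
            new GroupReduceWithKeyFunction<String, SomePOJO, Tuple2<String, Long>>() {
                @Override
                public void reduceGroup(String key, Iterable<SomePOJO> values,
                                        Collector<Tuple2<String, Long>> out) {
                    long min = Long.MAX_VALUE;
                    for (SomePOJO p : values) {
                        min = Math.min(min, p.timestamp);
                    }
                    out.collect(new Tuple2<String, Long>(key, min));
                }
            });
    }

    public static void main(String[] args) {
        List<SomePOJO> data = Arrays.asList(
            new SomePOJO("a", 5), new SomePOJO("b", 3), new SomePOJO("a", 2));
        System.out.println(minTimestampPerId(data)); // [(a,2), (b,3)]
    }
}
```

Passing the key from the grouping step avoids evaluating the key expression a second time inside the function, at the cost of the runtime having to keep the key around per group.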
The same
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)