Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2014/07/11 13:47:05 UTC

[jira] [Commented] (FLINK-970) Implement a first(n) operator

    [ https://issues.apache.org/jira/browse/FLINK-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14058678#comment-14058678 ] 

Chesnay Schepler commented on FLINK-970:
----------------------------------------

So something like this, then:
{code:java}
@Override
public void reduce(Iterator<T> values, Collector<T> out) throws Exception {
	int indexThis = getRuntimeContext().getIndexOfThisSubtask(); // 0-based
	int indexTotal = getRuntimeContext().getNumberOfParallelSubtasks();
	if (indexThis < n) {
		// split n evenly across subtasks
		int partition = n / indexTotal;
		// leftover elements that do not divide evenly
		int remaining = n % indexTotal;
		// the first 'remaining' subtasks each emit one extra element
		if (indexThis < remaining) {
			partition++;
		}
		for (int x = 0; x < partition && values.hasNext(); x++) {
			out.collect(values.next());
		}
	}
}
{code}
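
The per-subtask split can be checked in isolation. The sketch below assumes the intent is to give every subtask n / parallelism elements and hand the n % parallelism leftover elements to the lowest-indexed subtasks; the class FirstNQuota and the method quota are hypothetical names for illustration and not part of Flink.

```java
public class FirstNQuota {

    // Hypothetical helper: number of elements the subtask with the given
    // 0-based index should emit so that all subtasks together emit exactly n.
    public static int quota(int n, int subtaskIndex, int parallelism) {
        int base = n / parallelism;       // every subtask emits at least this many
        int remainder = n % parallelism;  // leftover elements, handed out one each
        return subtaskIndex < remainder ? base + 1 : base;
    }

    public static void main(String[] args) {
        int n = 10;
        int parallelism = 4;
        int total = 0;
        for (int i = 0; i < parallelism; i++) {
            int q = quota(n, i, parallelism);
            total += q;
            System.out.println("subtask " + i + " emits " + q);
        }
        // The quotas add up to n regardless of how n divides by parallelism.
        System.out.println("total = " + total);
    }
}
```

Note that this only bounds what each subtask may emit. If a subtask holds fewer elements than its quota, the overall result can still fall short of n, which is presumably what "if possible" has to cover.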

Why the combiner? If the user asks for n elements, shouldn't we return exactly n elements? (if possible)

> Implement a first(n) operator
> -----------------------------
>
>                 Key: FLINK-970
>                 URL: https://issues.apache.org/jira/browse/FLINK-970
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Timo Walther
>            Assignee: Chesnay Schepler
>            Priority: Minor
>
> It is only syntactic sugar, but I had many cases where I just needed the first element or the first 2 elements in a GroupReduce.
> E.g., instead of
> {code:java}
> .reduceGroup(new GroupReduceFunction<String, String>() {
> 	@Override
> 	public void reduce(Iterator<String> values, Collector<String> out) throws Exception {
> 		out.collect(values.next());
> 	}
> })
> {code}
> one could write
> {code:java}
> .first()
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)