You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 18:22:24 UTC

[1/3] git commit: [FLINK-970] Adds first() operation on DataSet, UnsortedGrouping, and SortedGrouping

Repository: incubator-flink
Updated Branches:
  refs/heads/master e5731e0ed -> a3b02840d


[FLINK-970] Adds first() operation on DataSet, UnsortedGrouping, and SortedGrouping

This closes #88


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6702a2e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6702a2e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6702a2e3

Branch: refs/heads/master
Commit: 6702a2e317bee74930999ba26d50e75f555d75c5
Parents: e5731e0
Author: zentol <s....@web.de>
Authored: Mon Jul 28 14:13:19 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 12:44:13 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 10 ++++
 .../flink/api/java/functions/FirstReducer.java  | 54 ++++++++++++++++++++
 .../api/java/operators/SortedGrouping.java      | 10 ++++
 .../api/java/operators/UnsortedGrouping.java    | 10 ++++
 4 files changed, 84 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ff487a2..61cb429 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -50,6 +50,7 @@ import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.operators.FilterOperator;
+import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
@@ -397,6 +398,15 @@ public abstract class DataSet<T> {
 				(TupleTypeInfo) this.type, fields));
 	}
 
+	/**
+	 * Returns a new set containing the first n elements in this {@link DataSet}.<br/>
+	 * The selection is done by running a {@link FirstReducer} over the data set via
+	 * {@code reduceGroup}. No grouping or sort order is involved here, so which n elements
+	 * end up in the result is arbitrary.
+	 *
+	 * @param n The desired number of elements.
+	 * @return A GroupReduceOperator that represents the DataSet containing the first n elements.
+	 */
+	public GroupReduceOperator<T, T> first(int n) {
+		return reduceGroup(new FirstReducer<T>(n));
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  distinct
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
new file mode 100644
index 0000000..890a0ca
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+/**
+ * Group-reduce function that forwards at most the first {@code n} elements it is handed and
+ * drops all remaining ones. The elements are emitted unchanged and in iteration order.
+ *
+ * <p>The function is marked {@link Combinable}; its combine step applies the same truncation,
+ * so no more than {@code n} elements are forwarded by a single combine call either.
+ *
+ * @param <T> The type of the elements that are passed through.
+ */
+@Combinable
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T> {
+	private static final long serialVersionUID = 1L;
+
+	/** Upper bound on the number of elements emitted per call of reduce/combine. */
+	private final int count;
+
+	/**
+	 * Creates a reducer that keeps the first n elements.
+	 *
+	 * @param n The maximum number of elements to emit per call. A value of zero or less
+	 *          makes the function emit nothing.
+	 */
+	public FirstReducer(int n) {
+		this.count = n;
+	}
+
+	/**
+	 * Emits the leading elements of {@code values} until the limit is reached.
+	 *
+	 * @param values The elements to take the first n from.
+	 * @param out The collector that receives the selected elements.
+	 * @throws Exception Declared by the implemented interface; not thrown by this code itself.
+	 */
+	@Override
+	public void reduce(Iterable<T> values, Collector<T> out) throws Exception {
+
+		// A non-positive limit selects nothing. Without this guard the stop condition below,
+		// which is only evaluated after an element was emitted, would never be met and the
+		// complete input would be forwarded.
+		if (count <= 0) {
+			return;
+		}
+
+		int emitCnt = 0;
+		for (T val : values) {
+			out.collect(val);
+
+			// Stop directly after the n-th element so that the input is not advanced any further.
+			emitCnt++;
+			if (emitCnt >= count) {
+				break;
+			}
+		}
+	}
+
+	/**
+	 * Combine step; truncates the partial input exactly like {@link #reduce(Iterable, Collector)}.
+	 *
+	 * @param values The elements to take the first n from.
+	 * @param out The collector that receives the selected elements.
+	 * @throws Exception Declared by the implemented interface; not thrown by this code itself.
+	 */
+	@Override
+	public void combine(Iterable<T> values, Collector<T> out) throws Exception {
+		reduce(values, out);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 24744e3..6da5a1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.java.functions.FirstReducer;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
@@ -92,6 +93,15 @@ public class SortedGrouping<T> extends Grouping<T> {
 	}
 
 	
+	/**
+	 * Returns a new set containing the first n elements of each group in this grouped and sorted {@link DataSet}.<br/>
+	 * The selection is done by running a {@link FirstReducer} over every group via
+	 * {@code reduceGroup}, so "first" refers to the sort order defined for the groups.
+	 *
+	 * @param n The desired number of elements for each group.
+	 * @return A GroupReduceOperator that represents the DataSet containing the first n elements of each group.
+	 */
+	public GroupReduceOperator<T, T> first(int n) {
+		return reduceGroup(new FirstReducer<T>(n));
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index e0b9bf3..55dec7e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -139,6 +140,15 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 
 		return new GroupReduceOperator<T, R>(this, resultType, reducer);
 	}
+	
+	/**
+	 * Returns a new set containing the first n elements of each group in this grouped {@link DataSet}.<br/>
+	 * The selection is done by running a {@link FirstReducer} over every group via
+	 * {@code reduceGroup}. The groups are not sorted, so which n elements of a group are
+	 * returned is arbitrary.
+	 *
+	 * @param n The desired number of elements for each group.
+	 * @return A GroupReduceOperator that represents the DataSet containing the first n elements of each group.
+	 */
+	public GroupReduceOperator<T, T> first(int n) {
+		return reduceGroup(new FirstReducer<T>(n));
+	}
 
 	/**
 	 * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.<br/>


[3/3] git commit: Added documentation for first-n operator.

Posted by fh...@apache.org.
Added documentation for first-n operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a3b02840
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a3b02840
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a3b02840

Branch: refs/heads/master
Commit: a3b02840dcbfb8ea2f1c448c06b8a9fbb1e3f65d
Parents: 141946a
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 24 16:34:36 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 18:21:22 2014 +0200

----------------------------------------------------------------------
 docs/dataset_transformations.md | 23 +++++++++++++++++++++--
 docs/programming_guide.md       | 22 ++++++++++++++++++++--
 2 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a3b02840/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index a490a26..ec038a7 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -1134,6 +1134,25 @@ Only Map-like transformations may follow a hash-partition transformation, i.e.,
 ~~~java
 DataSet<Tuple2<String, Integer>> in = // [...]
 // hash-partition DataSet by String value and apply a MapPartition transformation.
-DataSet<Tuple2<String, String>> links = in.partitionByHash(0)
-                                          .mapPartition(new PartitionMapper());
+DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
+                                        .mapPartition(new PartitionMapper());
+~~~
+
+### First-n (Java API Only)
+
+Returns the first n (arbitrary) elements of a DataSet. First-n can be applied on a regular DataSet, a grouped DataSet, or a grouped-sorted DataSet. Grouping keys can be specified as key-selector functions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// Return the first five (arbitrary) elements of the DataSet
+DataSet<Tuple2<String, Integer>> out1 = in.first(5);
+
+// Return the first two (arbitrary) elements of each String group
+DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
+                                          .first(2);
+
+// Return the first three elements of each String group ordered by the Integer field
+DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
+                                          .sortGroup(1, Order.ASCENDING)
+                                          .first(3);
 ~~~
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a3b02840/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 99fc6d8..6e174ac 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -601,7 +601,7 @@ DataSet<String> result = data1.union(data2);
 {% highlight java %}
 DataSet<String> in = // [...]
 DataSet<String> result = in.rebalance()
-                           .map(new Mapper())
+                           .map(new Mapper());
 {% endhighlight %}
       </td>
     </tr>
@@ -612,7 +612,25 @@ DataSet<String> result = in.rebalance()
 {% highlight java %}
 DataSet<Tuple2<String,Integer>> in = // [...]
 DataSet<Integer> result = in.partitionByHash(0)
-                            .mapPartition(new PartitionMapper())
+                            .mapPartition(new PartitionMapper());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>First-n</strong></td>
+      <td>
+        <p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+// regular data set
+DataSet<Tuple2<String,Integer>> result1 = in.first(3);
+// grouped data set
+DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0)
+                                            .first(3);
+// grouped-sorted data set
+DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0)
+                                            .sortGroup(1, Order.ASCENDING)
+                                            .first(3);
 {% endhighlight %}
       </td>
     </tr>


[2/3] git commit: Added parameter checks and tests for first-n operator. Excluded DataSet.first(), UnsortedGrouping.first(), and SortedGrouping.first() methods from ScalaAPICompletenessTest.

Posted by fh...@apache.org.
Added parameter checks and tests for first-n operator.
Excluded DataSet.first(), UnsortedGrouping.first(), and SortedGrouping.first() methods from ScalaAPICompletenessTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/141946a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/141946a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/141946a7

Branch: refs/heads/master
Commit: 141946a762efd7e98b66e455e28ebad0e9ea6281
Parents: 6702a2e
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 24 16:02:52 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 18:21:14 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |   4 +
 .../api/java/operators/SortedGrouping.java      |   6 +-
 .../api/java/operators/UnsortedGrouping.java    |   6 +-
 .../api/java/operator/FirstNOperatorTest.java   | 173 +++++++++++++++++++
 .../api/scala/ScalaAPICompletenessTest.scala    |   5 +
 .../test/javaApiOperators/FirstNITCase.java     | 170 ++++++++++++++++++
 6 files changed, 362 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 61cb429..7a13f2f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -404,6 +404,10 @@ public abstract class DataSet<T> {
 	 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 	*/
 	public GroupReduceOperator<T, T> first(int n) {
+		if(n < 1) {
+			throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+		}
+		
 		return reduceGroup(new FirstReducer<T>(n));
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 6da5a1a..a0bb920 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -95,10 +95,14 @@ public class SortedGrouping<T> extends Grouping<T> {
 	
 	/**
 	 * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
-	 * @param n The desired number of elements.
+	 * @param n The desired number of elements for each group.
 	 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 	*/
 	public GroupReduceOperator<T, T> first(int n) {
+		if(n < 1) {
+			throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+		}
+		
 		return reduceGroup(new FirstReducer<T>(n));
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 55dec7e..13720ac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -143,10 +143,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	
 	/**
 	 * Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
-	 * @param n The desired number of elements.
+	 * @param n The desired number of elements for each group.
 	 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 	*/
 	public GroupReduceOperator<T, T> first(int n) {
+		if(n < 1) {
+			throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+		}
+		
 		return reduceGroup(new FirstReducer<T>(n));
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
new file mode 100644
index 0000000..aaf744c
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the argument validation of the first-n operator when it is applied to an ungrouped
+ * data set, a grouped data set, and a grouped-and-sorted data set.
+ *
+ * <p>The tests only construct the operators on an empty input collection; no program is executed.
+ * In every variant n == 1 and n == 10 have to be accepted, while n == 0 and n == -1 have to be
+ * rejected with an {@link InvalidProgramException}. Every failure message names the offending
+ * call and, where applicable, the unexpected exception.
+ */
+public class FirstNOperatorTest {
+
+	// TUPLE DATA
+
+	/** Empty input; the element values are irrelevant because nothing is executed. */
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	/** Explicit type information, passed along with the empty collection. */
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	/** first(n) directly on a DataSet. */
+	@Test
+	public void testUngroupedFirstN() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.first(1);
+		} catch(Exception e) {
+			Assert.fail("first(1) on an ungrouped data set must be accepted, but threw: " + e);
+		}
+
+		// should work
+		try {
+			tupleDs.first(10);
+		} catch(Exception e) {
+			Assert.fail("first(10) on an ungrouped data set must be accepted, but threw: " + e);
+		}
+
+		// should not work n == 0
+		try {
+			tupleDs.first(0);
+			Assert.fail("first(0) on an ungrouped data set must be rejected.");
+		} catch(InvalidProgramException ipe) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail("first(0) on an ungrouped data set must throw an InvalidProgramException, but threw: " + e);
+		}
+
+		// should not work n == -1
+		try {
+			tupleDs.first(-1);
+			Assert.fail("first(-1) on an ungrouped data set must be rejected.");
+		} catch(InvalidProgramException ipe) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail("first(-1) on an ungrouped data set must throw an InvalidProgramException, but threw: " + e);
+		}
+
+	}
+
+	/** first(n) after groupBy(...), with single and composite field position keys. */
+	@Test
+	public void testGroupedFirstN() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy(2).first(1);
+		} catch(Exception e) {
+			Assert.fail("first(1) on a grouped data set must be accepted, but threw: " + e);
+		}
+
+		// should work
+		try {
+			tupleDs.groupBy(1,3).first(10);
+		} catch(Exception e) {
+			Assert.fail("first(10) on a grouped data set must be accepted, but threw: " + e);
+		}
+
+		// should not work n == 0
+		try {
+			tupleDs.groupBy(0).first(0);
+			Assert.fail("first(0) on a grouped data set must be rejected.");
+		} catch(InvalidProgramException ipe) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail("first(0) on a grouped data set must throw an InvalidProgramException, but threw: " + e);
+		}
+
+		// should not work n == -1
+		try {
+			tupleDs.groupBy(2).first(-1);
+			Assert.fail("first(-1) on a grouped data set must be rejected.");
+		} catch(InvalidProgramException ipe) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail("first(-1) on a grouped data set must throw an InvalidProgramException, but threw: " + e);
+		}
+	}
+
+	/** first(n) after groupBy(...).sortGroup(...). */
+	@Test
+	public void testGroupedSortedFirstN() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1);
+		} catch(Exception e) {
+			Assert.fail("first(1) on a grouped and sorted data set must be accepted, but threw: " + e);
+		}
+
+		// should work
+		try {
+			tupleDs.groupBy(1,3).sortGroup(4, Order.ASCENDING).first(10);
+		} catch(Exception e) {
+			Assert.fail("first(10) on a grouped and sorted data set must be accepted, but threw: " + e);
+		}
+
+		// should not work n == 0
+		try {
+			tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0);
+			Assert.fail("first(0) on a grouped and sorted data set must be rejected.");
+		} catch(InvalidProgramException ipe) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail("first(0) on a grouped and sorted data set must throw an InvalidProgramException, but threw: " + e);
+		}
+
+		// should not work n == -1
+		try {
+			tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1);
+			Assert.fail("first(-1) on a grouped and sorted data set must be rejected.");
+		} catch(InvalidProgramException ipe) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail("first(-1) on a grouped and sorted data set must throw an InvalidProgramException, but threw: " + e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index fb4396b..ba0f6f1 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -67,6 +67,11 @@ class ScalaAPICompletenessTest {
       "org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
       "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
       
+      // Exclude first operator for now
+      "org.apache.flink.api.java.DataSet.first",
+      "org.apache.flink.api.java.operators.SortedGrouping.first",
+      "org.apache.flink.api.java.operators.UnsortedGrouping.first",
+      
       // Exclude explicit rebalance and hashPartitionBy for now
       "org.apache.flink.api.java.DataSet.partitionByHash",
       "org.apache.flink.api.java.DataSet.rebalance"

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
new file mode 100644
index 0000000..358b81b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FirstNITCase extends JavaProgramTestBase {
+	
+	private static int NUM_PROGRAMS = 3;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public FirstNITCase(Configuration config) {
+		super(config);
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = FirstNProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	private static class FirstNProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/*
+				 * First-n on ungrouped data set
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
+				
+				seven.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "(7)\n";
+			}
+			case 2: {
+				/*
+				 * First-n on grouped data set
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
+															.map(new OneMapper2()).groupBy(0).sum(1);
+				
+				first.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
+			}
+			case 3: {
+				/*
+				 * First-n on grouped and sorted data set
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
+															.project(1,0).types(Long.class, Integer.class);
+				
+				first.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "(1,1)\n"
+						+ "(2,3)\n(2,2)\n"
+						+ "(3,6)\n(3,5)\n(3,4)\n"
+						+ "(4,10)\n(4,9)\n(4,8)\n"
+						+ "(5,15)\n(5,14)\n(5,13)\n"
+						+ "(6,21)\n(6,20)\n(6,19)\n";
+				
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+	
+	public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple1<Integer> one = new Tuple1<Integer>(1);
+		@Override
+		public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) {
+			return one;
+		}
+	}
+	
+	public static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple2<Long, Integer> one = new Tuple2<Long, Integer>(0l,1);
+		@Override
+		public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) {
+			one.f0 = value.f1;
+			return one;
+		}
+	}
+	
+}