You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 18:22:24 UTC
[1/3] git commit: [FLINK-970] Adds first() operation on DataSet,
UnsortedGrouping, and SortedGrouping
Repository: incubator-flink
Updated Branches:
refs/heads/master e5731e0ed -> a3b02840d
[FLINK-970] Adds first() operation on DataSet, UnsortedGrouping, and SortedGrouping
This closes #88
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6702a2e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6702a2e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6702a2e3
Branch: refs/heads/master
Commit: 6702a2e317bee74930999ba26d50e75f555d75c5
Parents: e5731e0
Author: zentol <s....@web.de>
Authored: Mon Jul 28 14:13:19 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 12:44:13 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 10 ++++
.../flink/api/java/functions/FirstReducer.java | 54 ++++++++++++++++++++
.../api/java/operators/SortedGrouping.java | 10 ++++
.../api/java/operators/UnsortedGrouping.java | 10 ++++
4 files changed, 84 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ff487a2..61cb429 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -50,6 +50,7 @@ import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.FilterOperator;
+import org.apache.flink.api.java.functions.FirstReducer;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
@@ -397,6 +398,15 @@ public abstract class DataSet<T> {
(TupleTypeInfo) this.type, fields));
}
+ /**
+ * Returns a new set containing the first n elements in this {@link DataSet}.<br/>
+ * @param n The desired number of elements.
+ * @return A ReduceGroupOperator that represents the DataSet containing the elements.
+ */
+ public GroupReduceOperator<T, T> first(int n) {
+ return reduceGroup(new FirstReducer<T>(n));
+ }
+
// --------------------------------------------------------------------------------------------
// distinct
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
new file mode 100644
index 0000000..890a0ca
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+/**
+ * Group-reduce function that forwards at most the first {@code n} elements it receives per call.
+ * {@link #combine} delegates to {@link #reduce}, so the same cap applies when used as a combiner.
+ *
+ * @param <T> type of the elements that are passed through unchanged
+ */
+@Combinable
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T> {
+    private static final long serialVersionUID = 1L;
+
+    /** Maximum number of elements emitted per {@code reduce}/{@code combine} call. */
+    private final int count;
+
+    /**
+     * @param n maximum number of elements to emit per call; for values below 1 nothing is emitted
+     */
+    public FirstReducer(int n) {
+        this.count = n;
+    }
+
+    /**
+     * Forwards the elements of {@code values} to {@code out} in iteration order and stops after
+     * {@code count} elements have been emitted.
+     *
+     * @param values elements to take from
+     * @param out collector receiving the forwarded elements
+     */
+    @Override
+    public void reduce(Iterable<T> values, Collector<T> out) throws Exception {
+        if (count < 1) {
+            // Without this guard the equality test below never matches for a non-positive
+            // count, so every element would be emitted instead of none.
+            return;
+        }
+
+        int emitted = 0;
+        for (T val : values) {
+            out.collect(val);
+            if (++emitted == count) {
+                // leave right away so that no further element is pulled from the iterator
+                break;
+            }
+        }
+    }
+
+    /** Applies the same first-n cap as {@link #reduce}. */
+    @Override
+    public void combine(Iterable<T> values, Collector<T> out) throws Exception {
+        reduce(values, out);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 24744e3..6da5a1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.operators;
+import org.apache.flink.api.java.functions.FirstReducer;
import java.util.Arrays;
import org.apache.flink.api.common.InvalidProgramException;
@@ -92,6 +93,15 @@ public class SortedGrouping<T> extends Grouping<T> {
}
+ /**
+ * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
+ * @param n The desired number of elements.
+ * @return A ReduceGroupOperator that represents the DataSet containing the elements.
+ */
+ public GroupReduceOperator<T, T> first(int n) {
+ return reduceGroup(new FirstReducer<T>(n));
+ }
+
// --------------------------------------------------------------------------------------------
// Group Operations
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index e0b9bf3..55dec7e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.functions.FirstReducer;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.aggregation.Aggregations;
@@ -139,6 +140,15 @@ public class UnsortedGrouping<T> extends Grouping<T> {
return new GroupReduceOperator<T, R>(this, resultType, reducer);
}
+
+ /**
+ * Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
+ * @param n The desired number of elements.
+ * @return A ReduceGroupOperator that represents the DataSet containing the elements.
+ */
+ public GroupReduceOperator<T, T> first(int n) {
+ return reduceGroup(new FirstReducer<T>(n));
+ }
/**
* Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.<br/>
[3/3] git commit: Added documentation for first-n operator.
Posted by fh...@apache.org.
Added documentation for first-n operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a3b02840
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a3b02840
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a3b02840
Branch: refs/heads/master
Commit: a3b02840dcbfb8ea2f1c448c06b8a9fbb1e3f65d
Parents: 141946a
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 24 16:34:36 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 18:21:22 2014 +0200
----------------------------------------------------------------------
docs/dataset_transformations.md | 23 +++++++++++++++++++++--
docs/programming_guide.md | 22 ++++++++++++++++++++--
2 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a3b02840/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index a490a26..ec038a7 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -1134,6 +1134,25 @@ Only Map-like transformations may follow a hash-partition transformation, i.e.,
~~~java
DataSet<Tuple2<String, Integer>> in = // [...]
// hash-partition DataSet by String value and apply a MapPartition transformation.
-DataSet<Tuple2<String, String>> links = in.partitionByHash(0)
- .mapPartition(new PartitionMapper());
+DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
+ .mapPartition(new PartitionMapper());
+~~~
+
+### First-n (Java API Only)
+
+Returns the first n (arbitrary) elements of a DataSet. First-n can be applied on a regular DataSet, a grouped DataSet, or a grouped-sorted DataSet. Grouping keys can be specified as key-selector functions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// Return the first five (arbitrary) elements of the DataSet
+DataSet<Tuple2<String, Integer>> out1 = in.first(5);
+
+// Return the first two (arbitrary) elements of each String group
+DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
+ .first(2);
+
+// Return the first three elements of each String group ordered by the Integer field
+DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
+ .sortGroup(1, Order.ASCENDING)
+ .first(3);
~~~
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a3b02840/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 99fc6d8..6e174ac 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -601,7 +601,7 @@ DataSet<String> result = data1.union(data2);
{% highlight java %}
DataSet<String> in = // [...]
DataSet<String> result = in.rebalance()
- .map(new Mapper())
+ .map(new Mapper());
{% endhighlight %}
</td>
</tr>
@@ -612,7 +612,25 @@ DataSet<String> result = in.rebalance()
{% highlight java %}
DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByHash(0)
- .mapPartition(new PartitionMapper())
+ .mapPartition(new PartitionMapper());
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>First-n</strong></td>
+ <td>
+ <p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+// regular data set
+DataSet<Tuple2<String,Integer>> result1 = in.first(3);
+// grouped data set
+DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0)
+ .first(3);
+// grouped-sorted data set
+DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0)
+ .sortGroup(1, Order.ASCENDING)
+ .first(3);
{% endhighlight %}
</td>
</tr>
[2/3] git commit: Added parameter checks and tests for first-n
operator. Excluded DataSet.first(), UnsortedGrouping.first(),
and SortedGrouping.first() methods from ScalaAPICompletenessTest.
Posted by fh...@apache.org.
Added parameter checks and tests for first-n operator.
Excluded DataSet.first(), UnsortedGrouping.first(), and SortedGrouping.first() methods from ScalaAPICompletenessTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/141946a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/141946a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/141946a7
Branch: refs/heads/master
Commit: 141946a762efd7e98b66e455e28ebad0e9ea6281
Parents: 6702a2e
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 24 16:02:52 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 18:21:14 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 4 +
.../api/java/operators/SortedGrouping.java | 6 +-
.../api/java/operators/UnsortedGrouping.java | 6 +-
.../api/java/operator/FirstNOperatorTest.java | 173 +++++++++++++++++++
.../api/scala/ScalaAPICompletenessTest.scala | 5 +
.../test/javaApiOperators/FirstNITCase.java | 170 ++++++++++++++++++
6 files changed, 362 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 61cb429..7a13f2f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -404,6 +404,10 @@ public abstract class DataSet<T> {
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
+ if(n < 1) {
+ throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+ }
+
return reduceGroup(new FirstReducer<T>(n));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 6da5a1a..a0bb920 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -95,10 +95,14 @@ public class SortedGrouping<T> extends Grouping<T> {
/**
* Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
- * @param n The desired number of elements.
+ * @param n The desired number of elements for each group.
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
+ if(n < 1) {
+ throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+ }
+
return reduceGroup(new FirstReducer<T>(n));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 55dec7e..13720ac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -143,10 +143,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
/**
* Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
- * @param n The desired number of elements.
+ * @param n The desired number of elements for each group.
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
+ if(n < 1) {
+ throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+ }
+
return reduceGroup(new FirstReducer<T>(n));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
new file mode 100644
index 0000000..aaf744c
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the argument validation of {@code first(n)} on ungrouped, grouped, and grouped-sorted
+ * data sets: {@code n} of 1 and 10 must be accepted, {@code n} of 0 and -1 must be rejected
+ * with an {@link InvalidProgramException}. The programs are only assembled, never executed.
+ */
+public class FirstNOperatorTest {
+
+    // TUPLE DATA
+
+    private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+            new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+    private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo =
+            new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+                    BasicTypeInfo.INT_TYPE_INFO,
+                    BasicTypeInfo.LONG_TYPE_INFO,
+                    BasicTypeInfo.STRING_TYPE_INFO,
+                    BasicTypeInfo.LONG_TYPE_INFO,
+                    BasicTypeInfo.INT_TYPE_INFO
+            );
+
+    /** Builds a data set from the empty tuple collection; no program in this class is executed. */
+    private DataSet<Tuple5<Integer, Long, String, Long, Integer>> createInput() {
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        return env.fromCollection(emptyTupleData, tupleTypeInfo);
+    }
+
+    @Test
+    public void testUngroupedFirstN() throws Exception {
+
+        DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = createInput();
+
+        // should work; an unexpected exception propagates so the report carries its stack trace
+        tupleDs.first(1);
+        tupleDs.first(10);
+
+        // should not work n == 0
+        try {
+            tupleDs.first(0);
+            Assert.fail("first(0) on an ungrouped data set was not rejected");
+        } catch (InvalidProgramException expected) {
+            // we're good here
+        }
+
+        // should not work n == -1
+        try {
+            tupleDs.first(-1);
+            Assert.fail("first(-1) on an ungrouped data set was not rejected");
+        } catch (InvalidProgramException expected) {
+            // we're good here
+        }
+    }
+
+    @Test
+    public void testGroupedFirstN() throws Exception {
+
+        DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = createInput();
+
+        // should work; an unexpected exception propagates so the report carries its stack trace
+        tupleDs.groupBy(2).first(1);
+        tupleDs.groupBy(1,3).first(10);
+
+        // should not work n == 0
+        try {
+            tupleDs.groupBy(0).first(0);
+            Assert.fail("first(0) on a grouped data set was not rejected");
+        } catch (InvalidProgramException expected) {
+            // we're good here
+        }
+
+        // should not work n == -1
+        try {
+            tupleDs.groupBy(2).first(-1);
+            Assert.fail("first(-1) on a grouped data set was not rejected");
+        } catch (InvalidProgramException expected) {
+            // we're good here
+        }
+    }
+
+    @Test
+    public void testGroupedSortedFirstN() throws Exception {
+
+        DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = createInput();
+
+        // should work; an unexpected exception propagates so the report carries its stack trace
+        tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1);
+        tupleDs.groupBy(1,3).sortGroup(4, Order.ASCENDING).first(10);
+
+        // should not work n == 0
+        try {
+            tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0);
+            Assert.fail("first(0) on a grouped-sorted data set was not rejected");
+        } catch (InvalidProgramException expected) {
+            // we're good here
+        }
+
+        // should not work n == -1
+        try {
+            tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1);
+            Assert.fail("first(-1) on a grouped-sorted data set was not rejected");
+        } catch (InvalidProgramException expected) {
+            // we're good here
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index fb4396b..ba0f6f1 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -67,6 +67,11 @@ class ScalaAPICompletenessTest {
"org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
"org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
+ // Exclude first operator for now
+ "org.apache.flink.api.java.DataSet.first",
+ "org.apache.flink.api.java.operators.SortedGrouping.first",
+ "org.apache.flink.api.java.operators.UnsortedGrouping.first",
+
// Exclude explicit rebalance and hashPartitionBy for now
"org.apache.flink.api.java.DataSet.partitionByHash",
"org.apache.flink.api.java.DataSet.rebalance"
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
new file mode 100644
index 0000000..358b81b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs three small first-n programs (ungrouped, grouped, grouped and sorted), one per
+ * parameterized configuration, and compares the text output each program writes with a
+ * fixed expected result.
+ */
+@RunWith(Parameterized.class)
+public class FirstNITCase extends JavaProgramTestBase {
+
+    /** Number of test programs; program ids run from 1 to this value. */
+    private static final int NUM_PROGRAMS = 3;
+
+    // NOTE(review): presumably reads the configuration handed to super(config) - confirm in JavaProgramTestBase
+    private int curProgId = config.getInteger("ProgramId", -1);
+    private String resultPath;
+    private String expectedResult;
+
+    public FirstNITCase(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    protected void preSubmit() throws Exception {
+        resultPath = getTempDirPath("result");
+    }
+
+    @Override
+    protected void testProgram() throws Exception {
+        expectedResult = FirstNProgs.runProgram(curProgId, resultPath);
+    }
+
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(expectedResult, resultPath);
+    }
+
+    /** Creates one configuration per program id, 1 through {@code NUM_PROGRAMS}. */
+    @Parameters
+    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+        for (int progId = 1; progId <= NUM_PROGRAMS; progId++) {
+            Configuration config = new Configuration();
+            config.setInteger("ProgramId", progId);
+            tConfigs.add(config);
+        }
+
+        return toParameterList(tConfigs);
+    }
+
+    private static class FirstNProgs {
+
+        /**
+         * Runs the program selected by {@code progId}, writing its output to {@code resultPath}.
+         *
+         * @return the output the selected program is expected to produce
+         * @throws IllegalArgumentException if {@code progId} is not 1, 2, or 3
+         */
+        public static String runProgram(int progId, String resultPath) throws Exception {
+
+            switch (progId) {
+                case 1:
+                    return runUngrouped(resultPath);
+                case 2:
+                    return runGrouped(resultPath);
+                case 3:
+                    return runGroupedSorted(resultPath);
+                default:
+                    throw new IllegalArgumentException("Invalid program id");
+            }
+        }
+
+        /** First-n on ungrouped data set: counts the records that survive first(7). */
+        private static String runUngrouped(String resultPath) throws Exception {
+
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+            DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
+
+            seven.writeAsText(resultPath);
+            env.execute();
+
+            // expected result; presumes the input holds at least seven records
+            return "(7)\n";
+        }
+
+        /** First-n on grouped data set: counts, per key of field 1, the records that survive first(4). */
+        private static String runGrouped(String resultPath) throws Exception {
+
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+            DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
+                    .map(new OneMapper2()).groupBy(0).sum(1);
+
+            first.writeAsText(resultPath);
+            env.execute();
+
+            // expected result; no key may be counted more than four times
+            return "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
+        }
+
+        /** First-n on grouped and sorted data set: first(3) per key of field 1, sorted descending on field 0. */
+        private static String runGroupedSorted(String resultPath) throws Exception {
+
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+            DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
+                    .project(1,0).types(Long.class, Integer.class);
+
+            first.writeAsText(resultPath);
+            env.execute();
+
+            // expected result, written as (field 1, field 0)
+            return "(1,1)\n"
+                    + "(2,3)\n(2,2)\n"
+                    + "(3,6)\n(3,5)\n(3,4)\n"
+                    + "(4,10)\n(4,9)\n(4,8)\n"
+                    + "(5,15)\n(5,14)\n(5,13)\n"
+                    + "(6,21)\n(6,20)\n(6,19)\n";
+        }
+
+    }
+
+    /** Maps every record to the same {@code (1)} tuple instance so that records can be counted with a sum. */
+    public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
+        private static final long serialVersionUID = 1L;
+        private final Tuple1<Integer> one = new Tuple1<Integer>(1);
+        @Override
+        public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) {
+            return one;
+        }
+    }
+
+    /** Maps every record to {@code (value.f1, 1)}, overwriting and returning one reused tuple instance. */
+    public static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> {
+        private static final long serialVersionUID = 1L;
+        private final Tuple2<Long, Integer> one = new Tuple2<Long, Integer>(0L, 1);
+        @Override
+        public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) {
+            one.f0 = value.f1;
+            return one;
+        }
+    }
+
+}