You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 18:22:25 UTC
[2/3] git commit: Added parameter checks and tests for first-n
operator. Excluded DataSet.first(), UnsortedGrouping.first(),
and SortedGrouping.first() methods from ScalaAPICompletenessTest.
Added parameter checks and tests for first-n operator.
Excluded DataSet.first(), UnsortedGrouping.first(), and SortedGrouping.first() methods from ScalaAPICompletenessTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/141946a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/141946a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/141946a7
Branch: refs/heads/master
Commit: 141946a762efd7e98b66e455e28ebad0e9ea6281
Parents: 6702a2e
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 24 16:02:52 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 18:21:14 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 4 +
.../api/java/operators/SortedGrouping.java | 6 +-
.../api/java/operators/UnsortedGrouping.java | 6 +-
.../api/java/operator/FirstNOperatorTest.java | 173 +++++++++++++++++++
.../api/scala/ScalaAPICompletenessTest.scala | 5 +
.../test/javaApiOperators/FirstNITCase.java | 170 ++++++++++++++++++
6 files changed, 362 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 61cb429..7a13f2f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -404,6 +404,10 @@ public abstract class DataSet<T> {
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
+ if(n < 1) { // reject invalid n as soon as first(n) is called, before any FirstReducer is created
+ throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+ }
+
return reduceGroup(new FirstReducer<T>(n));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 6da5a1a..a0bb920 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -95,10 +95,14 @@ public class SortedGrouping<T> extends Grouping<T> {
/**
* Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
- * @param n The desired number of elements.
+ * @param n The desired number of elements for each group; must be at least 1.
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
+ if(n < 1) { // reject invalid n as soon as first(n) is called, before any FirstReducer is created
+ throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+ }
+
return reduceGroup(new FirstReducer<T>(n));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 55dec7e..13720ac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -143,10 +143,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
/**
* Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
- * @param n The desired number of elements.
+ * @param n The desired number of elements for each group; must be at least 1.
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
+ if(n < 1) { // reject invalid n as soon as first(n) is called, before any FirstReducer is created
+ throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
+ }
+
return reduceGroup(new FirstReducer<T>(n));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
new file mode 100644
index 0000000..aaf744c
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FirstNOperatorTest {
+ // Checks argument validation of first(n): n >= 1 is accepted, n < 1 must throw an InvalidProgramException.
+ // TUPLE DATA (empty input: these tests only build operators and never execute a job)
+
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ @Test
+ public void testUngroupedFirstN() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ try {
+ tupleDs.first(1);
+ } catch(Exception e) {
+ Assert.fail("first(1) should be accepted, but threw: " + e);
+ }
+
+ // should work
+ try {
+ tupleDs.first(10);
+ } catch(Exception e) {
+ Assert.fail("first(10) should be accepted, but threw: " + e);
+ }
+
+ // should not work n == 0
+ try {
+ tupleDs.first(0);
+ Assert.fail("first(0) should have thrown an InvalidProgramException");
+ } catch(InvalidProgramException ipe) {
+ // expected: n must be at least 1
+ } catch(Exception e) {
+ Assert.fail("first(0) threw an unexpected exception type: " + e);
+ }
+
+ // should not work n == -1
+ try {
+ tupleDs.first(-1);
+ Assert.fail("first(-1) should have thrown an InvalidProgramException");
+ } catch(InvalidProgramException ipe) {
+ // expected: n must be at least 1
+ } catch(Exception e) {
+ Assert.fail("first(-1) threw an unexpected exception type: " + e);
+ }
+
+ }
+
+ @Test
+ public void testGroupedFirstN() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ try {
+ tupleDs.groupBy(2).first(1);
+ } catch(Exception e) {
+ Assert.fail("groupBy(2).first(1) should be accepted, but threw: " + e);
+ }
+
+ // should work
+ try {
+ tupleDs.groupBy(1,3).first(10);
+ } catch(Exception e) {
+ Assert.fail("groupBy(1,3).first(10) should be accepted, but threw: " + e);
+ }
+
+ // should not work n == 0
+ try {
+ tupleDs.groupBy(0).first(0);
+ Assert.fail("groupBy(0).first(0) should have thrown an InvalidProgramException");
+ } catch(InvalidProgramException ipe) {
+ // expected: n must be at least 1
+ } catch(Exception e) {
+ Assert.fail("groupBy(0).first(0) threw an unexpected exception type: " + e);
+ }
+
+ // should not work n == -1
+ try {
+ tupleDs.groupBy(2).first(-1);
+ Assert.fail("groupBy(2).first(-1) should have thrown an InvalidProgramException");
+ } catch(InvalidProgramException ipe) {
+ // expected: n must be at least 1
+ } catch(Exception e) {
+ Assert.fail("groupBy(2).first(-1) threw an unexpected exception type: " + e);
+ }
+ }
+
+ @Test
+ public void testGroupedSortedFirstN() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ try {
+ tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1);
+ } catch(Exception e) {
+ Assert.fail("sorted groupBy(2).first(1) should be accepted, but threw: " + e);
+ }
+
+ // should work
+ try {
+ tupleDs.groupBy(1,3).sortGroup(4, Order.ASCENDING).first(10);
+ } catch(Exception e) {
+ Assert.fail("sorted groupBy(1,3).first(10) should be accepted, but threw: " + e);
+ }
+
+ // should not work n == 0
+ try {
+ tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0);
+ Assert.fail("sorted groupBy(0).first(0) should have thrown an InvalidProgramException");
+ } catch(InvalidProgramException ipe) {
+ // expected: n must be at least 1
+ } catch(Exception e) {
+ Assert.fail("sorted groupBy(0).first(0) threw an unexpected exception type: " + e);
+ }
+
+ // should not work n == -1
+ try {
+ tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1);
+ Assert.fail("sorted groupBy(2).first(-1) should have thrown an InvalidProgramException");
+ } catch(InvalidProgramException ipe) {
+ // expected: n must be at least 1
+ } catch(Exception e) {
+ Assert.fail("sorted groupBy(2).first(-1) threw an unexpected exception type: " + e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index fb4396b..ba0f6f1 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -67,6 +67,11 @@ class ScalaAPICompletenessTest {
"org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
"org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
+ // Exclude first operator for now
+ "org.apache.flink.api.java.DataSet.first",
+ "org.apache.flink.api.java.operators.SortedGrouping.first",
+ "org.apache.flink.api.java.operators.UnsortedGrouping.first",
+
// Exclude explicit rebalance and hashPartitionBy for now
"org.apache.flink.api.java.DataSet.partitionByHash",
"org.apache.flink.api.java.DataSet.rebalance"
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
new file mode 100644
index 0000000..358b81b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FirstNITCase extends JavaProgramTestBase {
+ // Number of test programs; getConfigurations() creates one run per program id 1..NUM_PROGRAMS.
+ private static final int NUM_PROGRAMS = 3;
+ // Program id of this run, read from the "ProgramId" configuration entry (-1 if unset).
+ private final int curProgId = config.getInteger("ProgramId", -1);
+ private String resultPath;
+ private String expectedResult;
+
+ public FirstNITCase(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ expectedResult = FirstNProgs.runProgram(curProgId, resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Parameters
+ public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+ LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+ for(int i=1; i <= NUM_PROGRAMS; i++) {
+ Configuration config = new Configuration();
+ config.setInteger("ProgramId", i);
+ tConfigs.add(config);
+ }
+
+ return toParameterList(tConfigs);
+ }
+ // Runs the program selected by progId, writes its output to resultPath, and returns the expected output.
+ private static class FirstNProgs {
+
+ public static String runProgram(int progId, String resultPath) throws Exception {
+
+ switch(progId) {
+ case 1: {
+ /*
+ * First-n on ungrouped data set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
+
+ seven.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result: first(7) keeps 7 records, counted via OneMapper + sum(0)
+ return "(7)\n";
+ }
+ case 2: {
+ /*
+ * First-n on grouped data set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
+ .map(new OneMapper2()).groupBy(0).sum(1);
+
+ first.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result: (f1 key, number of records kept by first(4) for that key)
+ return "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
+ }
+ case 3: {
+ /*
+ * First-n on grouped and sorted data set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
+ .project(1,0).types(Long.class, Integer.class);
+
+ first.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result: per f1 key, the (up to) 3 records with the largest f0, projected to (f1, f0)
+ return "(1,1)\n"
+ + "(2,3)\n(2,2)\n"
+ + "(3,6)\n(3,5)\n(3,4)\n"
+ + "(4,10)\n(4,9)\n(4,8)\n"
+ + "(5,15)\n(5,14)\n(5,13)\n"
+ + "(6,21)\n(6,20)\n(6,19)\n";
+
+ }
+ default:
+ throw new IllegalArgumentException("Invalid program id");
+ }
+
+ }
+
+ }
+ // Maps every record to (1) (the same instance each time), so that a following sum(0) counts the records.
+ public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
+ private static final long serialVersionUID = 1L;
+ private final Tuple1<Integer> one = new Tuple1<Integer>(1);
+ @Override
+ public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) {
+ return one;
+ }
+ }
+ // Maps every record to (f1, 1), so that a following groupBy(0).sum(1) counts the records per f1 value.
+ public static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> {
+ private static final long serialVersionUID = 1L;
+ private final Tuple2<Long, Integer> one = new Tuple2<Long, Integer>(0L,1); // reused for every output record
+ @Override
+ public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) {
+ one.f0 = value.f1;
+ return one;
+ }
+ }
+
+}