You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 18:22:25 UTC

[2/3] git commit: Added parameter checks and tests for first-n operator. Excluded DataSet.first(), UnsortedGrouping.first(), and SortedGrouping.first() methods from ScalaAPICompletenessTest.

Added parameter checks and tests for first-n operator.
Excluded DataSet.first(), UnsortedGrouping.first(), and SortedGrouping.first() methods from ScalaAPICompletenessTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/141946a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/141946a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/141946a7

Branch: refs/heads/master
Commit: 141946a762efd7e98b66e455e28ebad0e9ea6281
Parents: 6702a2e
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 24 16:02:52 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 18:21:14 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |   4 +
 .../api/java/operators/SortedGrouping.java      |   6 +-
 .../api/java/operators/UnsortedGrouping.java    |   6 +-
 .../api/java/operator/FirstNOperatorTest.java   | 173 +++++++++++++++++++
 .../api/scala/ScalaAPICompletenessTest.scala    |   5 +
 .../test/javaApiOperators/FirstNITCase.java     | 170 ++++++++++++++++++
 6 files changed, 362 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 61cb429..7a13f2f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -404,6 +404,10 @@ public abstract class DataSet<T> {
 	 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 	*/
 	public GroupReduceOperator<T, T> first(int n) {
+		if(n < 1) {
+			throw new InvalidProgramException("Parameter n of first(n) must be at least 1, but was " + n + ".");
+		}
+		
 		return reduceGroup(new FirstReducer<T>(n));
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 6da5a1a..a0bb920 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -95,10 +95,14 @@ public class SortedGrouping<T> extends Grouping<T> {
 	
 	/**
 	 * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
-	 * @param n The desired number of elements.
+	 * @param n The desired number of elements for each group.
 	 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 	*/
 	public GroupReduceOperator<T, T> first(int n) {
+		if(n < 1) {
+			throw new InvalidProgramException("Parameter n of first(n) must be at least 1, but was " + n + ".");
+		}
+		
 		return reduceGroup(new FirstReducer<T>(n));
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 55dec7e..13720ac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -143,10 +143,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	
 	/**
 	 * Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
-	 * @param n The desired number of elements.
+	 * @param n The desired number of elements for each group.
 	 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 	*/
 	public GroupReduceOperator<T, T> first(int n) {
+		if(n < 1) {
+			throw new InvalidProgramException("Parameter n of first(n) must be at least 1, but was " + n + ".");
+		}
+		
 		return reduceGroup(new FirstReducer<T>(n));
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
new file mode 100644
index 0000000..aaf744c
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FirstNOperatorTest {
+
+	// TUPLE DATA (empty: these tests only build operators and never execute the program)
+	
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	@Test
+	public void testUngroupedFirstN() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.first(1);
+		} catch(Exception e) {
+			Assert.fail("first(1) should be accepted, but threw: " + e);
+		}
+		
+		// should work
+		try {
+			tupleDs.first(10);
+		} catch(Exception e) {
+			Assert.fail("first(10) should be accepted, but threw: " + e);
+		}
+		
+		// should not work n == 0
+		try {
+			tupleDs.first(0);
+			Assert.fail("first(0) should throw InvalidProgramException");
+		} catch(InvalidProgramException ipe) {
+			// expected: n < 1 is rejected
+		} catch(Exception e) {
+			Assert.fail("first(0) should throw InvalidProgramException, but threw: " + e);
+		}
+		
+		// should not work n == -1
+		try {
+			tupleDs.first(-1);
+			Assert.fail("first(-1) should throw InvalidProgramException");
+		} catch(InvalidProgramException ipe) {
+			// expected: n < 1 is rejected
+		} catch(Exception e) {
+			Assert.fail("first(-1) should throw InvalidProgramException, but threw: " + e);
+		}
+		
+	}
+	
+	@Test
+	public void testGroupedFirstN() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy(2).first(1);
+		} catch(Exception e) {
+			Assert.fail("groupBy(2).first(1) should be accepted, but threw: " + e);
+		}
+		
+		// should work
+		try {
+			tupleDs.groupBy(1,3).first(10);
+		} catch(Exception e) {
+			Assert.fail("groupBy(1,3).first(10) should be accepted, but threw: " + e);
+		}
+		
+		// should not work n == 0
+		try {
+			tupleDs.groupBy(0).first(0);
+			Assert.fail("groupBy(0).first(0) should throw InvalidProgramException");
+		} catch(InvalidProgramException ipe) {
+			// expected: n < 1 is rejected
+		} catch(Exception e) {
+			Assert.fail("groupBy(0).first(0) should throw InvalidProgramException, but threw: " + e);
+		}
+		
+		// should not work n == -1
+		try {
+			tupleDs.groupBy(2).first(-1);
+			Assert.fail("groupBy(2).first(-1) should throw InvalidProgramException");
+		} catch(InvalidProgramException ipe) {
+			// expected: n < 1 is rejected
+		} catch(Exception e) {
+			Assert.fail("groupBy(2).first(-1) should throw InvalidProgramException, but threw: " + e);
+		}
+	}
+	
+	@Test
+	public void testGroupedSortedFirstN() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1);
+		} catch(Exception e) {
+			Assert.fail("sorted groupBy(2).first(1) should be accepted, but threw: " + e);
+		}
+		
+		// should work
+		try {
+			tupleDs.groupBy(1,3).sortGroup(4, Order.ASCENDING).first(10);
+		} catch(Exception e) {
+			Assert.fail("sorted groupBy(1,3).first(10) should be accepted, but threw: " + e);
+		}
+		
+		// should not work n == 0
+		try {
+			tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0);
+			Assert.fail("sorted groupBy(0).first(0) should throw InvalidProgramException");
+		} catch(InvalidProgramException ipe) {
+			// expected: n < 1 is rejected
+		} catch(Exception e) {
+			Assert.fail("sorted groupBy(0).first(0) should throw InvalidProgramException, but threw: " + e);
+		}
+		
+		// should not work n == -1
+		try {
+			tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1);
+			Assert.fail("sorted groupBy(2).first(-1) should throw InvalidProgramException");
+		} catch(InvalidProgramException ipe) {
+			// expected: n < 1 is rejected
+		} catch(Exception e) {
+			Assert.fail("sorted groupBy(2).first(-1) should throw InvalidProgramException, but threw: " + e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index fb4396b..ba0f6f1 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -67,6 +67,11 @@ class ScalaAPICompletenessTest {
       "org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
       "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
       
+      // Exclude first operator for now
+      "org.apache.flink.api.java.DataSet.first",
+      "org.apache.flink.api.java.operators.SortedGrouping.first",
+      "org.apache.flink.api.java.operators.UnsortedGrouping.first",
+      
       // Exclude explicit rebalance and hashPartitionBy for now
       "org.apache.flink.api.java.DataSet.partitionByHash",
       "org.apache.flink.api.java.DataSet.rebalance"

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/141946a7/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
new file mode 100644
index 0000000..358b81b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FirstNITCase extends JavaProgramTestBase {
+	
+	private static final int NUM_PROGRAMS = 3;
+	
+	private final int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public FirstNITCase(Configuration config) {
+		super(config);
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = FirstNProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	private static class FirstNProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/*
+				 * First-n on ungrouped data set
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
+				
+				seven.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "(7)\n";
+			}
+			case 2: {
+				/*
+				 * First-n on grouped data set
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
+															.map(new OneMapper2()).groupBy(0).sum(1);
+				
+				first.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
+			}
+			case 3: {
+				/*
+				 * First-n on grouped and sorted data set
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
+															.project(1,0).types(Long.class, Integer.class);
+				
+				first.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "(1,1)\n"
+						+ "(2,3)\n(2,2)\n"
+						+ "(3,6)\n(3,5)\n(3,4)\n"
+						+ "(4,10)\n(4,9)\n(4,8)\n"
+						+ "(5,15)\n(5,14)\n(5,13)\n"
+						+ "(6,21)\n(6,20)\n(6,19)\n";
+				
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+	
+	public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple1<Integer> one = new Tuple1<Integer>(1);
+		@Override
+		public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) {
+			return one;
+		}
+	}
+	
+	public static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple2<Long, Integer> one = new Tuple2<Long, Integer>(0L,1);
+		@Override
+		public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) {
+			one.f0 = value.f1;
+			return one;
+		}
+	}
+	
+}