You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:10 UTC

[74/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
index d645bc6..9755caa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -30,28 +26,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.Assert;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Tests for the DataSource
  */
 
-@RunWith(Parameterized.class)
 public class DataSourceITCase extends JavaProgramTestBase {
 
-	private static int NUM_PROGRAMS = 1;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
 	private String inputPath;
 	private String expectedResult;
 
-	public DataSourceITCase(Configuration config) {
-		super(config);	
-	}
-	
+
 	@Override
 	protected void preSubmit() throws Exception {
 		inputPath = createTempFile("input", "ab\n"
@@ -62,70 +48,42 @@ public class DataSourceITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		expectedResult = DataSourceProgs.runProgram(curProgId, inputPath, resultPath);
+		/*
+				 * Test passing a configuration object to an input format
+				 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Configuration ifConf = new Configuration();
+		ifConf.setString("prepend", "test");
+
+		DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
+		ds.writeAsText(resultPath);
+		env.execute();
+
+		expectedResult= "ab\n"
+				+ "cd\n"
+				+ "ef\n";
 	}
 	
 	@Override
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(expectedResult, resultPath);
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
-	}
-	
 	private static class TestInputFormat extends TextInputFormat {
 		private static final long serialVersionUID = 1L;
 
 		public TestInputFormat(Path filePath) {
 			super(filePath);
 		}
-		
+
 		@Override
 		public void configure(Configuration parameters) {
 			super.configure(parameters);
-			
+
 			Assert.assertNotNull(parameters.getString("prepend", null));
 			Assert.assertEquals("test", parameters.getString("prepend", null));
 		}
-		
-	}
-	private static class DataSourceProgs {
-		
-		public static String runProgram(int progId, String inputPath, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test passing a configuration object to an input format
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Configuration ifConf = new Configuration();
-				ifConf.setString("prepend", "test");
-				
-				DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
-				ds.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "ab\n"
-					+ "cd\n"
-					+ "ef\n";
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
-		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index fb62459..84964a2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -31,280 +28,266 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
-public class DistinctITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 8;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+	public DistinctITCase(ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public DistinctITCase(Configuration config) {
-		super(config);
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = DistinctProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector() throws Exception {
+		/*
+		 * check correctness of distinct on tuples with key field selector
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
+
+		distinctDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected()
+	throws Exception{
+		/*
+		 * check correctness of distinct on tuples with key field selector with not all fields selected
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+				DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0);
+
+		distinctDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1\n" +
+				"2\n";
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws IOException {
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+
+	@Test
+	public void testCorrectnessOfDistinctOnTuplesWithKeyExtractorFunction() throws Exception {
+		/*
+		 * check correctness of distinct on tuples with key extractor function
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
+				.distinct(new KeySelector1()).project(0);
+
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1\n" +
+				"2\n";
+	}
+
+	public static class KeySelector1 implements KeySelector<Tuple5<Integer, Long,  Integer, String, Long>, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(Tuple5<Integer, Long,  Integer, String, Long> in) {
+			return in.f0;
 		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class DistinctProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				
-				/*
-				 * check correctness of distinct on tuples with key field selector
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
-				
-				distinctDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n";
-			}
-			case 2: {
-				
-				/*
-				 * check correctness of distinct on tuples with key field selector with not all fields selected
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0);
-				
-				distinctDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1\n" +
-						"2\n";
-			}
-			case 3: {
-				
-				/*
-				 * check correctness of distinct on tuples with key extractor function
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
-						.distinct(new KeySelector<Tuple5<Integer, Long,  Integer, String, Long>, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(Tuple5<Integer, Long,  Integer, String, Long> in) {
-										return in.f0;
-									}
-								}).project(0);
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1\n" +
-						"2\n";
-								
-			}
-			case 4: {
-				
-				/*
-				 * check correctness of distinct on custom type with type extractor
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<Tuple1<Integer>> reduceDs = ds
-						.distinct(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								})
-						.map(new RichMapFunction<CustomType, Tuple1<Integer>>() {
-							@Override
-							public Tuple1<Integer> map(CustomType value) throws Exception {
-								return new Tuple1<Integer>(value.myInt);
-							}
-						});
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1\n" +
-						"2\n" +
-						"3\n" +
-						"4\n" +
-						"5\n" +
-						"6\n";
-				
-			}
-			case 5: {
-				
-				/*
-				 * check correctness of distinct on tuples
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
-				
-				distinctDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n";
-			}
-			case 6: {
-				
-				/*
-				 * check correctness of distinct on custom type with tuple-returning type extractor
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<Integer, Long>> reduceDs = ds
-						.distinct(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
-										return new Tuple2<Integer, Long>(t.f0, t.f4);
-									}
-								})
+
+	@Test
+	public void testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor() throws Exception {
+		/*
+		 * check correctness of distinct on custom type with type extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<Tuple1<Integer>> reduceDs = ds
+				.distinct(new KeySelector3())
+				.map(new Mapper3());
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1\n" +
+				"2\n" +
+				"3\n" +
+				"4\n" +
+				"5\n" +
+				"6\n";
+	}
+
+	public static class Mapper3 extends RichMapFunction<CustomType, Tuple1<Integer>> {
+		@Override
+		public Tuple1<Integer> map(CustomType value) throws Exception {
+			return new Tuple1<Integer>(value.myInt);
+		}
+	}
+
+	public static class KeySelector3 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
+
+	@Test
+	public void testCorrectnessOfDistinctOnTuples() throws Exception{
+		/*
+		 * check correctness of distinct on tuples
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
+
+		distinctDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n";
+	}
+
+	@Test
+	public void testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws
+			Exception{
+		/*
+		 * check correctness of distinct on custom type with tuple-returning type extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<Integer, Long>> reduceDs = ds
+				.distinct(new KeySelector2())
 						.project(0,4);
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1\n" +
-						"2,1\n" +
-						"2,2\n" +
-						"3,2\n" +
-						"3,3\n" +
-						"4,1\n" +
-						"4,2\n" +
-						"5,1\n" +
-						"5,2\n" +
-						"5,3\n";
-			}
-			case 7: {
-				
-				/*
-				 * check correctness of distinct on tuples with field expressions
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1\n" +
+				"2,1\n" +
+				"2,2\n" +
+				"3,2\n" +
+				"3,3\n" +
+				"4,1\n" +
+				"4,2\n" +
+				"5,1\n" +
+				"5,2\n" +
+				"5,3\n";
+	}
+
+	public static class KeySelector2 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f4);
+		}
+	}
+
+	@Test
+	public void testCorrectnessOfDistinctOnTuplesWithFieldExpressions() throws Exception {
+		/*
+		 * check correctness of distinct on tuples with field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
 						.distinct("f0").project(0);
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1\n" +
-						"2\n";
-								
-			}
-			case 8: {
-				
-				/*
-				 * check correctness of distinct on Pojos
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-				DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new MapFunction<CollectionDataSets.POJO, Integer>() {
-					@Override
-					public Integer map(POJO value) throws Exception {
-						return (int) value.nestedPojo.longNumber;
-					}
-				});
-				
-				reduceDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "10000\n20000\n30000\n";
-			}
-			case 9: {
-
-					/*
-					 * distinct on full Pojo
-					 */
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-					DataSet<Integer> reduceDs = ds.distinct().map(new MapFunction<CollectionDataSets.POJO, Integer>() {
-						@Override
-						public Integer map(POJO value) throws Exception {
-							return (int) value.nestedPojo.longNumber;
-						}
-					});
-
-					reduceDs.writeAsText(resultPath);
-					env.execute();
-
-					// return expected result
-					return "10000\n20000\n30000\n";
-				}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1\n" +
+				"2\n";
+	}
+
+	@Test
+	public void testCorrectnessOfDistinctOnPojos() throws Exception {
+		/*
+		 * check correctness of distinct on Pojos
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+		DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new Mapper2());
+
+		reduceDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "10000\n20000\n30000\n";
+	}
+
+	public static class Mapper2 implements MapFunction<CollectionDataSets.POJO, Integer> {
+		@Override
+		public Integer map(POJO value) throws Exception {
+			return (int) value.nestedPojo.longNumber;
+		}
+	}
+
+	@Test
+	public void testDistinctOnFullPojo() throws Exception {
+		/*
+		 * distinct on full Pojo
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+		DataSet<Integer> reduceDs = ds.distinct().map(new Mapper1());
+
+		reduceDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "10000\n20000\n30000\n";
+	}
+
+	public static class Mapper1 implements MapFunction<CollectionDataSets.POJO, Integer> {
+		@Override
+		public Integer map(POJO value) throws Exception {
+			return (int) value.nestedPojo.longNumber;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
index fbd968d..3bf83b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -29,315 +26,311 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class FilterITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 8;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class FilterITCase extends MultipleProgramsTestBase {
+	public FilterITCase(ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public FilterITCase(Configuration config) {
-		super(config);	
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = FilterProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testAllRejectingFilter() throws Exception {
+		/*
+		 * Test all-rejecting filter.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(new Filter1());
+
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	public static class Filter1 implements FilterFunction<Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return false;
+		}
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testAllPassingFilter() throws Exception {
+		/*
+		 * Test all-passing filter.
+		 */
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(new Filter2());
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" +
+				"5,3,I am fine.\n" +
+				"6,3,Luke Skywalker\n" +
+				"7,4,Comment#1\n" +
+				"8,4,Comment#2\n" +
+				"9,4,Comment#3\n" +
+				"10,4,Comment#4\n" +
+				"11,5,Comment#5\n" +
+				"12,5,Comment#6\n" +
+				"13,5,Comment#7\n" +
+				"14,5,Comment#8\n" +
+				"15,5,Comment#9\n" +
+				"16,6,Comment#10\n" +
+				"17,6,Comment#11\n" +
+				"18,6,Comment#12\n" +
+				"19,6,Comment#13\n" +
+				"20,6,Comment#14\n" +
+				"21,6,Comment#15\n";
+	}
+
+	public static class Filter2 implements FilterFunction<Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return true;
 		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class FilterProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test all-rejecting filter.
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-								return false;
-							}
-						});
-				
-				filterDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "\n";
-			}
-			case 2: {
-				/*
-				 * Test all-passing filter.
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-								return true;
-							}
-						});
-				filterDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n" +
-						"4,3,Hello world, how are you?\n" +
-						"5,3,I am fine.\n" +
-						"6,3,Luke Skywalker\n" +
-						"7,4,Comment#1\n" +
-						"8,4,Comment#2\n" +
-						"9,4,Comment#3\n" +
-						"10,4,Comment#4\n" +
-						"11,5,Comment#5\n" +
-						"12,5,Comment#6\n" +
-						"13,5,Comment#7\n" +
-						"14,5,Comment#8\n" +
-						"15,5,Comment#9\n" +
-						"16,6,Comment#10\n" +
-						"17,6,Comment#11\n" +
-						"18,6,Comment#12\n" +
-						"19,6,Comment#13\n" +
-						"20,6,Comment#14\n" +
-						"21,6,Comment#15\n";
-			}
-			case 3: {
-				/*
-				 * Test filter on String tuple field.
-				 */
-					
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-								return value.f2.contains("world");
-							}
-						});
-				filterDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "3,2,Hello world\n" +
-						"4,3,Hello world, how are you?\n";
-				
-			}
-			case 4: {
-				/*
+
+	@Test
+	public void testFilterOnStringTupleField() throws Exception {
+		/*
+		 * Test filter on String tuple field.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(new Filter3());
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n";
+
+	}
+
+	public static class Filter3 implements FilterFunction<Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return value.f2.contains("world");
+		}
+	}
+
+	@Test
+	public void testFilterOnIntegerTupleField() throws Exception {
+		/*
 				 * Test filter on Integer tuple field.
 				 */
-					
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-								return (value.f0 % 2) == 0;
-							}
-						});
-				filterDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "2,2,Hello\n" +
-						"4,3,Hello world, how are you?\n" +
-						"6,3,Luke Skywalker\n" +
-						"8,4,Comment#2\n" +
-						"10,4,Comment#4\n" +
-						"12,5,Comment#6\n" +
-						"14,5,Comment#8\n" +
-						"16,6,Comment#10\n" +
-						"18,6,Comment#12\n" +
-						"20,6,Comment#14\n";
-			}
-			case 5: {
-				/*
-				 * Test filter on basic type
-				 */
-						
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-				DataSet<String> filterDs = ds.
-						filter(new FilterFunction<String>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(String value) throws Exception {
-								return value.startsWith("H");
-							}
-						});
-				filterDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi\n" +
-					   "Hello\n" + 
-					   "Hello world\n" +
-					   "Hello world, how are you?\n";
-			}
-			case 6: {
-				/*
-				 * Test filter on custom type
-				 */
-						
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> filterDs = ds.
-						filter(new FilterFunction<CustomType>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(CustomType value) throws Exception {
-								return value.myString.contains("a");
-							}
-						});
-				filterDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "3,3,Hello world, how are you?\n" +
-						"3,4,I am fine.\n" +
-						"3,5,Luke Skywalker\n";
-			}
-			case 7: {
-				/*
-				 * Test filter on String tuple field.
-				 */
-					
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-
-							int literal = -1;
-							
-							@Override
-							public void open(Configuration config) {
-								Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-								for(int i: ints) {
-									literal = literal < i ? i : literal;
-								}
-							}
-							
-							@Override
-							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-								return value.f0 < literal;
-							}
-						}).withBroadcastSet(ints, "ints");
-				filterDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n" +
-						"4,3,Hello world, how are you?\n";
-			}
-			case 8: {
-				/*
-				 * Test filter with broadcast variables
-				 */
-					
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-							private  int broadcastSum = 0;
-							
-							@Override
-							public void open(Configuration config) {
-								Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-								for(Integer i : ints) {
-									broadcastSum += i;
-								}
-							}
-
-							@Override
-							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-								return (value.f1 == (broadcastSum / 11));
-							}
-						}).withBroadcastSet(intDs, "ints");
-				filterDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "11,5,Comment#5\n" +
-						"12,5,Comment#6\n" +
-						"13,5,Comment#7\n" +
-						"14,5,Comment#8\n" +
-						"15,5,Comment#9\n";
-				
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(new Filter4());
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "2,2,Hello\n" +
+				"4,3,Hello world, how are you?\n" +
+				"6,3,Luke Skywalker\n" +
+				"8,4,Comment#2\n" +
+				"10,4,Comment#4\n" +
+				"12,5,Comment#6\n" +
+				"14,5,Comment#8\n" +
+				"16,6,Comment#10\n" +
+				"18,6,Comment#12\n" +
+				"20,6,Comment#14\n";
+	}
+
+	public static class Filter4 implements FilterFunction<Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return (value.f0 % 2) == 0;
+		}
+	}
+
+	@Test
+	public void testFilterBasicType() throws Exception {
+		/*
+		 * Test filter on basic type
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+		DataSet<String> filterDs = ds.
+				filter(new Filter5());
+		filterDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "Hi\n" +
+				"Hello\n" +
+				"Hello world\n" +
+				"Hello world, how are you?\n";
+	}
+
+	public static class Filter5 implements FilterFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(String value) throws Exception {
+			return value.startsWith("H");
+		}
+	}
+
+	@Test
+	public void testFilterOnCustomType() throws Exception {
+		/*
+		 * Test filter on custom type
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> filterDs = ds.
+				filter(new Filter6());
+		filterDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "3,3,Hello world, how are you?\n" +
+				"3,4,I am fine.\n" +
+				"3,5,Luke Skywalker\n";
+	}
+
+	public static class Filter6 implements FilterFunction<CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(CustomType value) throws Exception {
+			return value.myString.contains("a");
+		}
+	}
+
+	@Test
+	public void testRichFilterOnStringTupleField() throws Exception {
+		/*
+		 * Test filter on String tuple field.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(new RichFilter1()).withBroadcastSet(ints, "ints");
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n";
+	}
+
+	public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+
+		int literal = -1;
+
+		@Override
+		public void open(Configuration config) {
+			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+			for(int i: ints) {
+				literal = literal < i ? i : literal;
 			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+		}
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return value.f0 < literal;
+		}
+	}
+
+	@Test
+	public void testFilterWithBroadcastVariables() throws Exception {
+		/*
+		 * Test filter with broadcast variables
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(new RichFilter2()).withBroadcastSet(intDs, "ints");
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "11,5,Comment#5\n" +
+				"12,5,Comment#6\n" +
+				"13,5,Comment#7\n" +
+				"14,5,Comment#8\n" +
+				"15,5,Comment#9\n";
+	}
+
+	public static class RichFilter2 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+		private  int broadcastSum = 0;
+
+		@Override
+		public void open(Configuration config) {
+			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+			for(Integer i : ints) {
+				broadcastSum += i;
 			}
 		}
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return (value.f1 == (broadcastSum / 11));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
index 770cf88..24bc3e6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
@@ -30,122 +25,94 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class FirstNITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 3;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-	
-	public FirstNITCase(Configuration config) {
-		super(config);
+public class FirstNITCase extends MultipleProgramsTestBase {
+	public FirstNITCase(ExecutionMode mode){
+		super(mode);
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = FirstNProgs.runProgram(curProgId, resultPath);
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testFirstNOnUngroupedDS() throws Exception {
+		/*
+		 * First-n on ungrouped data set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
+
+		seven.writeAsText(resultPath);
+		env.execute();
+
+		expected = "(7)\n";
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testFirstNOnGroupedDS() throws Exception {
+		/*
+		 * First-n on grouped data set
+		 */
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
+				.map(new OneMapper2()).groupBy(0).sum(1);
+
+		first.writeAsText(resultPath);
+		env.execute();
+
+		expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
 	}
-	
-	private static class FirstNProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * First-n on ungrouped data set
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
-				
-				seven.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "(7)\n";
-			}
-			case 2: {
-				/*
-				 * First-n on grouped data set
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
-															.map(new OneMapper2()).groupBy(0).sum(1);
-				
-				first.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
-			}
-			case 3: {
-				/*
-				 * First-n on grouped and sorted data set
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
+
+	@Test
+	public void testFirstNOnGroupedAndSortedDS() throws Exception {
+		/*
+		 * First-n on grouped and sorted data set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
 															.project(1,0);
-				
-				first.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "(1,1)\n"
-						+ "(2,3)\n(2,2)\n"
-						+ "(3,6)\n(3,5)\n(3,4)\n"
-						+ "(4,10)\n(4,9)\n(4,8)\n"
-						+ "(5,15)\n(5,14)\n(5,13)\n"
-						+ "(6,21)\n(6,20)\n(6,19)\n";
-				
-			}
-			default:
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
-		}
-	
+
+		first.writeAsText(resultPath);
+		env.execute();
+
+		expected = "(1,1)\n"
+				+ "(2,3)\n(2,2)\n"
+				+ "(3,6)\n(3,5)\n(3,4)\n"
+				+ "(4,10)\n(4,9)\n(4,8)\n"
+				+ "(5,15)\n(5,14)\n(5,13)\n"
+				+ "(6,21)\n(6,20)\n(6,19)\n";
 	}
 	
 	public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
index 906ba05..bf49eae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -29,356 +26,349 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class FlatMapITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 7;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class FlatMapITCase extends MultipleProgramsTestBase {
+	public FlatMapITCase(ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public FlatMapITCase(Configuration config) {
-		super(config);	
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = FlatMapProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testNonPassingFlatMap() throws Exception {
+		/*
+		 * Test non-passing flatmap
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+		DataSet<String> nonPassingFlatMapDs = ds.
+				flatMap(new FlatMapper1());
+
+		nonPassingFlatMapDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	public static class FlatMapper1 implements FlatMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			if ( value.contains("bananas") ) {
+				out.collect(value);
+			}
+		}
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testDataDuplicatingFlatMap() throws Exception {
+		/*
+		 * Test data duplicating flatmap
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+		DataSet<String> duplicatingFlatMapDs = ds.
+				flatMap(new FlatMapper2());
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+		duplicatingFlatMapDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = 	"Hi\n" + "HI\n" +
+				"Hello\n" + "HELLO\n" +
+				"Hello world\n" + "HELLO WORLD\n" +
+				"Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" +
+				"I am fine.\n" + "I AM FINE.\n" +
+				"Luke Skywalker\n" + "LUKE SKYWALKER\n" +
+				"Random comment\n" + "RANDOM COMMENT\n" +
+				"LOL\n" + "LOL\n";
+	}
+
+	public static class FlatMapper2 implements FlatMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			out.collect(value);
+			out.collect(value.toUpperCase());
 		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class FlatMapProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test non-passing flatmap
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-				DataSet<String> nonPassingFlatMapDs = ds.
-						flatMap(new FlatMapFunction<String, String>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public void flatMap(String value, Collector<String> out) throws Exception {
-								if ( value.contains("bananas") ) {
-									out.collect(value);
-								}
-							}
-						});
-				
-				nonPassingFlatMapDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"\n";
-			}
-			case 2: {
-				/*
-				 * Test data duplicating flatmap
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-				DataSet<String> duplicatingFlatMapDs = ds.
-						flatMap(new FlatMapFunction<String, String>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public void flatMap(String value, Collector<String> out) throws Exception {
-									out.collect(value);
-									out.collect(value.toUpperCase());
-							}
-						});
-				
-				duplicatingFlatMapDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"Hi\n" + "HI\n" +
-						"Hello\n" + "HELLO\n" +
-						"Hello world\n" + "HELLO WORLD\n" +
-						"Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" +
-						"I am fine.\n" + "I AM FINE.\n" +
-						"Luke Skywalker\n" + "LUKE SKYWALKER\n" +
-						"Random comment\n" + "RANDOM COMMENT\n" +
-						"LOL\n" + "LOL\n";
-			}
-			case 3: {
-				/*
-				 * Test flatmap with varying number of emitted tuples
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds.
-						flatMap(new FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public void flatMap(Tuple3<Integer, Long, String> value,
-									Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-								final int numTuples = value.f0 % 3; 
-								for ( int i = 0; i < numTuples; i++ ) {
-									out.collect(value);
-								}
-							}
-						});
-				
-				varyingTuplesMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return  "1,1,Hi\n" +
-						"2,2,Hello\n" + "2,2,Hello\n" +
-						"4,3,Hello world, how are you?\n" +
-						"5,3,I am fine.\n" + "5,3,I am fine.\n" +
-						"7,4,Comment#1\n" +
-						"8,4,Comment#2\n" + "8,4,Comment#2\n" + 
-						"10,4,Comment#4\n" +
-						"11,5,Comment#5\n" + "11,5,Comment#5\n" +
-						"13,5,Comment#7\n" +
-						"14,5,Comment#8\n" + "14,5,Comment#8\n" +
-						"16,6,Comment#10\n" +
-						"17,6,Comment#11\n" + "17,6,Comment#11\n" +
-						"19,6,Comment#13\n" +
-						"20,6,Comment#14\n" + "20,6,Comment#14\n";
-			}
-			case 4: {
-				/*
-				 * Test type conversion flatmapper (Custom -> Tuple)
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds.
-						flatMap(new FlatMapFunction<CustomType, Tuple3<Integer, Long, String>>() {
-							private static final long serialVersionUID = 1L;
-							private final Tuple3<Integer, Long, String> outTuple = 
-									new Tuple3<Integer, Long, String>();
-							
-							@Override
-							public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out)
-									throws Exception {
-								outTuple.setField(value.myInt, 0);
-								outTuple.setField(value.myLong, 1);
-								outTuple.setField(value.myString, 2);
-								out.collect(outTuple);
-							}
-						});
-				
-				typeConversionFlatMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"1,0,Hi\n" +
-						"2,1,Hello\n" +
-						"2,2,Hello world\n" +
-						"3,3,Hello world, how are you?\n" +
-						"3,4,I am fine.\n" +
-						"3,5,Luke Skywalker\n" +
-						"4,6,Comment#1\n" +
-						"4,7,Comment#2\n" +
-						"4,8,Comment#3\n" +
-						"4,9,Comment#4\n" +
-						"5,10,Comment#5\n" +
-						"5,11,Comment#6\n" +
-						"5,12,Comment#7\n" +
-						"5,13,Comment#8\n" +
-						"5,14,Comment#9\n" +
-						"6,15,Comment#10\n" +
-						"6,16,Comment#11\n" +
-						"6,17,Comment#12\n" +
-						"6,18,Comment#13\n" +
-						"6,19,Comment#14\n" +
-						"6,20,Comment#15\n";
-			}
-			case 5: {
-				/*
-				 * Test type conversion flatmapper (Tuple -> Basic)
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<String> typeConversionFlatMapDs = ds.
-						flatMap(new FlatMapFunction<Tuple3<Integer, Long, String>, String>() {
-							private static final long serialVersionUID = 1L;
-							
-							@Override
-							public void flatMap(Tuple3<Integer, Long, String> value, 
-									Collector<String> out) throws Exception {
-								out.collect(value.f2);
-							}
-						});
-				
-				typeConversionFlatMapDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"Hi\n" + "Hello\n" + "Hello world\n" +
-						"Hello world, how are you?\n" +
-						"I am fine.\n" + "Luke Skywalker\n" +
-						"Comment#1\n" +	"Comment#2\n" +
-						"Comment#3\n" +	"Comment#4\n" +
-						"Comment#5\n" +	"Comment#6\n" +
-						"Comment#7\n" + "Comment#8\n" +
-						"Comment#9\n" +	"Comment#10\n" +
-						"Comment#11\n" + "Comment#12\n" +
-						"Comment#13\n" + "Comment#14\n" +
-						"Comment#15\n";
-			}
-			case 6: {
-				/*
-				 * Test flatmapper if UDF returns input object 
-				 * multiple times and changes it in between
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds.
-						flatMap(new FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
-							private static final long serialVersionUID = 1L;
-							
-							@Override
-							public void flatMap( Tuple3<Integer, Long, String> value,
-									Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-								final int numTuples = value.f0 % 4;
-								for ( int i = 0; i < numTuples; i++ ) {
-									value.setField(i, 0);
-									out.collect(value);
-								}							
-							}
-						});
-				
-				inputObjFlatMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return	"0,1,Hi\n" +
-						"0,2,Hello\n" + "1,2,Hello\n" +
-						"0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" +
-						"0,3,I am fine.\n" +
-						"0,3,Luke Skywalker\n" + "1,3,Luke Skywalker\n" +
-						"0,4,Comment#1\n" + "1,4,Comment#1\n" + "2,4,Comment#1\n" +
-						"0,4,Comment#3\n" +
-						"0,4,Comment#4\n" + "1,4,Comment#4\n" +
-						"0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" +
-						"0,5,Comment#7\n" +
-						"0,5,Comment#8\n" + "1,5,Comment#8\n" +
-						"0,5,Comment#9\n" + "1,5,Comment#9\n" + "2,5,Comment#9\n" +
-						"0,6,Comment#11\n" +
-						"0,6,Comment#12\n" + "1,6,Comment#12\n" +
-						"0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" +
-						"0,6,Comment#15\n";
+
+	@Test
+	public void testFlatMapWithVaryingNumberOfEmittedTuples() throws Exception {
+		/*
+		 * Test flatmap with varying number of emitted tuples
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds.
+				flatMap(new FlatMapper3());
+
+		varyingTuplesMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected =  "1,1,Hi\n" +
+				"2,2,Hello\n" + "2,2,Hello\n" +
+				"4,3,Hello world, how are you?\n" +
+				"5,3,I am fine.\n" + "5,3,I am fine.\n" +
+				"7,4,Comment#1\n" +
+				"8,4,Comment#2\n" + "8,4,Comment#2\n" +
+				"10,4,Comment#4\n" +
+				"11,5,Comment#5\n" + "11,5,Comment#5\n" +
+				"13,5,Comment#7\n" +
+				"14,5,Comment#8\n" + "14,5,Comment#8\n" +
+				"16,6,Comment#10\n" +
+				"17,6,Comment#11\n" + "17,6,Comment#11\n" +
+				"19,6,Comment#13\n" +
+				"20,6,Comment#14\n" + "20,6,Comment#14\n";
+	}
+
+	public static class FlatMapper3 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Tuple3<Integer, Long, String> value,
+				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			final int numTuples = value.f0 % 3;
+			for ( int i = 0; i < numTuples; i++ ) {
+				out.collect(value);
 			}
-			case 7: {
-				/*
-				 * Test flatmap with broadcast set 
-				 */
-					
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
-						flatMap(new RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-							private final Tuple3<Integer, Long, String> outTuple = 
-									new Tuple3<Integer, Long, String>();
-							private Integer f2Replace = 0;
-							
-							@Override
-							public void open(Configuration config) {
-								Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-								int sum = 0;
-								for(Integer i : ints) {
-									sum += i;
-								}
-								f2Replace = sum;
-							}
-							
-							@Override
-							public void flatMap(Tuple3<Integer, Long, String> value,
-									Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-								outTuple.setFields(f2Replace, value.f1, value.f2);
-								out.collect(outTuple);
-							}
-						}).withBroadcastSet(ints, "ints");
-				bcFlatMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"55,1,Hi\n" +
-						"55,2,Hello\n" +
-						"55,2,Hello world\n" +
-						"55,3,Hello world, how are you?\n" +
-						"55,3,I am fine.\n" +
-						"55,3,Luke Skywalker\n" +
-						"55,4,Comment#1\n" +
-						"55,4,Comment#2\n" +
-						"55,4,Comment#3\n" +
-						"55,4,Comment#4\n" +
-						"55,5,Comment#5\n" +
-						"55,5,Comment#6\n" +
-						"55,5,Comment#7\n" +
-						"55,5,Comment#8\n" +
-						"55,5,Comment#9\n" +
-						"55,6,Comment#10\n" +
-						"55,6,Comment#11\n" +
-						"55,6,Comment#12\n" +
-						"55,6,Comment#13\n" +
-						"55,6,Comment#14\n" +
-						"55,6,Comment#15\n";
+		}
+	}
+
+	@Test
+	public void testTypeConversionFlatMapperCustomToTuple() throws Exception {
+		/*
+		 * Test type conversion flatmapper (Custom -> Tuple)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds.
+				flatMap(new FlatMapper4());
+
+		typeConversionFlatMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = 	"1,0,Hi\n" +
+				"2,1,Hello\n" +
+				"2,2,Hello world\n" +
+				"3,3,Hello world, how are you?\n" +
+				"3,4,I am fine.\n" +
+				"3,5,Luke Skywalker\n" +
+				"4,6,Comment#1\n" +
+				"4,7,Comment#2\n" +
+				"4,8,Comment#3\n" +
+				"4,9,Comment#4\n" +
+				"5,10,Comment#5\n" +
+				"5,11,Comment#6\n" +
+				"5,12,Comment#7\n" +
+				"5,13,Comment#8\n" +
+				"5,14,Comment#9\n" +
+				"6,15,Comment#10\n" +
+				"6,16,Comment#11\n" +
+				"6,17,Comment#12\n" +
+				"6,18,Comment#13\n" +
+				"6,19,Comment#14\n" +
+				"6,20,Comment#15\n";
+	}
+
+	public static class FlatMapper4 implements FlatMapFunction<CustomType, Tuple3<Integer, Long, String>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple3<Integer, Long, String> outTuple =
+				new Tuple3<Integer, Long, String>();
+
+		@Override
+		public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out)
+		throws Exception {
+			outTuple.setField(value.myInt, 0);
+			outTuple.setField(value.myLong, 1);
+			outTuple.setField(value.myString, 2);
+			out.collect(outTuple);
+		}
+	}
+
+	@Test
+	public void testTypeConversionFlatMapperTupleToBasic() throws Exception {
+		/*
+		 * Test type conversion flatmapper (Tuple -> Basic)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<String> typeConversionFlatMapDs = ds.
+				flatMap(new FlatMapper5());
+
+		typeConversionFlatMapDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = 	"Hi\n" + "Hello\n" + "Hello world\n" +
+				"Hello world, how are you?\n" +
+				"I am fine.\n" + "Luke Skywalker\n" +
+				"Comment#1\n" +	"Comment#2\n" +
+				"Comment#3\n" +	"Comment#4\n" +
+				"Comment#5\n" +	"Comment#6\n" +
+				"Comment#7\n" + "Comment#8\n" +
+				"Comment#9\n" +	"Comment#10\n" +
+				"Comment#11\n" + "Comment#12\n" +
+				"Comment#13\n" + "Comment#14\n" +
+				"Comment#15\n";
+	}
+
+	public static class FlatMapper5 implements FlatMapFunction<Tuple3<Integer, Long, String>,String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Tuple3<Integer, Long, String> value,
+				Collector<String> out) throws Exception {
+			out.collect(value.f2);
+		}
+	}
+
+	@Test
+	public void testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws
+			Exception {
+		/*
+		 * Test flatmapper if UDF returns input object
+		 * multiple times and changes it in between
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds.
+				flatMap(new FlatMapper6());
+
+		inputObjFlatMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected =	"0,1,Hi\n" +
+				"0,2,Hello\n" + "1,2,Hello\n" +
+				"0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" +
+				"0,3,I am fine.\n" +
+				"0,3,Luke Skywalker\n" + "1,3,Luke Skywalker\n" +
+				"0,4,Comment#1\n" + "1,4,Comment#1\n" + "2,4,Comment#1\n" +
+				"0,4,Comment#3\n" +
+				"0,4,Comment#4\n" + "1,4,Comment#4\n" +
+				"0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" +
+				"0,5,Comment#7\n" +
+				"0,5,Comment#8\n" + "1,5,Comment#8\n" +
+				"0,5,Comment#9\n" + "1,5,Comment#9\n" + "2,5,Comment#9\n" +
+				"0,6,Comment#11\n" +
+				"0,6,Comment#12\n" + "1,6,Comment#12\n" +
+				"0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" +
+				"0,6,Comment#15\n";
+	}
+
+	public static class FlatMapper6 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap( Tuple3<Integer, Long, String> value,
+				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			final int numTuples = value.f0 % 4;
+			for ( int i = 0; i < numTuples; i++ ) {
+				value.setField(i, 0);
+				out.collect(value);
 			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+		}
+	}
+
+	@Test
+	public void testFlatMapWithBroadcastSet() throws Exception {
+		/*
+		 * Test flatmap with broadcast set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
+				flatMap(new RichFlatMapper1()).withBroadcastSet(ints, "ints");
+		bcFlatMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = 	"55,1,Hi\n" +
+				"55,2,Hello\n" +
+				"55,2,Hello world\n" +
+				"55,3,Hello world, how are you?\n" +
+				"55,3,I am fine.\n" +
+				"55,3,Luke Skywalker\n" +
+				"55,4,Comment#1\n" +
+				"55,4,Comment#2\n" +
+				"55,4,Comment#3\n" +
+				"55,4,Comment#4\n" +
+				"55,5,Comment#5\n" +
+				"55,5,Comment#6\n" +
+				"55,5,Comment#7\n" +
+				"55,5,Comment#8\n" +
+				"55,5,Comment#9\n" +
+				"55,6,Comment#10\n" +
+				"55,6,Comment#11\n" +
+				"55,6,Comment#12\n" +
+				"55,6,Comment#13\n" +
+				"55,6,Comment#14\n" +
+				"55,6,Comment#15\n";
+	}
+
+	public static class RichFlatMapper1 extends RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple3<Integer, Long, String> outTuple =
+				new Tuple3<Integer, Long, String>();
+		private Integer f2Replace = 0;
+
+		@Override
+		public void open(Configuration config) {
+			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+			int sum = 0;
+			for(Integer i : ints) {
+				sum += i;
 			}
-			
+			f2Replace = sum;
+		}
+
+		@Override
+		public void flatMap(Tuple3<Integer, Long, String> value,
+				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			outTuple.setFields(f2Replace, value.f1, value.f2);
+			out.collect(outTuple);
 		}
-	
 	}
 	
 }