You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:07 UTC

[71/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
index 7bde1a4..aa75836 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
@@ -18,35 +18,18 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
-@RunWith(Parameterized.class)
 public class ProjectITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 1; 
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+
 	private String resultPath;
 	private String expectedResult;
 	
-	public ProjectITCase(Configuration config) {
-		super(config);	
-	}
-	
 	@Override
 	protected void preSubmit() throws Exception {
 		resultPath = getTempDirPath("result");
@@ -54,72 +37,37 @@ public class ProjectITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		expectedResult = ProjectProgs.runProgram(curProgId, resultPath);
+		/*
+		 * Projection with tuple field indexes
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<String, Long, Integer>> projDs = ds.
+						project(3,4,2);
+		projDs.writeAsCsv(resultPath);
+
+		env.execute();
+		expectedResult = "Hallo,1,0\n" +
+				"Hallo Welt,2,1\n" +
+				"Hallo Welt wie,1,2\n" +
+				"Hallo Welt wie gehts?,2,3\n" +
+				"ABC,2,4\n" +
+				"BCD,3,5\n" +
+				"CDE,2,6\n" +
+				"DEF,1,7\n" +
+				"EFG,1,8\n" +
+				"FGH,2,9\n" +
+				"GHI,1,10\n" +
+				"HIJ,3,11\n" +
+				"IJK,3,12\n" +
+				"JKL,2,13\n" +
+				"KLM,2,14\n";
 	}
 	
 	@Override
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(expectedResult, resultPath);
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
-	}
-	
-		
-	private static class ProjectProgs {
-		
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Projection with tuple fields indexes
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple3<String, Long, Integer>> projDs = ds.
-						project(3,4,2);
-				projDs.writeAsCsv(resultPath);
-				
-				env.execute();
-				return "Hallo,1,0\n" +
-						"Hallo Welt,2,1\n" +
-						"Hallo Welt wie,1,2\n" +
-						"Hallo Welt wie gehts?,2,3\n" +
-						"ABC,2,4\n" +
-						"BCD,3,5\n" +
-						"CDE,2,6\n" +
-						"DEF,1,7\n" +
-						"EFG,1,8\n" +
-						"FGH,2,9\n" +
-						"GHI,1,10\n" +
-						"HIJ,3,11\n" +
-						"IJK,3,12\n" +
-						"JKL,2,13\n" +
-						"KLM,2,14\n";
-				
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
-		}
-		
-	}
-	
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 1fcacb9..065be67 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Date;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -38,371 +35,362 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
-public class ReduceITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 11;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class ReduceITCase extends MultipleProgramsTestBase {
+
+	public ReduceITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public ReduceITCase(Configuration config) {
-		super(config);
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
+		/*
+		 * Reduce on tuples with key field selector
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+				groupBy(1).reduce(new Tuple3Reduce("B-)"));
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"5,2,B-)\n" +
+				"15,3,B-)\n" +
+				"34,4,B-)\n" +
+				"65,5,B-)\n" +
+				"111,6,B-)\n";
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = ReduceProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testReduceOnTupleWithMultipleKeyFieldSelectors() throws Exception{
+		/*
+		 * Reduce on tuples with multiple key field selectors
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+				groupBy(4,0).reduce(new Tuple5Reduce());
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,0,Hallo,1\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"3,9,0,P-),2\n" +
+				"3,6,5,BCD,3\n" +
+				"4,17,0,P-),1\n" +
+				"4,17,0,P-),2\n" +
+				"5,11,10,GHI,1\n" +
+				"5,29,0,P-),2\n" +
+				"5,25,0,P-),3\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testReduceOnTuplesWithKeyExtractor() throws Exception {
+		/*
+		 * Reduce on tuples with key extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+				groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)"));
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"5,2,B-)\n" +
+				"15,3,B-)\n" +
+				"34,4,B-)\n" +
+				"65,5,B-)\n" +
+				"111,6,B-)\n";
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Long getKey(Tuple3<Integer, Long, String> in) {
+			return in.f1;
+		}
+	}
+
+	@Test
+	public void testReduceOnCustomTypeWithKeyExtractor() throws Exception {
+		/*
+		 * Reduce on custom type with key extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> reduceDs = ds.
+				groupBy(new KeySelector2()).reduce(new CustomTypeReduce());
+
+		reduceDs.writeAsText(resultPath);
+		env.execute();
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+		expected = "1,0,Hi\n" +
+				"2,3,Hello!\n" +
+				"3,12,Hello!\n" +
+				"4,30,Hello!\n" +
+				"5,60,Hello!\n" +
+				"6,105,Hello!\n";
+	}
+
+	public static class KeySelector2 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
 		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class ReduceProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Reduce on tuples with key field selector
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(1).reduce(new Tuple3Reduce("B-)"));
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"5,2,B-)\n" +
-						"15,3,B-)\n" +
-						"34,4,B-)\n" +
-						"65,5,B-)\n" +
-						"111,6,B-)\n";
-			}
-			case 2: {
-				/*
-				 * Reduce on tuples with multiple key field selectors
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
-						groupBy(4,0).reduce(new Tuple5Reduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,0,Hallo,1\n" +
-						"2,3,2,Hallo Welt wie,1\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"3,9,0,P-),2\n" +
-						"3,6,5,BCD,3\n" +
-						"4,17,0,P-),1\n" +
-						"4,17,0,P-),2\n" +
-						"5,11,10,GHI,1\n" +
-						"5,29,0,P-),2\n" +
-						"5,25,0,P-),3\n";
-			} 
-			case 3: {
-				/*
-				 * Reduce on tuples with key extractor
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(new KeySelector<Tuple3<Integer,Long,String>, Long>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Long getKey(Tuple3<Integer, Long, String> in) {
-										return in.f1;
-									}
-								}).reduce(new Tuple3Reduce("B-)"));
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"5,2,B-)\n" +
-						"15,3,B-)\n" +
-						"34,4,B-)\n" +
-						"65,5,B-)\n" +
-						"111,6,B-)\n";
-				
-			}
-			case 4: {
-				/*
-				 * Reduce on custom type with key extractor
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> reduceDs = ds.
-						groupBy(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								}).reduce(new CustomTypeReduce());
-				
-				reduceDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,Hi\n" +
-						"2,3,Hello!\n" +
-						"3,12,Hello!\n" +
-						"4,30,Hello!\n" +
-						"5,60,Hello!\n" +
-						"6,105,Hello!\n";
-			}
-			case 5: {
-				/*
-				 * All-reduce for tuple
-				 */
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						reduce(new AllAddingTuple3Reduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "231,91,Hello World\n";
-			}
-			case 6: {
-				/*
-				 * All-reduce for custom types
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> reduceDs = ds.
-						reduce(new AllAddingCustomTypeReduce());
-				
-				reduceDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "91,210,Hello!";
-			}
-			case 7: {
-				
-				/*
-				 * Reduce with broadcast set
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"5,2,55\n" +
-						"15,3,55\n" +
-						"34,4,55\n" +
-						"65,5,55\n" +
-						"111,6,55\n";
-			}
-			case 8: {
-				/*
-				 * Reduce with UDF that returns the second input object (check mutable object handling)
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(1).reduce(new InputReturningTuple3Reduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"5,2,Hi again!\n" +
-						"15,3,Hi again!\n" +
-						"34,4,Hi again!\n" +
-						"65,5,Hi again!\n" +
-						"111,6,Hi again!\n";
-			}
-			case 9: {
-				/*
-				 * Reduce with a Tuple-returning KeySelector 
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> reduceDs = ds .
-						groupBy(
-								new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
-									private static final long serialVersionUID = 1L;
-		
-									@Override
-									public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
-										return new Tuple2<Integer, Long>(t.f0, t.f4);
-									}
-								}).reduce(new Tuple5Reduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				return "1,1,0,Hallo,1\n" +
-						"2,3,2,Hallo Welt wie,1\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"3,9,0,P-),2\n" +
-						"3,6,5,BCD,3\n" +
-						"4,17,0,P-),1\n" +
-						"4,17,0,P-),2\n" +
-						"5,11,10,GHI,1\n" +
-						"5,29,0,P-),2\n" +
-						"5,25,0,P-),3\n";
-			}
-			case 10: {
-				/*
-				 * Case 2 with String-based field expression
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
-						groupBy("f4","f0").reduce(new Tuple5Reduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,0,Hallo,1\n" +
-						"2,3,2,Hallo Welt wie,1\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"3,9,0,P-),2\n" +
-						"3,6,5,BCD,3\n" +
-						"4,17,0,P-),1\n" +
-						"4,17,0,P-),2\n" +
-						"5,11,10,GHI,1\n" +
-						"5,29,0,P-),2\n" +
-						"5,25,0,P-),3\n";
-			}
-			case 11: {
-				/**
-				 * Test support for Date and enum serialization
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0,2).map(new MapFunction<Long, PojoWithDateAndEnum>() {
-					@Override
-					public PojoWithDateAndEnum map(Long value) throws Exception {
-						int l = value.intValue();
-						switch (l) {
-							case 0:
-								PojoWithDateAndEnum one = new PojoWithDateAndEnum();
-								one.group = "a";
-								one.date = new Date(666);
-								one.cat = CollectionDataSets.Category.CAT_A;
-								return one;
-							case 1:
-								PojoWithDateAndEnum two = new PojoWithDateAndEnum();
-								two.group = "a";
-								two.date = new Date(666);
-								two.cat = CollectionDataSets.Category.CAT_A;
-								return two;
-							case 2:
-								PojoWithDateAndEnum three = new PojoWithDateAndEnum();
-								three.group = "b";
-								three.date = new Date(666);
-								three.cat = CollectionDataSets.Category.CAT_B;
-								return three;
-						}
-						throw new RuntimeException("Unexpected value for l=" + l);
-					}
-				});
-				ds = ds.union(CollectionDataSets.getPojoWithDateAndEnum(env));
-
-				DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void reduce(Iterable<PojoWithDateAndEnum> values,
-							Collector<String> out) throws Exception {
-						for(PojoWithDateAndEnum val : values) {
-							if(val.cat == CollectionDataSets.Category.CAT_A) {
-								Assert.assertEquals("a", val.group);
-							} else if(val.cat == CollectionDataSets.Category.CAT_B) {
-								Assert.assertEquals("b", val.group);
-							} else {
-								Assert.fail("error. Cat = "+val.cat);
-							}
-							Assert.assertEquals(666, val.date.getTime());
-						}
-						out.collect("ok");
-					}
-				});
-				
-				res.writeAsText(resultPath);
-				env.execute();
-				return "ok\nok";
+
+	@Test
+	public void testAllReduceForTuple() throws Exception {
+		/*
+		 * All-reduce for tuple
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+				reduce(new AllAddingTuple3Reduce());
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "231,91,Hello World\n";
+	}
+
+	@Test
+	public void testAllReduceForCustomTypes() throws Exception {
+		/*
+		 * All-reduce for custom types
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> reduceDs = ds.
+				reduce(new AllAddingCustomTypeReduce());
+
+		reduceDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "91,210,Hello!";
+	}
+
+	@Test
+	public void testReduceWithBroadcastSet() throws Exception {
+		/*
+		 * Reduce with broadcast set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+				groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"5,2,55\n" +
+				"15,3,55\n" +
+				"34,4,55\n" +
+				"65,5,55\n" +
+				"111,6,55\n";
+	}
+
+	@Test
+	public void testReduceWithUDFThatReturnsTheSecondInputObject() throws Exception {
+		/*
+		 * Reduce with UDF that returns the second input object (check mutable object handling)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+				groupBy(1).reduce(new InputReturningTuple3Reduce());
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"5,2,Hi again!\n" +
+				"15,3,Hi again!\n" +
+				"34,4,Hi again!\n" +
+				"65,5,Hi again!\n" +
+				"111,6,Hi again!\n";
+	}
+
+	@Test
+	public void testReduceATupleReturningKeySelector() throws Exception {
+		/*
+		 * Reduce with a Tuple-returning KeySelector
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> reduceDs = ds .
+				groupBy(new KeySelector3()).reduce(new Tuple5Reduce());
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,0,Hallo,1\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"3,9,0,P-),2\n" +
+				"3,6,5,BCD,3\n" +
+				"4,17,0,P-),1\n" +
+				"4,17,0,P-),2\n" +
+				"5,11,10,GHI,1\n" +
+				"5,29,0,P-),2\n" +
+				"5,25,0,P-),3\n";
+	}
+
+	public static class KeySelector3 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f4);
+		}
+	}
+
+	@Test
+	public void testReduceOnTupleWithMultipleKeyExpressions() throws Exception {
+		/*
+		 * Reduce on tuples with multiple key field selectors given as String-based field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+				groupBy("f4","f0").reduce(new Tuple5Reduce());
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,0,Hallo,1\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"3,9,0,P-),2\n" +
+				"3,6,5,BCD,3\n" +
+				"4,17,0,P-),1\n" +
+				"4,17,0,P-),2\n" +
+				"5,11,10,GHI,1\n" +
+				"5,29,0,P-),2\n" +
+				"5,25,0,P-),3\n";
+	}
+
+	@Test
+	public void testSupportForDataAndEnumSerialization() throws Exception {
+		/*
+		 * Test support for Date and enum serialization
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0,2).map(new Mapper1());
+		ds = ds.union(CollectionDataSets.getPojoWithDateAndEnum(env));
+
+		DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1());
+
+		res.writeAsText(resultPath);
+		env.execute();
+		expected = "ok\nok";
+	}
+
+	public static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> {
+		@Override
+		public PojoWithDateAndEnum map(Long value) throws Exception {
+			int l = value.intValue();
+			switch (l) {
+				case 0:
+					PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+					one.group = "a";
+					one.date = new Date(666);
+					one.cat = CollectionDataSets.Category.CAT_A;
+					return one;
+				case 1:
+					PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+					two.group = "a";
+					two.date = new Date(666);
+					two.cat = CollectionDataSets.Category.CAT_A;
+					return two;
+				case 2:
+					PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+					three.group = "b";
+					three.date = new Date(666);
+					three.cat = CollectionDataSets.Category.CAT_B;
+					return three;
 			}
-			
-			default:
-				throw new IllegalArgumentException("Invalid program id");
+			throw new RuntimeException("Unexpected value for l=" + l);
+		}
+	}
+
+	public static class GroupReducer1 implements GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterable<PojoWithDateAndEnum> values,
+				Collector<String> out) throws Exception {
+			for(PojoWithDateAndEnum val : values) {
+				if(val.cat == CollectionDataSets.Category.CAT_A) {
+					Assert.assertEquals("a", val.group);
+				} else if(val.cat == CollectionDataSets.Category.CAT_B) {
+					Assert.assertEquals("b", val.group);
+				} else {
+					Assert.fail("error. Cat = "+val.cat);
+				}
+				Assert.assertEquals(666, val.date.getTime());
 			}
-			
+			out.collect("ok");
 		}
-	
 	}
 	
 	public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
index d63d08c..ee12fa4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
@@ -23,128 +23,96 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 @RunWith(Parameterized.class)
-public class SumMinMaxITCase extends JavaProgramTestBase  {
+public class SumMinMaxITCase extends MultipleProgramsTestBase {
 
-	private static int NUM_PROGRAMS = 3;
+	public SumMinMaxITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
 
-	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
-	private String expectedResult;
+	private String expected;
 
-	public SumMinMaxITCase(Configuration config) {
-		super(config);
-	}
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = SumMinMaxProgs.runProgram(curProgId, resultPath);
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+	@Test
+	public void testSumMaxAndProject() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Integer, Long>> sumDs = ds
+				.sum(0)
+				.andMax(1)
+				.project(0, 1);
+
+		sumDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "231,6\n";
 	}
 
-	@Parameterized.Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+	@Test
+	public void testGroupedAggregate() throws Exception {
+		/*
+		 * Grouped Aggregate
+		 */
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+				.sum(0)
+				.project(1, 0);
 
-		return toParameterList(tConfigs);
+		aggregateDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1\n" +
+				"2,5\n" +
+				"3,15\n" +
+				"4,34\n" +
+				"5,65\n" +
+				"6,111\n";
 	}
 
-	/**
-	 * These tests are copied from
-	 * @see org.apache.flink.test.javaApiOperators.AggregateITCase
-	 * replacing calls to aggregate with calls to sum, min, and max
-	 */
-	private static class SumMinMaxProgs {
-
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			switch(progId) {
-				case 1: {
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-					DataSet<Tuple2<Integer, Long>> sumDs = ds
-							.sum(0)
-							.andMax(1)
-							.project(0, 1);
-
-					sumDs.writeAsCsv(resultPath);
-					env.execute();
-
-					// return expected result
-					return "231,6\n";
-				}
-				case 2: {
-				/*
-				 * Grouped Aggregate
-				 */
-
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-					DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
-							.sum(0)
-							.project(1, 0);
-
-					aggregateDs.writeAsCsv(resultPath);
-					env.execute();
-
-					// return expected result
-					return "1,1\n" +
-							"2,5\n" +
-							"3,15\n" +
-							"4,34\n" +
-							"5,65\n" +
-							"6,111\n";
-				}
-				case 3: {
-				/*
-				 * Nested Aggregate
-				 */
-
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-					DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
-							.min(0)
-							.min(0)
-							.project(0);
-
-					aggregateDs.writeAsCsv(resultPath);
-					env.execute();
-
-					// return expected result
-					return "1\n";
-				}
-				default:
-					throw new IllegalArgumentException("Invalid program id");
-			}
-		}
+	@Test
+	public void testNestedAggregate() throws Exception {
+		/*
+		 * Nested Aggregate
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+				.min(0)
+				.min(0)
+				.project(0);
+
+		aggregateDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1\n";
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 4dabc62..0a23da8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -18,157 +18,129 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class UnionITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 3; 
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class UnionITCase extends MultipleProgramsTestBase {
+
+	private static final String FULL_TUPLE_3_STRING = "1,1,Hi\n" +
+			"2,2,Hello\n" +
+			"3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" +
+			"5,3,I am fine.\n" +
+			"6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" +
+			"8,4,Comment#2\n" +
+			"9,4,Comment#3\n" +
+			"10,4,Comment#4\n" +
+			"11,5,Comment#5\n" +
+			"12,5,Comment#6\n" +
+			"13,5,Comment#7\n" +
+			"14,5,Comment#8\n" +
+			"15,5,Comment#9\n" +
+			"16,6,Comment#10\n" +
+			"17,6,Comment#11\n" +
+			"18,6,Comment#12\n" +
+			"19,6,Comment#13\n" +
+			"20,6,Comment#14\n" +
+			"21,6,Comment#15\n";
+
+	public UnionITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
+	private String expected;
 
-	public UnionITCase(Configuration config) {
-		super(config);	
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = UnionProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testUnion2IdenticalDataSets() throws Exception {
+		/*
+		 * Union of 2 Same Data Sets
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
+
+		unionDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testUnion5IdenticalDataSets() throws Exception {
+		/*
+		 * Union of 5 same Data Sets, with multiple unions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
+				.union(CollectionDataSets.get3TupleDataSet(env))
+				.union(CollectionDataSets.get3TupleDataSet(env))
+				.union(CollectionDataSets.get3TupleDataSet(env));
+
+		unionDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
+				FULL_TUPLE_3_STRING +	FULL_TUPLE_3_STRING;
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testUnionWithEmptyDataSet() throws Exception {
+		/*
+		 * Test on union with empty dataset
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
+		// Don't know how to make an empty result in another way than filtering it
+		DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env).
+				filter(new RichFilter1());
+
+		DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
+				.union(empty);
+
+		unionDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = FULL_TUPLE_3_STRING;
 	}
-	
-	private static class UnionProgs {
-
-		private static final String FULL_TUPLE_3_STRING = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"4,3,Hello world, how are you?\n" +
-				"5,3,I am fine.\n" +
-				"6,3,Luke Skywalker\n" +
-				"7,4,Comment#1\n" +
-				"8,4,Comment#2\n" +
-				"9,4,Comment#3\n" +
-				"10,4,Comment#4\n" +
-				"11,5,Comment#5\n" +
-				"12,5,Comment#6\n" +
-				"13,5,Comment#7\n" +
-				"14,5,Comment#8\n" +
-				"15,5,Comment#9\n" +
-				"16,6,Comment#10\n" +
-				"17,6,Comment#11\n" +
-				"18,6,Comment#12\n" +
-				"19,6,Comment#13\n" +
-				"20,6,Comment#14\n" +
-				"21,6,Comment#15\n";
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Union of 2 Same Data Sets
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
-				
-				unionDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
-			}
-			case 2: {
-				/*
-				 * Union of 5 same Data Sets, with multiple unions
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
-						.union(CollectionDataSets.get3TupleDataSet(env))
-						.union(CollectionDataSets.get3TupleDataSet(env))
-						.union(CollectionDataSets.get3TupleDataSet(env));
-				
-				unionDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
-			}
-			case 3: {
-				/*
-				 * Test on union with empty dataset
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				// Don't know how to make an empty result in an other way than filtering it 
-				DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env).
-						filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-								return false;
-							}
-						});
-				
-				DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
-					.union(empty);
-			
-				unionDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return FULL_TUPLE_3_STRING;				
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
+
+	public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return false;
 		}
-	
 	}
 	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 0d6b763..ba66695 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,8 +19,12 @@ package org.apache.flink.api.scala.functions
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
 import org.junit.Assert.fail
+import org.junit.{After, Before, Test, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
@@ -34,79 +38,75 @@ import org.apache.flink.api.common.InvalidProgramException
 /* The test cases are originally from the Apache Spark project. Like the ClosureCleaner itself. */
 
 @RunWith(classOf[Parameterized])
-class ClosureCleanerITCase(config: Configuration) extends JavaProgramTestBase(config) {
+class ClosureCleanerITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  val _tempFolder = new TemporaryFolder()
+  var resultPath: String = _
+  var result: String = _
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = curProgId match {
-      case 1 =>
-        TestObject.run(resultPath)
-        "30" // 6 + 7 + 8 + 9
-
-      case 2 =>
-        val obj = new TestClass
-        obj.run(resultPath)
-        "30" // 6 + 7 + 8 + 9
-
-      case 3 =>
-        val obj = new TestClassWithoutDefaultConstructor(5)
-        obj.run(resultPath)
-        "30" // 6 + 7 + 8 + 9
+  @Rule
+  def tempFolder = _tempFolder
 
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
 
-      case 4 =>
-        val obj = new TestClassWithoutFieldAccess
-        obj.run(resultPath)
-        "30" // 6 + 7 + 8 + 9
-
-      case 5 =>
-        TestObjectWithNesting.run(resultPath)
-        "27"
-
-      case 6 =>
-        val obj = new TestClassWithNesting(1)
-        obj.run(resultPath)
-        "27"
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(result, resultPath)
+  }
 
-      case 7 =>
-        TestObjectWithBogusReturns.run(resultPath)
-        "1"
+  @Test
+  def testObject: Unit = {
+    TestObject.run(resultPath)
+    result = "30"
+  }
 
-      case 8 =>
-        TestObjectWithNestedReturns.run(resultPath)
-        "1"
+  @Test
+  def testClass: Unit = {
+    val obj = new TestClass
+    obj.run(resultPath)
+    result = "30"
+  }
 
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+  @Test
+  def testClassWithoutDefaulConstructor: Unit = {
+    val obj = new TestClassWithoutDefaultConstructor(5)
+    obj.run(resultPath)
+    result = "30"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testClassWithoutFieldAccess: Unit = {
+    val obj = new TestClassWithoutFieldAccess
+    obj.run(resultPath)
+    result = "30" // 6 + 7 + 8 + 9
   }
-}
 
-object ClosureCleanerITCase {
+  @Test
+  def testObjectWithNesting: Unit = {
+    TestObjectWithNesting.run(resultPath)
+    result = "27"
+  }
 
-  val NUM_PROGRAMS = 6
+  @Test
+  def testClassWithNesting: Unit = {
+    val obj = new TestClassWithNesting(1)
+    obj.run(resultPath)
+    result = "27"
+  }
 
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
+  @Test
+  def testObjectWithBogusReturns: Unit = {
+    TestObjectWithBogusReturns.run(resultPath)
+    result = "1"
+  }
 
-    configs.asJavaCollection
+  @Test
+  def testObjectWithNestedReturns: Unit = {
+    TestObjectWithNestedReturns.run(resultPath)
+    result = "1"
   }
 }
 
@@ -121,7 +121,7 @@ object TestObject {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val nums = env.fromElements(1, 2, 3, 4)
 
-    nums.map(_ + x).reduce(_ + _).writeAsText(resultPath)
+    nums.map(_ + x).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }
@@ -138,7 +138,7 @@ class TestClass extends Serializable {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val nums = env.fromElements(1, 2, 3, 4)
 
-    nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath)
+    nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }
@@ -153,7 +153,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val nums = env.fromElements(1, 2, 3, 4)
 
-    nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath)
+    nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }
@@ -171,7 +171,7 @@ class TestClassWithoutFieldAccess {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val nums = env.fromElements(1, 2, 3, 4)
 
-    nums.map(_ + x).reduce(_ + _).writeAsText(resultPath)
+    nums.map(_ + x).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }
@@ -193,7 +193,7 @@ object TestObjectWithBogusReturns {
       case _ => fail("Bogus return statement not detected.")
     }
 
-    nums.writeAsText(resultPath)
+    nums.writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }
@@ -213,7 +213,7 @@ object TestObjectWithNestedReturns {
         foo()
     }
 
-    nums.writeAsText(resultPath)
+    nums.writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }
@@ -235,7 +235,7 @@ object TestObjectWithNesting {
         in.map(_ + x + y).reduce(_ + _).withBroadcastSet(nums, "nums")
     }
 
-    result.writeAsText(resultPath)
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }
@@ -259,7 +259,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
         in.map(_ + x + getY).reduce(_ + _).withBroadcastSet(nums, "nums")
     }
 
-    result.writeAsText(resultPath)
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
 
     env.execute()
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 75e8a66..ae3512a 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -20,121 +20,97 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class AggregateITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-object AggregateProgs {
-  var NUM_PROGRAMS: Int = 3
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        // Full aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .aggregate(Aggregations.SUM,0)
-          .and(Aggregations.MAX, 1)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map{ t => (t._1, t._2) }
+  val _tempFolder = new TemporaryFolder()
 
-        aggregateDs.writeAsCsv(resultPath)
+  private var resultPath: String = null
+  private var expectedResult: String = null
 
-        env.execute()
+  @Rule
+  def tempFolder = _tempFolder
 
-        // return expected result
-        "231,6\n"
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
 
-      case 2 =>
-        // Grouped aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
+  @After
+  def after(): Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
 
-        val aggregateDs = ds
-          .groupBy(1)
-          .aggregate(Aggregations.SUM, 0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => (t._2, t._1) }
+  @Test
+  def testFullAggregate: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
 
-        aggregateDs.writeAsCsv(resultPath)
+    val ds = CollectionDataSets.get3TupleDataSet(env)
 
-        env.execute()
+    val aggregateDs = ds
+      .aggregate(Aggregations.SUM,0)
+      .and(Aggregations.MAX, 1)
+      // Ensure aggregate operator correctly copies other fields
+      .filter(_._3 != null)
+      .map{ t => (t._1, t._2) }
 
-        // return expected result
-        "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
-      case 3 =>
-        // Nested aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
+    env.execute()
 
-        val aggregateDs = ds
-          .groupBy(1)
-          .aggregate(Aggregations.MIN, 0)
-          .aggregate(Aggregations.MIN, 0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => new Tuple1(t._1) }
+    // set expected result
+    expectedResult = "231,6\n"
+  }
 
-        aggregateDs.writeAsCsv(resultPath)
+  @Test
+  def testGroupedAggregate: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
 
-        env.execute()
+    val aggregateDs = ds
+      .groupBy(1)
+      .aggregate(Aggregations.SUM, 0)
+      // Ensure aggregate operator correctly copies other fields
+      .filter(_._3 != null)
+      .map { t => (t._2, t._1) }
 
-        // return expected result
-        "1\n"
+    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
+    env.execute()
 
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+    // set expected result
+    expectedResult = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
   }
-}
-
 
-@RunWith(classOf[Parameterized])
-class AggregateITCase(config: Configuration) extends JavaProgramTestBase(config) {
+  @Test
+  def testNestedAggregate: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+    val aggregateDs = ds
+      .groupBy(1)
+      .aggregate(Aggregations.MIN, 0)
+      .aggregate(Aggregations.MIN, 0)
+      // Ensure aggregate operator correctly copies other fields
+      .filter(_._3 != null)
+      .map { t => new Tuple1(t._1) }
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
+    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
-  protected def testProgram(): Unit = {
-    expectedResult = AggregateProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
+    env.execute()
 
-object AggregateITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to AggregateProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
+    // set expected result
+    expectedResult = "1\n"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 72e5648..93ade52 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -21,382 +21,367 @@ import org.apache.flink.api.common.functions.RichCoGroupFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-import org.junit.Assert
+import org.junit._
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  val _tempFolder = new TemporaryFolder()
+  var resultPath: String = _
+  var expectedResult: String = _
 
-object CoGroupProgs {
-  var NUM_PROGRAMS: Int = 13
+  @Rule
+  def tempFolder = _tempFolder
 
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * CoGroup on tuples with key field selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-          (first, second) =>
-            var sum = 0
-            var id = 0
-            for (t <- first) {
-              sum += t._3
-              id = t._1
-            }
-            for (t <- second) {
-              sum += t._3
-              id = t._1
-            }
-            (id, sum)
-        }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0\n" + "2,6\n" + "3,24\n" + "4,60\n" + "5,120\n"
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
 
-      case 2 =>
-        /*
-         * CoGroup on two custom type inputs with key extractors
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
 
-        val coGroupDs = ds.coGroup(ds2).where(_.myInt).equalTo(_.myInt) apply {
-          (first, second) =>
-            val o = new CustomType(0, 0, "test")
-            for (c <- first) {
-              o.myInt = c.myInt
-              o.myLong += c.myLong
-            }
-            for (c <- second) {
-              o.myInt = c.myInt
-              o.myLong += c.myLong
-            }
-            o
+  @Test
+  def testCoGroupOnTuplesWithKeyFieldSelector: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+      (first, second) =>
+        var sum = 0
+        var id = 0
+        for (t <- first) {
+          sum += t._3
+          id = t._1
         }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
-          "210,test\n"
-
-      case 3 =>
-        /*
-         * check correctness of cogroup if UDF returns left input objects
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get3TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-          (first, second, out: Collector[(Int, Long, String)] ) =>
-            for (t <- first) {
-              if (t._1 < 6) {
-                out.collect(t)
-              }
-            }
+        for (t <- second) {
+          sum += t._3
+          id = t._1
         }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-          "how are you?\n" + "5,3,I am fine.\n"
+        (id, sum)
+    }
+    coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,0\n" + "2,6\n" + "3,24\n" + "4,60\n" + "5,120\n"
+  }
 
-      case 4 =>
-        /*
-         * check correctness of cogroup if UDF returns right input objects
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-          (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
-            for (t <- second) {
-              if (t._1 < 4) {
-                out.collect(t)
-              }
-            }
+  @Test
+  def testCoGroupOnTwoCustomInputsWithKeyExtractors: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+
+    val coGroupDs = ds.coGroup(ds2).where(_.myInt).equalTo(_.myInt) apply {
+      (first, second) =>
+        val o = new CustomType(0, 0, "test")
+        for (c <- first) {
+          o.myInt = c.myInt
+          o.myLong += c.myLong
         }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie,1\n" + "3,4,3," +
-          "Hallo Welt wie gehts?,2\n" + "3,5,4,ABC,2\n" + "3,6,5,BCD,3\n"
-
-      case 5 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val intDs = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).apply(
-          new RichCoGroupFunction[
-            (Int, Long, Int, String, Long),
-            (Int, Long, Int, String, Long),
-            (Int, Int, Int)] {
-            private var broadcast = 41
-
-            override def open(config: Configuration) {
-              val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              broadcast = ints.sum
-            }
-
-            override def coGroup(
-                first: java.lang.Iterable[(Int, Long, Int, String, Long)],
-                second: java.lang.Iterable[(Int, Long, Int, String, Long)],
-                out: Collector[(Int, Int, Int)]): Unit = {
-              var sum = 0
-              var id = 0
-              for (t <- first.asScala) {
-                sum += t._3
-                id = t._1
-              }
-              for (t <- second.asScala) {
-                sum += t._3
-                id = t._1
-              }
-              out.collect((id, sum, broadcast))
-            }
-
-        }).withBroadcastSet(intDs, "ints")
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0,55\n" + "2,6,55\n" + "3,24,55\n" + "4,60,55\n" + "5,120,55\n"
-
-      case 6 =>
-        /*
-         * CoGroup on a tuple input with key field selector and a custom type input with
-         * key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(2).equalTo(_.myInt) apply {
-          (first, second) =>
-            var sum = 0L
-            var id = 0
-            for (t <- first) {
-              sum += t._1
-              id = t._3
-            }
-            for (t <- second) {
-              sum += t.myLong
-              id = t.myInt
-            }
-            (id, sum, "test")
+        for (c <- second) {
+          o.myInt = c.myInt
+          o.myLong += c.myLong
         }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
-          "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
-          "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
-
-      case 7 =>
-        /*
-         * CoGroup on a tuple input with key field selector and a custom type input with
-         * key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-        val coGroupDs = ds2.coGroup(ds).where(_.myInt).equalTo(2) {
-          (first, second) =>
-            var sum = 0L
-            var id = 0
-            for (t <- first) {
-              sum += t.myLong
-              id = t.myInt
-            }
-            for (t <- second) {
-              sum += t._1
-              id = t._3
-            }
+        o
+    }
+    coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" +
+      "6," + "210,test\n"
+  }
 
-            new CustomType(id, sum, "test")
+  @Test
+  def testCorrectnessIfCoGroupReturnsLeftInputObjects: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get3TupleDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+      (first, second, out: Collector[(Int, Long, String)] ) =>
+        for (t <- first) {
+          if (t._1 < 6) {
+            out.collect(t)
+          }
         }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
-          "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
-          "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
+    }
+    coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n"
+  }
 
-      case 8 =>
-        /*
-         * CoGroup with multiple key fields
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get3TupleDataSet(env)
-        val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
-          (first, second, out: Collector[(Int, Long, String)]) =>
-            val strs = first map(_._4)
-            for (t <- second) {
-              for (s <- strs) {
-                out.collect((t._1, t._2, s))
-              }
-            }
+  @Test
+  def testCorrectnessIfCoGroupReturnsRightInputObjects: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+      (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
+        for (t <- second) {
+          if (t._1 < 4) {
+            out.collect(t)
+          }
         }
+    }
+    coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie,1\n" +
+      "3,4,3," + "Hallo Welt wie gehts?,2\n" + "3,5,4,ABC,2\n" + "3,6,5,BCD,3\n"
+  }
 
-        coGrouped.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
-          "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+  @Test
+  def testCoGroupWithBroadcastVariable: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val intDs = CollectionDataSets.getIntDataSet(env)
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).apply(
+      new RichCoGroupFunction[
+        (Int, Long, Int, String, Long),
+        (Int, Long, Int, String, Long),
+        (Int, Int, Int)] {
+        private var broadcast = 41
+
+        override def open(config: Configuration) {
+          val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+          broadcast = ints.sum
+        }
 
-      case 9 =>
-        /*
-         * CoGroup with multiple key fields
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets
-          .get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get3TupleDataSet(env)
-        val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
-          .apply {
-          (first, second, out: Collector[(Int, Long, String)]) =>
-            val strs = first map(_._4)
-            for (t <- second) {
-              for (s <- strs) {
-                out.collect((t._1, t._2, s))
-              }
-            }
+        override def coGroup(
+                              first: java.lang.Iterable[(Int, Long, Int, String, Long)],
+                              second: java.lang.Iterable[(Int, Long, Int, String, Long)],
+                              out: Collector[(Int, Int, Int)]): Unit = {
+          var sum = 0
+          var id = 0
+          for (t <- first.asScala) {
+            sum += t._3
+            id = t._1
+          }
+          for (t <- second.asScala) {
+            sum += t._3
+            id = t._1
+          }
+          out.collect((id, sum, broadcast))
         }
 
-        coGrouped.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
-          "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+      }).withBroadcastSet(intDs, "ints")
+    coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,0,55\n" + "2,6,55\n" + "3,24,55\n" + "4,60,55\n" + "5,120,55\n"
+  }
 
-      case 10 =>
-        /*
-         * CoGroup on two custom type inputs using expression keys
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt") {
-          (first, second) =>
-            val o = new CustomType(0, 0, "test")
-            for (t <- first) {
-              o.myInt = t.myInt
-              o.myLong += t.myLong
-            }
-            for (t <- second) {
-              o.myInt = t.myInt
-              o.myLong += t.myLong
-            }
-            o
+  @Test
+  def testCoGroupOnTupleWithKeyFieldSelectorAndCustomTypeWithKeyExtractor: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where(2).equalTo(_.myInt) apply {
+      (first, second) =>
+        var sum = 0L
+        var id = 0
+        for (t <- first) {
+          sum += t._1
+          id = t._3
         }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
-          "210,test\n"
-
-      case 11 =>
-        /*
-         * CoGroup on two custom type inputs using expression keys
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
-          (first, second, out: Collector[CustomType]) =>
-            for (p <- first) {
-              for (t <- second) {
-                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
-              }
-            }
+        for (t <- second) {
+          sum += t.myLong
+          id = t.myInt
         }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
+        (id, sum, "test")
+    }
+    coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" +
+      "5," + "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
+      "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
+  }
 
-      case 12 =>
-        /*
-         * CoGroup field-selector (expression keys) + key selector function
-         * The key selector is unnecessary complicated (Tuple1) ;)
+  @Test
+  def testCoGroupOnCustomTypeWithKeyExtractorAndTupleInputKeyFieldSelector: Unit = {
+    /*
+         * CoGroup on a custom type input with key extractor and a tuple input with
+         * key field selector
          */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
-          (first, second, out: Collector[CustomType]) =>
-            for (p <- first) {
-              for (t <- second) {
-                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
-              }
-            }
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+    val coGroupDs = ds2.coGroup(ds).where(_.myInt).equalTo(2) {
+      (first, second) =>
+        var sum = 0L
+        var id = 0
+        for (t <- first) {
+          sum += t.myLong
+          id = t.myInt
         }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
-
-      case 13 =>
-        /*
-         * CoGroup field-selector (expression keys) + key selector function
-         * The key selector is simple here
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
-          (first, second, out: Collector[CustomType]) =>
-            for (p <- first) {
-              for (t <- second) {
-                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
-              }
-            }
+        for (t <- second) {
+          sum += t._1
+          id = t._3
         }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
 
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
+        new CustomType(id, sum, "test")
     }
+    coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" +
+      "5," + "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
+      "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
   }
-}
 
+  @Test
+  def testCoGroupWithMultipleKeyFields: Unit = {
+    /*
+        * CoGroup with multiple key fields
+        */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.get5TupleDataSet(env)
+    val ds2 = CollectionDataSets.get3TupleDataSet(env)
+    val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
+      (first, second, out: Collector[(Int, Long, String)]) =>
+        val strs = first map(_._4)
+        for (t <- second) {
+          for (s <- strs) {
+            out.collect((t._1, t._2, s))
+          }
+        }
+    }
 
-@RunWith(classOf[Parameterized])
-class CoGroupITCase(config: Configuration) extends JavaProgramTestBase(config) {
+    coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
+      "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+  }
 
-  private val curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  @Test
+  def testCoGroupWithMultipleKeyExtractors: Unit = {
+    /*
+        * CoGroup with multiple key extractors
+        */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets
+      .get5TupleDataSet(env)
+    val ds2 = CollectionDataSets.get3TupleDataSet(env)
+    val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
+      .apply {
+      (first, second, out: Collector[(Int, Long, String)]) =>
+        val strs = first map(_._4)
+        for (t <- second) {
+          for (s <- strs) {
+            out.collect((t._1, t._2, s))
+          }
+        }
+    }
+
+    coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
+      "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = CoGroupProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testCoGroupOnTwoCustomTypesUsingExpressionKeys: Unit = {
+    /*
+     * CoGroup on two custom type inputs using expression keys
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt") {
+      (first, second) =>
+        val o = new CustomType(0, 0, "test")
+        for (t <- first) {
+          o.myInt = t.myInt
+          o.myLong += t.myLong
+        }
+        for (t <- second) {
+          o.myInt = t.myInt
+          o.myLong += t.myLong
+        }
+        o
+    }
+    coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" +
+      "6," + "210,test\n"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testCoGroupOnTwoCustomTypesUsingExpressionKeysAndFieldSelector: Unit = {
+    /*
+     * CoGroup on a POJO input with expression key and a tuple input with key field selector
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmallPojoDataSet(env)
+    val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
+      (first, second, out: Collector[CustomType]) =>
+        for (p <- first) {
+          for (t <- second) {
+            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+          }
+        }
+    }
+    coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
   }
-}
 
-object CoGroupITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to CoGroupProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
+  @Test
+  def testCoGroupFieldSelectorAndKeySelector: Unit = {
+    /*
+     * CoGroup field-selector (expression keys) + key selector function
+     * The key selector is unnecessarily complicated (Tuple1) ;)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmallPojoDataSet(env)
+    val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
+      (first, second, out: Collector[CustomType]) =>
+        for (p <- first) {
+          for (t <- second) {
+            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+          }
+        }
     }
+    coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
+  }
 
-    configs.asJavaCollection
+  @Test
+  def testCoGroupKeySelectorAndFieldSelector: Unit = {
+    /*
+         * CoGroup field-selector (expression keys) + key selector function
+         * The key selector is simple here
+         */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmallPojoDataSet(env)
+    val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+    val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
+      (first, second, out: Collector[CustomType]) =>
+        for (p <- first) {
+          for (t <- second) {
+            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+          }
+        }
+    }
+    coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expectedResult = "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
   }
 }