You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:08 UTC

[72/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 5f8de8a..f10a9df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -36,659 +33,636 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
-public class JoinITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 23;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class JoinITCase extends MultipleProgramsTestBase {
+
+	public JoinITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public JoinITCase(Configuration config) {
-		super(config);
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = JoinProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testUDFJoinOnTuplesWithKeyFieldPositions() throws Exception {
+		/*
+		 * UDF Join on tuples with key field positions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<String, String>> joinDs =
+				ds1.join(ds2)
+						.where(1)
+						.equalTo(1)
+						.with(new T3T5FlatJoin());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hallo\n" +
+				"Hello,Hallo Welt\n" +
+				"Hello world,Hallo Welt\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception {
+		/*
+		 * UDF Join on tuples with multiple key field positions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<String, String>> joinDs =
+				ds1.join(ds2)
+						.where(0,1)
+						.equalTo(0,4)
+						.with(new T3T5FlatJoin());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hallo\n" +
+				"Hello,Hallo Welt\n" +
+				"Hello world,Hallo Welt wie gehts?\n" +
+				"Hello world,ABC\n" +
+				"I am fine.,HIJ\n" +
+				"I am fine.,IJK\n";
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testDefaultJoinOnTuples() throws Exception {
+		/*
+		 * Default Join on tuples
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<Tuple3<Integer, Long, String>,Tuple5<Integer, Long, Integer, String, Long>>> joinDs =
+				ds1.join(ds2)
+						.where(0)
+						.equalTo(2);
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
+				"(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
+				"(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n";
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class JoinProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				
-				/*
-				 * UDF Join on tuples with key field positions
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<String, String>> joinDs = 
-						ds1.join(ds2)
+
+	@Test
+	public void testJoinWithHuge() throws Exception {
+		/*
+		 * Join with Huge
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<String, String>> joinDs = ds1.joinWithHuge(ds2)
+				.where(1)
+				.equalTo(1)
+				.with(new T3T5FlatJoin());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hallo\n" +
+				"Hello,Hallo Welt\n" +
+				"Hello world,Hallo Welt\n";
+	}
+
+	@Test
+	public void testJoinWithTiny() throws Exception {
+		/*
+		 * Join with Tiny
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<String, String>> joinDs =
+				ds1.joinWithTiny(ds2)
 						.where(1)
 						.equalTo(1)
 						.with(new T3T5FlatJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,Hallo\n" +
-						"Hello,Hallo Welt\n" +
-						"Hello world,Hallo Welt\n";
-				
-			}
-			case 2: {
-				
-				/*
-				 * UDF Join on tuples with multiple key field positions
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<String, String>> joinDs = 
-						ds1.join(ds2)
-						   .where(0,1)
-						   .equalTo(0,4)
-						   .with(new T3T5FlatJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,Hallo\n" +
-						"Hello,Hallo Welt\n" +
-						"Hello world,Hallo Welt wie gehts?\n" +
-						"Hello world,ABC\n" +
-						"I am fine.,HIJ\n" +
-						"I am fine.,IJK\n";
-				
-			}
-			case 3: {
-				
-				/*
-				 * Default Join on tuples
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<Tuple3<Integer, Long, String>,Tuple5<Integer, Long, Integer, String, Long>>> joinDs = 
-						ds1.join(ds2)
-						   .where(0)
-						   .equalTo(2);
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
-						"(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
-						"(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n";
-			
-			}
-			case 4: {
-				
-				/*
-				 * Join with Huge
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<String, String>> joinDs = ds1.joinWithHuge(ds2)
-															.where(1)
-															.equalTo(1)
-															.with(new T3T5FlatJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,Hallo\n" +
-						"Hello,Hallo Welt\n" +
-						"Hello world,Hallo Welt\n";
-				
-			}
-			case 5: {
-				
-				/*
-				 * Join with Tiny
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<String, String>> joinDs = 
-						ds1.joinWithTiny(ds2)
-						   .where(1)
-						   .equalTo(1)
-						   .with(new T3T5FlatJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,Hallo\n" +
-						"Hello,Hallo Welt\n" +
-						"Hello world,Hallo Welt\n";
-				
-			}
-			
-			case 6: {
-				
-				/*
-				 * Join that returns the left input object
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> joinDs = 
-						ds1.join(ds2)
-						   .where(1)
-						   .equalTo(1)
-						   .with(new LeftReturningJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n";
-			}
-			case 7: {
-				
-				/*
-				 * Join that returns the right input object
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> joinDs = 
-						ds1.join(ds2)
-						   .where(1)
-						   .equalTo(1)
-						   .with(new RightReturningJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,0,Hallo,1\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"2,2,1,Hallo Welt,2\n";
-			}
-			case 8: {
-				
-				/*
-				 * Join with broadcast set
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple3<String, String, Integer>> joinDs = 
-						ds1.join(ds2)
-						   .where(1)
-						   .equalTo(4)
-						   .with(new T3T5BCJoin())
-						   .withBroadcastSet(intDs, "ints");
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,Hallo,55\n" +
-						"Hi,Hallo Welt wie,55\n" +
-						"Hello,Hallo Welt,55\n" +
-						"Hello world,Hallo Welt,55\n";
-			}
-			case 9: {
-			
-				/*
-				 * Join on a tuple input with key field selector and a custom type input with key extractor
-				 */
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<String, String>> joinDs =
-						ds1.join(ds2)
-						   .where(new KeySelector<CustomType, Integer>() {
-									  @Override
-									  public Integer getKey(CustomType value) {
-										  return value.myInt;
-									  }
-								  }
-						   )
-						   .equalTo(0)
-						   .with(new CustT3Join());
-
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "Hi,Hi\n" +
-						"Hello,Hello\n" +
-						"Hello world,Hello\n";
-
-				}
-			case 10: {
-				
-				/*
-				 * Project join on a tuple input 1
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple6<String, Long, String, Integer, Long, Long>> joinDs = 
-						ds1.join(ds2)
-						   .where(1)
-						   .equalTo(1)
-						   .projectFirst(2,1)
-						   .projectSecond(3)
-						   .projectFirst(0)
-						   .projectSecond(4,1);
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,1,Hallo,1,1,1\n" +
-						"Hello,2,Hallo Welt,2,2,2\n" +
-						"Hello world,2,Hallo Welt,3,2,2\n";
-				
-			}
-			case 11: {
-				
-				/*
-				 * Project join on a tuple input 2
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple6<String, String, Long, Long, Long, Integer>> joinDs = 
-						ds1.join(ds2)
-						   .where(1)
-						   .equalTo(1)
-						   .projectSecond(3)
-						   .projectFirst(2,1)
-						   .projectSecond(4,1)
-						   .projectFirst(0);
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hallo,Hi,1,1,1,1\n" +
-						"Hallo Welt,Hello,2,2,2,2\n" +
-						"Hallo Welt,Hello world,2,2,2,3\n";
-			}
-				
-			case 12: {
-				
-				/*
-				 * Join on a tuple input with key field selector and a custom type input with key extractor
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<Tuple2<String, String>> joinDs = 
-						ds1.join(ds2)
-						   .where(1).equalTo(new KeySelector<CustomType, Long>() {
-									   @Override
-									   public Long getKey(CustomType value) {
-										   return value.myLong;
-									   }
-								   })
-						   .with(new T3CustJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,Hello\n" +
-						"Hello,Hello world\n" +
-						"Hello world,Hello world\n";
-						
-			}
-			
-			case 13: {
-				
-				/*
-				 * (Default) Join on two custom type inputs with key extractors
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds1 = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-				
-				DataSet<Tuple2<CustomType, CustomType>> joinDs = 
-					ds1.join(ds2)
-					   .where(
-							   new KeySelector<CustomType, Integer>() {
-								   @Override
-								   public Integer getKey(CustomType value) {
-									   return value.myInt;
-								   }
-							   }
-							  )
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hallo\n" +
+				"Hello,Hallo Welt\n" +
+				"Hello world,Hallo Welt\n";
+	}
+
+	@Test
+	public void testJoinThatReturnsTheLeftInputObject() throws Exception {
+		/*
+		 * Join that returns the left input object
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> joinDs =
+				ds1.join(ds2)
+						.where(1)
+						.equalTo(1)
+						.with(new LeftReturningJoin());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n";
+	}
+
+	@Test
+	public void testJoinThatReturnsTheRightInputObject() throws Exception {
+		/*
+		 * Join that returns the right input object
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> joinDs =
+				ds1.join(ds2)
+						.where(1)
+						.equalTo(1)
+						.with(new RightReturningJoin());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,0,Hallo,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,2,1,Hallo Welt,2\n";
+	}
+
+	@Test
+	public void testJoinWithBroadcastSet() throws Exception {
+		/*
+		 * Join with broadcast set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple3<String, String, Integer>> joinDs =
+				ds1.join(ds2)
+						.where(1)
+						.equalTo(4)
+						.with(new T3T5BCJoin())
+						.withBroadcastSet(intDs, "ints");
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hallo,55\n" +
+				"Hi,Hallo Welt wie,55\n" +
+				"Hello,Hallo Welt,55\n" +
+				"Hello world,Hallo Welt,55\n";
+	}
+
+	@Test
+	public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+			throws Exception{
+		/*
+		 * Join on a tuple input with key field selector and a custom type input with key extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<String, String>> joinDs =
+				ds1.join(ds2)
+						.where(new KeySelector1())
+						.equalTo(0)
+						.with(new CustT3Join());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hi\n" +
+				"Hello,Hello\n" +
+				"Hello world,Hello\n";
+
+	}
+
+	public static class KeySelector1 implements KeySelector<CustomType, Integer> {
+		@Override
+		public Integer getKey(CustomType value) {
+			return value.myInt;
+		}
+	}
+
+	@Test
+	public void testProjectOnATuple1Input() throws Exception {
+		/*
+		 * Project join on a tuple input 1
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple6<String, Long, String, Integer, Long, Long>> joinDs =
+				ds1.join(ds2)
+						.where(1)
+						.equalTo(1)
+						.projectFirst(2,1)
+						.projectSecond(3)
+						.projectFirst(0)
+						.projectSecond(4,1);
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,1,Hallo,1,1,1\n" +
+				"Hello,2,Hallo Welt,2,2,2\n" +
+				"Hello world,2,Hallo Welt,3,2,2\n";
+	}
+
+	@Test
+	public void testProjectJoinOnATuple2Input() throws Exception {
+		/*
+		 * Project join on a tuple input 2
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple6<String, String, Long, Long, Long, Integer>> joinDs =
+				ds1.join(ds2)
+						.where(1)
+						.equalTo(1)
+						.projectSecond(3)
+						.projectFirst(2,1)
+						.projectSecond(4,1)
+						.projectFirst(0);
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hallo,Hi,1,1,1,1\n" +
+				"Hallo Welt,Hello,2,2,2,2\n" +
+				"Hallo Welt,Hello world,2,2,2,3\n";
+	}
+
+	@Test
+	public void testJoinOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+			throws Exception {
+		/*
+		 * Join on a tuple input with key field selector and a custom type input with key extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<Tuple2<String, String>> joinDs =
+				ds1.join(ds2)
+						.where(1).equalTo(new KeySelector2())
+						.with(new T3CustJoin());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hello\n" +
+				"Hello,Hello world\n" +
+				"Hello world,Hello world\n";
+	}
+
+	public static class KeySelector2 implements KeySelector<CustomType, Long> {
+		@Override
+		public Long getKey(CustomType value) {
+			return value.myLong;
+		}
+	}
+
+	@Test
+	public void testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
+		/*
+		 * (Default) Join on two custom type inputs with key extractors
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds1 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+
+		DataSet<Tuple2<CustomType, CustomType>> joinDs =
+				ds1.join(ds2)
+						.where(
+							new KeySelector5()
+						)
 						.equalTo(
-								new KeySelector<CustomType, Integer>() {
-									   @Override
-									   public Integer getKey(CustomType value) {
-										   return value.myInt;
-									   }
-								   }
-								);
-																				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,Hi,1,0,Hi\n" +
-						"2,1,Hello,2,1,Hello\n" +
-						"2,1,Hello,2,2,Hello world\n" +
-						"2,2,Hello world,2,1,Hello\n" +
-						"2,2,Hello world,2,2,Hello world\n";
-	
-			}
-			case 14: {
-				/*
-				 * UDF Join on tuples with tuple-returning key selectors
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<String, String>> joinDs = 
-						ds1.join(ds2)
-						   .where(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
-								private static final long serialVersionUID = 1L;
-								
-								@Override
-								public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
-									return new Tuple2<Integer, Long>(t.f0, t.f1);
-								}
-							})
-						   .equalTo(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
-								private static final long serialVersionUID = 1L;
-								
-								@Override
-								public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
-									return new Tuple2<Integer, Long>(t.f0, t.f4);
-								}
-							})
-						   .with(new T3T5FlatJoin());
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "Hi,Hallo\n" +
-						"Hello,Hallo Welt\n" +
-						"Hello world,Hallo Welt wie gehts?\n" +
-						"Hello world,ABC\n" +
-						"I am fine.,HIJ\n" +
-						"I am fine.,IJK\n";
-			}
-			/**
-			 *  Joins with POJOs
-			 */
-			case 15: {
-				/*
-				 * Join nested pojo against tuple (selected using a string)
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = 
-						ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6");
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-					   "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-					   "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-			}
-			
-			case 16: {
-				/*
-				 * Join nested pojo against tuple (selected as an integer)
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = 
-						ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference!
-				
-				joinDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-					   "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-					   "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-			}
-			case 17: {
-				/*
-				 * selecting multiple fields using expression language
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = 
-						ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
-				
-				joinDs.writeAsCsv(resultPath);
-				env.setDegreeOfParallelism(1);
-				env.execute();
-				
-				// return expected result
-				return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-					   "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-					   "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-				
-			}
-			case 18: {
-				/*
-				 * nested into tuple
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = 
-						ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
-				
-				joinDs.writeAsCsv(resultPath);
-				env.setDegreeOfParallelism(1);
-				env.execute();
-				
-				// return expected result
-				return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-					   "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-					   "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-				
-			}
-			case 19: {
-				/*
-				 * nested into tuple into pojo
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = 
-						ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
-				
-				joinDs.writeAsCsv(resultPath);
-				env.setDegreeOfParallelism(1);
-				env.execute();
-				
-				// return expected result
-				return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-					   "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-					   "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-				
-			}
-			case 20: {
-				/*
-				 * Non-POJO test to verify that full-tuple keys are working.
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-				DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-				DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs = 
-						ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer>
-				
-				joinDs.writeAsCsv(resultPath);
-				env.setDegreeOfParallelism(1);
-				env.execute();
-				
-				// return expected result
-				return "((1,1),one),((1,1),one)\n" +
-					   "((2,2),two),((2,2),two)\n" +
-					   "((3,3),three),((3,3),three)\n";
-				
-			}
-			case 21: {
-				/*
-				 * Non-POJO test to verify "nested" tuple-element selection.
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-				DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-				DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs = 
-						ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer>
-				
-				joinDs.writeAsCsv(resultPath);
-				env.setDegreeOfParallelism(1);
-				env.execute();
-				
-				// return expected result
-				return "((1,1),one),((1,1),one)\n" +
-					   "((2,2),two),((2,2),two)\n" +
-					   "((3,3),three),((3,3),three)\n";
-				
-			}
-			case 22: {
-				/*
-				 * full pojo with full tuple
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
-				DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String> >> joinDs = 
-						ds1.join(ds2).where("*").equalTo("*");
-				
-				joinDs.writeAsCsv(resultPath);
-				env.setDegreeOfParallelism(1);
-				env.execute();
-				
-				// return expected result
-				return "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
-						"2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+
-						"3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
-			}
-			case 23: {
-				/*
-				 * Non-POJO test to verify "nested" tuple-element selection with the first key field greater than 0.
-				 */
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-					DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>> ds2 = ds1.join(ds1).where(0).equalTo(0);
-					DataSet<Tuple2<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>>> joinDs =
-						ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
-
-					joinDs.writeAsCsv(resultPath);
-					env.setDegreeOfParallelism(1);
-					env.execute();
-
-					// return expected result
-					return "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
-						"((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n" +
-						"((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
-
-				}
-			default: 
-				throw new IllegalArgumentException("Invalid program id: "+progId);
-			}
-			
+								new KeySelector6()
+						);
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,0,Hi,1,0,Hi\n" +
+				"2,1,Hello,2,1,Hello\n" +
+				"2,1,Hello,2,2,Hello world\n" +
+				"2,2,Hello world,2,1,Hello\n" +
+				"2,2,Hello world,2,2,Hello world\n";
+	}
+
+	public static class KeySelector5 implements KeySelector<CustomType, Integer> {
+		@Override
+		public Integer getKey(CustomType value) {
+			return value.myInt;
 		}
-	
 	}
-	
+
+	public static class KeySelector6 implements KeySelector<CustomType, Integer> {
+		@Override
+		public Integer getKey(CustomType value) {
+			return value.myInt;
+		}
+	}
+
+	@Test
+	public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception {
+		/*
+		 * UDF Join on tuples with tuple-returning key selectors
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<String, String>> joinDs =
+				ds1.join(ds2)
+						.where(new KeySelector3())
+						.equalTo(new KeySelector4())
+						.with(new T3T5FlatJoin());
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,Hallo\n" +
+				"Hello,Hallo Welt\n" +
+				"Hello world,Hallo Welt wie gehts?\n" +
+				"Hello world,ABC\n" +
+				"I am fine.,HIJ\n" +
+				"I am fine.,IJK\n";
+	}
+
+	public static class KeySelector3 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f1);
+		}
+	}
+
+	public static class KeySelector4 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f4);
+		}
+	}
+
+	@Test
+	public void testJoinNestedPojoAgainstTupleSelectedUsingString() throws Exception {
+		/*
+		 * Join nested pojo against tuple (selected using a string)
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+				ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6");
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+	}
+
+	@Test
+	public void testJoinNestedPojoAgainstTupleSelectedUsingInteger() throws Exception {
+		/*
+		 * Join nested pojo against tuple (selected as an integer)
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+				ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference!
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+	}
+
+	@Test
+	public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exception {
+		/*
+		 * selecting multiple fields using expression language
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+				ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
+
+		joinDs.writeAsCsv(resultPath);
+		env.setDegreeOfParallelism(1);
+		env.execute();
+
+		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+	}
+
+	@Test
+	public void testNestedIntoTuple() throws Exception {
+		/*
+		 * nested into tuple
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+				ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
+
+		joinDs.writeAsCsv(resultPath);
+		env.setDegreeOfParallelism(1);
+		env.execute();
+
+		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+	}
+
+	@Test
+	public void testNestedIntoTupleIntoPojo() throws Exception {
+		/*
+		 * nested into tuple into pojo
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+				ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
+
+		joinDs.writeAsCsv(resultPath);
+		env.setDegreeOfParallelism(1);
+		env.execute();
+
+		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+	}
+
+	@Test
+	public void testNonPojoToVerifyFullTupleKeys() throws Exception {
+		/*
+		 * Non-POJO test to verify that full-tuple keys are working.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+		DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
+				ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer>
+
+		joinDs.writeAsCsv(resultPath);
+		env.setDegreeOfParallelism(1);
+		env.execute();
+
+		expected = "((1,1),one),((1,1),one)\n" +
+				"((2,2),two),((2,2),two)\n" +
+				"((3,3),three),((3,3),three)\n";
+
+	}
+
+	@Test
+	public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception {
+		/*
+		 * Non-POJO test to verify "nested" tuple-element selection.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+		DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
+				ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer>
+
+		joinDs.writeAsCsv(resultPath);
+		env.setDegreeOfParallelism(1);
+		env.execute();
+
+		expected = "((1,1),one),((1,1),one)\n" +
+				"((2,2),two),((2,2),two)\n" +
+				"((3,3),three),((3,3),three)\n";
+	}
+
+	@Test
+	public void testFullPojoWithFullTuple() throws Exception {
+		/*
+		 * full pojo with full tuple
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
+		DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String> >> joinDs =
+				ds1.join(ds2).where("*").equalTo("*");
+
+		joinDs.writeAsCsv(resultPath);
+		env.setDegreeOfParallelism(1);
+		env.execute();
+
+		expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
+				"2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+
+				"3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
+	}
+
+	@Test
+	public void testNonPojoToVerifyNestedTupleElementSelectionWithFirstKeyFieldGreaterThanZero()
+	throws Exception {
+		/*
+		 * Non-POJO test to verify "nested" tuple-element selection with the first key field greater than 0.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>> ds2 = ds1.join(ds1).where(0).equalTo(0);
+		DataSet<Tuple2<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>>> joinDs =
+				ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
+
+		joinDs.writeAsCsv(resultPath);
+		env.setDegreeOfParallelism(1);
+		env.execute();
+
+		expected = "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
+				"((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n" +
+				"((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
+	}
+
 	public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index f26a1e7..4fbb53d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
-import java.util.LinkedList;
 
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
 import org.junit.Assert;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -31,461 +30,461 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class MapITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 9;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class MapITCase extends MultipleProgramsTestBase {
+
+	public MapITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public MapITCase(Configuration config) {
-		super(config);	
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = MapProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testIdentityMapWithBasicType() throws Exception {
+		/*
+		 * Test identity map with basic type
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+		DataSet<String> identityMapDs = ds.
+				map(new Mapper1());
+
+		identityMapDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "Hi\n" +
+				"Hello\n" +
+				"Hello world\n" +
+				"Hello world, how are you?\n" +
+				"I am fine.\n" +
+				"Luke Skywalker\n" +
+				"Random comment\n" +
+				"LOL\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	public static class Mapper1 implements MapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testIdentityMapWithTuple() throws Exception {
+		/*
+		 * Test identity map with a tuple
+		 */
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds.
+				map(new Mapper2());
+
+		identityMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" +
+				"5,3,I am fine.\n" +
+				"6,3,Luke Skywalker\n" +
+				"7,4,Comment#1\n" +
+				"8,4,Comment#2\n" +
+				"9,4,Comment#3\n" +
+				"10,4,Comment#4\n" +
+				"11,5,Comment#5\n" +
+				"12,5,Comment#6\n" +
+				"13,5,Comment#7\n" +
+				"14,5,Comment#8\n" +
+				"15,5,Comment#9\n" +
+				"16,6,Comment#10\n" +
+				"17,6,Comment#11\n" +
+				"18,6,Comment#12\n" +
+				"19,6,Comment#13\n" +
+				"20,6,Comment#14\n" +
+				"21,6,Comment#15\n";
+	}
+
+	public static class Mapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
+		throws Exception {
+			return value;
 		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class MapProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test identity map with basic type
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-				DataSet<String> identityMapDs = ds.
-						map(new MapFunction<String, String>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public String map(String value) throws Exception {
-								return value;
-							}
-						});
-				
-				identityMapDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"Hi\n" +
-						"Hello\n" +
-						"Hello world\n" +
-						"Hello world, how are you?\n" +
-						"I am fine.\n" +
-						"Luke Skywalker\n" +
-						"Random comment\n" +
-						"LOL\n";
-			}
-			case 2: {
-				/*
-				 * Test identity map with a tuple
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds.
-						map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) 
-									throws Exception {
-								return value;
-							}
-						});
-				
-				identityMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return  "1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n" +
-						"4,3,Hello world, how are you?\n" +
-						"5,3,I am fine.\n" +
-						"6,3,Luke Skywalker\n" +
-						"7,4,Comment#1\n" +
-						"8,4,Comment#2\n" +
-						"9,4,Comment#3\n" +
-						"10,4,Comment#4\n" +
-						"11,5,Comment#5\n" +
-						"12,5,Comment#6\n" +
-						"13,5,Comment#7\n" +
-						"14,5,Comment#8\n" +
-						"15,5,Comment#9\n" +
-						"16,6,Comment#10\n" +
-						"17,6,Comment#11\n" +
-						"18,6,Comment#12\n" +
-						"19,6,Comment#13\n" +
-						"20,6,Comment#14\n" +
-						"21,6,Comment#15\n";
-			}
-			case 3: {
-				/*
-				 * Test type conversion mapper (Custom -> Tuple)
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> typeConversionMapDs = ds.
-						map(new MapFunction<CustomType, Tuple3<Integer, Long, String>>() {
-							private static final long serialVersionUID = 1L;
-							private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-							
-							@Override
-							public Tuple3<Integer, Long, String> map(CustomType value) throws Exception {
-								out.setField(value.myInt, 0);
-								out.setField(value.myLong, 1);
-								out.setField(value.myString, 2);
-								return out;
-							}
-						});
-				
-				typeConversionMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"1,0,Hi\n" +
-						"2,1,Hello\n" +
-						"2,2,Hello world\n" +
-						"3,3,Hello world, how are you?\n" +
-						"3,4,I am fine.\n" +
-						"3,5,Luke Skywalker\n" +
-						"4,6,Comment#1\n" +
-						"4,7,Comment#2\n" +
-						"4,8,Comment#3\n" +
-						"4,9,Comment#4\n" +
-						"5,10,Comment#5\n" +
-						"5,11,Comment#6\n" +
-						"5,12,Comment#7\n" +
-						"5,13,Comment#8\n" +
-						"5,14,Comment#9\n" +
-						"6,15,Comment#10\n" +
-						"6,16,Comment#11\n" +
-						"6,17,Comment#12\n" +
-						"6,18,Comment#13\n" +
-						"6,19,Comment#14\n" +
-						"6,20,Comment#15\n";
-			}
-			case 4: {
-				/*
-				 * Test type conversion mapper (Tuple -> Basic)
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<String> typeConversionMapDs = ds.
-						map(new MapFunction<Tuple3<Integer, Long, String>, String>() {
-							private static final long serialVersionUID = 1L;
-							
-							@Override
-							public String map(Tuple3<Integer, Long, String> value) throws Exception {
-								return value.getField(2);
-							}
-						});
-				
-				typeConversionMapDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"Hi\n" + "Hello\n" + "Hello world\n" +
-						"Hello world, how are you?\n" +
-						"I am fine.\n" + "Luke Skywalker\n" +
-						"Comment#1\n" +	"Comment#2\n" +
-						"Comment#3\n" +	"Comment#4\n" +
-						"Comment#5\n" +	"Comment#6\n" +
-						"Comment#7\n" + "Comment#8\n" +
-						"Comment#9\n" +	"Comment#10\n" +
-						"Comment#11\n" + "Comment#12\n" +
-						"Comment#13\n" + "Comment#14\n" +
-						"Comment#15\n";
-			}
-			case 5: {
-				/*
-				 * Test mapper on tuple - Increment Integer field, reorder second and third fields
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, String, Long>> tupleMapDs = ds.
-						map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, String, Long>>() {
-							private static final long serialVersionUID = 1L;
-							private final Tuple3<Integer, String, Long> out = new Tuple3<Integer, String, Long>();
-							
-							@Override
-							public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, String> value) 
-									throws Exception {
-								Integer incr = Integer.valueOf(value.f0.intValue() + 1);
-								out.setFields(incr, value.f2, value.f1);
-								return out;
-							}
-						});
-				
-				tupleMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"2,Hi,1\n" +
-						"3,Hello,2\n" +
-						"4,Hello world,2\n" +
-						"5,Hello world, how are you?,3\n" +
-						"6,I am fine.,3\n" +
-						"7,Luke Skywalker,3\n" +
-						"8,Comment#1,4\n" +
-						"9,Comment#2,4\n" +
-						"10,Comment#3,4\n" +
-						"11,Comment#4,4\n" +
-						"12,Comment#5,5\n" +
-						"13,Comment#6,5\n" +
-						"14,Comment#7,5\n" +
-						"15,Comment#8,5\n" +
-						"16,Comment#9,5\n" +
-						"17,Comment#10,6\n" +
-						"18,Comment#11,6\n" +
-						"19,Comment#12,6\n" +
-						"20,Comment#13,6\n" +
-						"21,Comment#14,6\n" +
-						"22,Comment#15,6\n";
-			}
-			case 6: {
-				/*
-				 * Test mapper on Custom - lowercase myString
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> customMapDs = ds.
-						map(new MapFunction<CustomType, CustomType>() {
-							private static final long serialVersionUID = 1L;
-							private final CustomType out = new CustomType(); 
-							
-							@Override
-							public CustomType map(CustomType value) throws Exception {
-								out.myInt = value.myInt;
-								out.myLong = value.myLong;
-								out.myString = value.myString.toLowerCase();
-								return out;
-							}
-						});
-				
-				customMapDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"1,0,hi\n" +
-						"2,1,hello\n" +
-						"2,2,hello world\n" +
-						"3,3,hello world, how are you?\n" +
-						"3,4,i am fine.\n" +
-						"3,5,luke skywalker\n" +
-						"4,6,comment#1\n" +
-						"4,7,comment#2\n" +
-						"4,8,comment#3\n" +
-						"4,9,comment#4\n" +
-						"5,10,comment#5\n" +
-						"5,11,comment#6\n" +
-						"5,12,comment#7\n" +
-						"5,13,comment#8\n" +
-						"5,14,comment#9\n" +
-						"6,15,comment#10\n" +
-						"6,16,comment#11\n" +
-						"6,17,comment#12\n" +
-						"6,18,comment#13\n" +
-						"6,19,comment#14\n" +
-						"6,20,comment#15\n";
-			}
-			case 7: {
-				/*
-				 * Test mapper if UDF returns input object - increment first field of a tuple
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds.
-						map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
-							private static final long serialVersionUID = 1L;
-							
-							@Override
-							public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) 
-									throws Exception {
-								Integer incr = Integer.valueOf(value.f0.intValue() + 1);
-								value.setField(incr, 0);
-								return value;
-							}
-						});
-				
-				inputObjMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"2,1,Hi\n" +
-						"3,2,Hello\n" +
-						"4,2,Hello world\n" +
-						"5,3,Hello world, how are you?\n" +
-						"6,3,I am fine.\n" +
-						"7,3,Luke Skywalker\n" +
-						"8,4,Comment#1\n" +
-						"9,4,Comment#2\n" +
-						"10,4,Comment#3\n" +
-						"11,4,Comment#4\n" +
-						"12,5,Comment#5\n" +
-						"13,5,Comment#6\n" +
-						"14,5,Comment#7\n" +
-						"15,5,Comment#8\n" +
-						"16,5,Comment#9\n" +
-						"17,6,Comment#10\n" +
-						"18,6,Comment#11\n" +
-						"19,6,Comment#12\n" +
-						"20,6,Comment#13\n" +
-						"21,6,Comment#14\n" +
-						"22,6,Comment#15\n";
-			}
-			case 8: {
-				/*
-				 * Test map with broadcast set 
-				 */
-					
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
-						map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-							private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-							private Integer f2Replace = 0;
-							
-							@Override
-							public void open(Configuration config) {
-								Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-								int sum = 0;
-								for(Integer i : ints) {
-									sum += i;
-								}
-								f2Replace = sum;
-							}
-							
-							@Override
-							public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) 
-									throws Exception {
-								out.setFields(f2Replace, value.f1, value.f2);
-								return out;
-							}
-						}).withBroadcastSet(ints, "ints");
-				bcMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"55,1,Hi\n" +
-						"55,2,Hello\n" +
-						"55,2,Hello world\n" +
-						"55,3,Hello world, how are you?\n" +
-						"55,3,I am fine.\n" +
-						"55,3,Luke Skywalker\n" +
-						"55,4,Comment#1\n" +
-						"55,4,Comment#2\n" +
-						"55,4,Comment#3\n" +
-						"55,4,Comment#4\n" +
-						"55,5,Comment#5\n" +
-						"55,5,Comment#6\n" +
-						"55,5,Comment#7\n" +
-						"55,5,Comment#8\n" +
-						"55,5,Comment#9\n" +
-						"55,6,Comment#10\n" +
-						"55,6,Comment#11\n" +
-						"55,6,Comment#12\n" +
-						"55,6,Comment#13\n" +
-						"55,6,Comment#14\n" +
-						"55,6,Comment#15\n";
-			}
-			case 9: {
-				/*
-				 * Test passing configuration object.
-				 */
-					
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				Configuration conf = new Configuration();
-				final String testKey = "testVariable";
-				final int testValue = 666;
-				conf.setInteger(testKey, testValue);
-				DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
-						map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
-							private static final long serialVersionUID = 1L;
-							
-							@Override
-							public void open(Configuration config) {
-								int val = config.getInteger(testKey, -1);
-								Assert.assertEquals(testValue, val);
-							}
-							
-							@Override
-							public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) {
-								return value;
-							}
-						}).withParameters(conf);
-				bcMapDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"1,1,Hi\n"
-						+ "2,2,Hello\n"
-						+ "3,2,Hello world";
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+
+	@Test
+	public void testTypeConversionMapperCustomToTuple() throws Exception {
+		/*
+		 * Test type conversion mapper (Custom -> Tuple)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> typeConversionMapDs = ds.
+				map(new Mapper3());
+
+		typeConversionMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,0,Hi\n" +
+				"2,1,Hello\n" +
+				"2,2,Hello world\n" +
+				"3,3,Hello world, how are you?\n" +
+				"3,4,I am fine.\n" +
+				"3,5,Luke Skywalker\n" +
+				"4,6,Comment#1\n" +
+				"4,7,Comment#2\n" +
+				"4,8,Comment#3\n" +
+				"4,9,Comment#4\n" +
+				"5,10,Comment#5\n" +
+				"5,11,Comment#6\n" +
+				"5,12,Comment#7\n" +
+				"5,13,Comment#8\n" +
+				"5,14,Comment#9\n" +
+				"6,15,Comment#10\n" +
+				"6,16,Comment#11\n" +
+				"6,17,Comment#12\n" +
+				"6,18,Comment#13\n" +
+				"6,19,Comment#14\n" +
+				"6,20,Comment#15\n";
+	}
+
+	public static class Mapper3 implements MapFunction<CustomType, Tuple3<Integer, Long, String>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
+
+		@Override
+		public Tuple3<Integer, Long, String> map(CustomType value) throws Exception {
+			out.setField(value.myInt, 0);
+			out.setField(value.myLong, 1);
+			out.setField(value.myString, 2);
+			return out;
+		}
+	}
+
+	@Test
+	public void testTypeConversionMapperTupleToBasic() throws Exception {
+		/*
+		 * Test type conversion mapper (Tuple -> Basic)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<String> typeConversionMapDs = ds.
+				map(new Mapper4());
+
+		typeConversionMapDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "Hi\n" + "Hello\n" + "Hello world\n" +
+				"Hello world, how are you?\n" +
+				"I am fine.\n" + "Luke Skywalker\n" +
+				"Comment#1\n" +	"Comment#2\n" +
+				"Comment#3\n" +	"Comment#4\n" +
+				"Comment#5\n" +	"Comment#6\n" +
+				"Comment#7\n" + "Comment#8\n" +
+				"Comment#9\n" +	"Comment#10\n" +
+				"Comment#11\n" + "Comment#12\n" +
+				"Comment#13\n" + "Comment#14\n" +
+				"Comment#15\n";
+	}
+
+	public static class Mapper4 implements MapFunction<Tuple3<Integer, Long, String>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(Tuple3<Integer, Long, String> value) throws Exception {
+			return value.getField(2);
+		}
+	}
+
+	@Test
+	public void testMapperOnTupleIncrementIntegerFieldReorderSecondAndThirdFields() throws
+			Exception {
+		/*
+		 * Test mapper on tuple - Increment Integer field, reorder second and third fields
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, String, Long>> tupleMapDs = ds.
+				map(new Mapper5());
+
+		tupleMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "2,Hi,1\n" +
+				"3,Hello,2\n" +
+				"4,Hello world,2\n" +
+				"5,Hello world, how are you?,3\n" +
+				"6,I am fine.,3\n" +
+				"7,Luke Skywalker,3\n" +
+				"8,Comment#1,4\n" +
+				"9,Comment#2,4\n" +
+				"10,Comment#3,4\n" +
+				"11,Comment#4,4\n" +
+				"12,Comment#5,5\n" +
+				"13,Comment#6,5\n" +
+				"14,Comment#7,5\n" +
+				"15,Comment#8,5\n" +
+				"16,Comment#9,5\n" +
+				"17,Comment#10,6\n" +
+				"18,Comment#11,6\n" +
+				"19,Comment#12,6\n" +
+				"20,Comment#13,6\n" +
+				"21,Comment#14,6\n" +
+				"22,Comment#15,6\n";
+	}
+
+	public static class Mapper5 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, String, Long>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple3<Integer, String, Long> out = new Tuple3<Integer, String, Long>();
+
+		@Override
+		public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, String> value)
+		throws Exception {
+			Integer incr = Integer.valueOf(value.f0.intValue() + 1);
+			out.setFields(incr, value.f2, value.f1);
+			return out;
+		}
+	}
+
+	@Test
+	public void testMapperOnCustomLowercaseString() throws Exception {
+		/*
+		 * Test mapper on Custom - lowercase myString
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> customMapDs = ds.
+				map(new Mapper6());
+
+		customMapDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "1,0,hi\n" +
+				"2,1,hello\n" +
+				"2,2,hello world\n" +
+				"3,3,hello world, how are you?\n" +
+				"3,4,i am fine.\n" +
+				"3,5,luke skywalker\n" +
+				"4,6,comment#1\n" +
+				"4,7,comment#2\n" +
+				"4,8,comment#3\n" +
+				"4,9,comment#4\n" +
+				"5,10,comment#5\n" +
+				"5,11,comment#6\n" +
+				"5,12,comment#7\n" +
+				"5,13,comment#8\n" +
+				"5,14,comment#9\n" +
+				"6,15,comment#10\n" +
+				"6,16,comment#11\n" +
+				"6,17,comment#12\n" +
+				"6,18,comment#13\n" +
+				"6,19,comment#14\n" +
+				"6,20,comment#15\n";
+	}
+
+	public static class Mapper6 implements MapFunction<CustomType, CustomType> {
+		private static final long serialVersionUID = 1L;
+		private final CustomType out = new CustomType();
+
+		@Override
+		public CustomType map(CustomType value) throws Exception {
+			out.myInt = value.myInt;
+			out.myLong = value.myLong;
+			out.myString = value.myString.toLowerCase();
+			return out;
+		}
+	}
+
+	@Test
+	public void test() throws Exception {
+		/*
+		 * Test mapper if UDF returns input object - increment first field of a tuple
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds.
+				map(new Mapper7());
+
+		inputObjMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "2,1,Hi\n" +
+				"3,2,Hello\n" +
+				"4,2,Hello world\n" +
+				"5,3,Hello world, how are you?\n" +
+				"6,3,I am fine.\n" +
+				"7,3,Luke Skywalker\n" +
+				"8,4,Comment#1\n" +
+				"9,4,Comment#2\n" +
+				"10,4,Comment#3\n" +
+				"11,4,Comment#4\n" +
+				"12,5,Comment#5\n" +
+				"13,5,Comment#6\n" +
+				"14,5,Comment#7\n" +
+				"15,5,Comment#8\n" +
+				"16,5,Comment#9\n" +
+				"17,6,Comment#10\n" +
+				"18,6,Comment#11\n" +
+				"19,6,Comment#12\n" +
+				"20,6,Comment#13\n" +
+				"21,6,Comment#14\n" +
+				"22,6,Comment#15\n";
+	}
+
+	public static class Mapper7 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
+		throws Exception {
+			Integer incr = Integer.valueOf(value.f0.intValue() + 1);
+			value.setField(incr, 0);
+			return value;
+		}
+	}
+
+	@Test
+	public void testMapWithBroadcastSet() throws Exception {
+		/*
+		 * Test map with broadcast set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
+				map(new RichMapper1()).withBroadcastSet(ints, "ints");
+		bcMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "55,1,Hi\n" +
+				"55,2,Hello\n" +
+				"55,2,Hello world\n" +
+				"55,3,Hello world, how are you?\n" +
+				"55,3,I am fine.\n" +
+				"55,3,Luke Skywalker\n" +
+				"55,4,Comment#1\n" +
+				"55,4,Comment#2\n" +
+				"55,4,Comment#3\n" +
+				"55,4,Comment#4\n" +
+				"55,5,Comment#5\n" +
+				"55,5,Comment#6\n" +
+				"55,5,Comment#7\n" +
+				"55,5,Comment#8\n" +
+				"55,5,Comment#9\n" +
+				"55,6,Comment#10\n" +
+				"55,6,Comment#11\n" +
+				"55,6,Comment#12\n" +
+				"55,6,Comment#13\n" +
+				"55,6,Comment#14\n" +
+				"55,6,Comment#15\n";
+	}
+
+	public static class RichMapper1 extends RichMapFunction<Tuple3<Integer,Long,String>,
+	Tuple3<Integer,	Long,String>> {
+		private static final long serialVersionUID = 1L;
+		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
+		private Integer f2Replace = 0;
+
+		@Override
+		public void open(Configuration config) {
+			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+			int sum = 0;
+			for(Integer i : ints) {
+				sum += i;
 			}
-			
+			f2Replace = sum;
+		}
+
+		@Override
+		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
+		throws Exception {
+			out.setFields(f2Replace, value.f1, value.f2);
+			return out;
+		}
+	}
+
+	static final String testKey = "testVariable";
+	static final int testValue = 666;
+
+	@Test
+	public void testPassingConfigurationObject() throws Exception {
+		/*
+		 * Test passing configuration object.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		Configuration conf = new Configuration();
+		conf.setInteger(testKey, testValue);
+		DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
+				map(new RichMapper2()).withParameters(conf);
+		bcMapDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n"
+				+ "2,2,Hello\n"
+				+ "3,2,Hello world";
+	}
+
+	public static class RichMapper2 extends RichMapFunction<Tuple3<Integer,Long,String>,
+	Tuple3<Integer,	Long,String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void open(Configuration config) {
+			int val = config.getInteger(testKey, -1);
+			Assert.assertEquals(testValue, val);
+		}
+
+		@Override
+		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) {
+			return value;
 		}
-	
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index d3c87fa..cf78a34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
 import java.util.HashSet;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -34,227 +30,212 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class PartitionITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 4;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class PartitionITCase extends MultipleProgramsTestBase {
+
+	public PartitionITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public PartitionITCase(Configuration config) {
-		super(config);	
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = PartitionProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testHashPartitionByKeyField() throws Exception {
+		/*
+		 * Test hash partition by key field
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Long> uniqLongs = ds
+				.partitionByHash(1)
+				.mapPartition(new UniqueLongMapper());
+		uniqLongs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "1\n" +
+				"2\n" +
+				"3\n" +
+				"4\n" +
+				"5\n" +
+				"6\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testHashPartitionByKeySelector() throws Exception {
+		/*
+		 * Test hash partition by key selector
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Long> uniqLongs = ds
+				.partitionByHash(new KeySelector1())
+				.mapPartition(new UniqueLongMapper());
+		uniqLongs.writeAsText(resultPath);
+		env.execute();
+
+		expected = 	"1\n" +
+				"2\n" +
+				"3\n" +
+				"4\n" +
+				"5\n" +
+				"6\n";
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> {
+		private static final long serialVersionUID = 1L;
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+		@Override
+		public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
+			return value.f1;
 		}
-		
-		return toParameterList(tConfigs);
+
 	}
-	
-	private static class PartitionProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 0: {
-				/*
-				 * Test hash partition by key field
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Long> uniqLongs = ds
-						.partitionByHash(1)
-						.mapPartition(new UniqueLongMapper());
-				uniqLongs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"1\n" +
-						"2\n" +
-						"3\n" +
-						"4\n" +
-						"5\n" +
-						"6\n";
-			}
-			case 1: {
-				/*
-				 * Test hash partition by key selector
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Long> uniqLongs = ds
-						.partitionByHash(new KeySelector<Tuple3<Integer,Long,String>, Long>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
-								return value.f1;
-							}
-							
-						})
-						.mapPartition(new UniqueLongMapper());
-				uniqLongs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"1\n" +
-						"2\n" +
-						"3\n" +
-						"4\n" +
-						"5\n" +
-						"6\n";
-			}
-			case 2: {
-				/*
-				 * Test forced rebalancing
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				// generate some number in parallel
-				DataSet<Long> ds = env.generateSequence(1,3000);
-				DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
-						// introduce some partition skew by filtering
-						.filter(new FilterFunction<Long>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public boolean filter(Long value) throws Exception {
-								if (value <= 780) {
-									return false;
-								} else {
-									return true;
-								}
-							}
-						})
+
+	@Test
+	public void testForcedRebalancing() throws Exception {
+		/*
+		 * Test forced rebalancing
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// generate some number in parallel
+		DataSet<Long> ds = env.generateSequence(1,3000);
+		DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
+				// introduce some partition skew by filtering
+				.filter(new Filter1())
 						// rebalance
-						.rebalance()
+				.rebalance()
 						// count values in each partition
-						.map(new PartitionIndexMapper())
-						.groupBy(0)
-						.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
-							private static final long serialVersionUID = 1L;
-
-							public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
-								return new Tuple2<Integer, Integer>(v1.f0, v1.f1+v2.f1);
-							}
-						})
+				.map(new PartitionIndexMapper())
+				.groupBy(0)
+				.reduce(new Reducer1())
 						// round counts to mitigate runtime scheduling effects (lazy split assignment)
-						.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-								value.f1 = (value.f1 / 10);
-								return value;
-							}
-							
-						});
-				
-				uniqLongs.writeAsText(resultPath);
-				
-				env.execute();
-				
-				StringBuilder result = new StringBuilder();
-				int numPerPartition = 2220 / env.getDegreeOfParallelism() / 10;
-				for (int i = 0; i < env.getDegreeOfParallelism(); i++) {
-					result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
-				}
-				// return expected result
-				return result.toString();
-			}
-			case 3: {
-				/*
-				 * Test hash partition by key field and different DOP
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(3);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Long> uniqLongs = ds
-						.partitionByHash(1).setParallelism(4)
-						.mapPartition(new UniqueLongMapper());
-				uniqLongs.writeAsText(resultPath);
-				
-				env.execute();
-				
-				// return expected result
-				return 	"1\n" +
-						"2\n" +
-						"3\n" +
-						"4\n" +
-						"5\n" +
-						"6\n";
-			}
-			case 4: {
-				/*
-				 * Test hash partition with key expression
-				 */
-		
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(3);
-				
-				DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-				DataSet<Long> uniqLongs = ds
-						.partitionByHash("nestedPojo.longNumber").setParallelism(4)
-						.mapPartition(new UniqueNestedPojoLongMapper());
-				uniqLongs.writeAsText(resultPath);
-				
-				env.execute();
-				
-				// return expected result
-				return 	"10000\n" +
-						"20000\n" +
-						"30000\n";
-			}
-			
-			
-			
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+				.map(new Mapper1());
+
+		uniqLongs.writeAsText(resultPath);
+
+		env.execute();
+
+		StringBuilder result = new StringBuilder();
+		int numPerPartition = 2220 / env.getDegreeOfParallelism() / 10;
+		for (int i = 0; i < env.getDegreeOfParallelism(); i++) {
+			result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
+		}
+
+		expected = result.toString();
+	}
+
+	public static class Filter1 implements FilterFunction<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Long value) throws Exception {
+			if (value <= 780) {
+				return false;
+			} else {
+				return true;
 			}
 		}
 	}
-	
+
+	public static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
+			return new Tuple2<Integer, Integer>(v1.f0, v1.f1+v2.f1);
+		}
+	}
+
+	public static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer,
+			Integer>>{
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+			value.f1 = (value.f1 / 10);
+			return value;
+		}
+
+	}
+
+	@Test
+	public void testHashPartitionByKeyFieldAndDifferentDOP() throws Exception {
+		/*
+		 * Test hash partition by key field and different DOP
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Long> uniqLongs = ds
+				.partitionByHash(1).setParallelism(4)
+				.mapPartition(new UniqueLongMapper());
+		uniqLongs.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = 	"1\n" +
+				"2\n" +
+				"3\n" +
+				"4\n" +
+				"5\n" +
+				"6\n";
+	}
+
+	@Test
+	public void testHashPartitionWithKeyExpression() throws Exception {
+		/*
+		 * Test hash partition with key expression
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+		DataSet<Long> uniqLongs = ds
+				.partitionByHash("nestedPojo.longNumber").setParallelism(4)
+				.mapPartition(new UniqueNestedPojoLongMapper());
+		uniqLongs.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = 	"10000\n" +
+				"20000\n" +
+				"30000\n";
+	}
+
 	public static class UniqueLongMapper implements MapPartitionFunction<Tuple3<Integer,Long,String>, Long> {
 		private static final long serialVersionUID = 1L;