You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/01 17:56:05 UTC

[4/6] flink git commit: [FLINK-2275] [tests] Aigrated test from execute() to collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 0232464..4259b63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.javaApiOperators;
 
 import java.util.Collection;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,12 +38,8 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -54,22 +51,6 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
 		/*
@@ -82,15 +63,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).reduce(new Tuple3Reduce("B-)"));
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n" +
 				"5,2,B-)\n" +
 				"15,3,B-)\n" +
 				"34,4,B-)\n" +
 				"65,5,B-)\n" +
 				"111,6,B-)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -105,10 +87,10 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
 				groupBy(4,0).reduce(new Tuple5Reduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+				.collect();
 
-		expected = "1,1,0,Hallo,1\n" +
+		String expected = "1,1,0,Hallo,1\n" +
 				"2,3,2,Hallo Welt wie,1\n" +
 				"2,2,1,Hallo Welt,2\n" +
 				"3,9,0,P-),2\n" +
@@ -118,6 +100,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 				"5,11,10,GHI,1\n" +
 				"5,29,0,P-),2\n" +
 				"5,25,0,P-),3\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -132,15 +116,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)"));
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n" +
 				"5,2,B-)\n" +
 				"15,3,B-)\n" +
 				"34,4,B-)\n" +
 				"65,5,B-)\n" +
 				"111,6,B-)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> {
@@ -163,15 +148,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> reduceDs = ds.
 				groupBy(new KeySelector2()).reduce(new CustomTypeReduce());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = reduceDs.collect();
 
-		expected = "1,0,Hi\n" +
+		String expected = "1,0,Hi\n" +
 				"2,3,Hello!\n" +
 				"3,12,Hello!\n" +
 				"4,30,Hello!\n" +
 				"5,60,Hello!\n" +
 				"6,105,Hello!\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class KeySelector2 implements KeySelector<CustomType, Integer> {
@@ -194,10 +180,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				reduce(new AllAddingTuple3Reduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "231,91,Hello World\n";
+		String expected = "231,91,Hello World\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -212,10 +199,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> reduceDs = ds.
 				reduce(new AllAddingCustomTypeReduce());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = reduceDs.collect();
+
+		String expected = "91,210,Hello!";
 
-		expected = "91,210,Hello!";
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -232,15 +220,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n" +
 				"5,2,55\n" +
 				"15,3,55\n" +
 				"34,4,55\n" +
 				"65,5,55\n" +
 				"111,6,55\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -255,10 +244,10 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> reduceDs = ds .
 				groupBy(new KeySelector3()).reduce(new Tuple5Reduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+				.collect();
 
-		expected = "1,1,0,Hallo,1\n" +
+		String expected = "1,1,0,Hallo,1\n" +
 				"2,3,2,Hallo Welt wie,1\n" +
 				"2,2,1,Hallo Welt,2\n" +
 				"3,9,0,P-),2\n" +
@@ -268,6 +257,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 				"5,11,10,GHI,1\n" +
 				"5,29,0,P-),2\n" +
 				"5,25,0,P-),3\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector3 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
@@ -291,10 +282,10 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
 				groupBy("f4","f0").reduce(new Tuple5Reduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+				.collect();
 
-		expected = "1,1,0,Hallo,1\n" +
+		String expected = "1,1,0,Hallo,1\n" +
 				"2,3,2,Hallo Welt wie,1\n" +
 				"2,2,1,Hallo Welt,2\n" +
 				"3,9,0,P-),2\n" +
@@ -304,6 +295,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 				"5,11,10,GHI,1\n" +
 				"5,29,0,P-),2\n" +
 				"5,25,0,P-),3\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -317,9 +310,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 
 		DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1());
 
-		res.writeAsText(resultPath);
-		env.execute();
-		expected = "ok\nok";
+		List<String> result = res.collect();
+
+		String expected = "ok\nok";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> {
@@ -369,20 +364,20 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 			out.collect("ok");
 		}
 	}
-	
+
 	public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
 		private final String f2Replace;
-		
-		public Tuple3Reduce() { 
+
+		public Tuple3Reduce() {
 			this.f2Replace = null;
 		}
-		
-		public Tuple3Reduce(String f2Replace) { 
+
+		public Tuple3Reduce(String f2Replace) {
 			this.f2Replace = f2Replace;
 		}
-		
+
 
 		@Override
 		public Tuple3<Integer, Long, String> reduce(
@@ -397,41 +392,41 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 			return out;
 		}
 	}
-	
+
 	public static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-		
+
 		@Override
 		public Tuple5<Integer, Long, Integer, String, Long> reduce(
 				Tuple5<Integer, Long, Integer, String, Long> in1,
 				Tuple5<Integer, Long, Integer, String, Long> in2)
-				throws Exception {
-			
+						throws Exception {
+
 			out.setFields(in1.f0, in1.f1+in2.f1, 0, "P-)", in1.f4);
 			return out;
 		}
 	}
-	
+
 	public static class CustomTypeReduce implements ReduceFunction<CustomType> {
 		private static final long serialVersionUID = 1L;
 		private final CustomType out = new CustomType();
-		
+
 		@Override
 		public CustomType reduce(CustomType in1, CustomType in2)
 				throws Exception {
-			
+
 			out.myInt = in1.myInt;
 			out.myLong = in1.myLong + in2.myLong;
 			out.myString = "Hello!";
 			return out;
 		}
 	}
-	
+
 	public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-		
+
 		@Override
 		public Tuple3<Integer, Long, String> reduce(
 				Tuple3<Integer, Long, String> in1,
@@ -441,37 +436,37 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 			return out;
 		}
 	}
-	
+
 	public static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> {
 		private static final long serialVersionUID = 1L;
 		private final CustomType out = new CustomType();
-		
+
 		@Override
 		public CustomType reduce(CustomType in1, CustomType in2)
 				throws Exception {
-			
+
 			out.myInt = in1.myInt + in2.myInt;
 			out.myLong = in1.myLong + in2.myLong;
 			out.myString = "Hello!";
 			return out;
 		}
 	}
-	
+
 	public static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
 		private String f2Replace = "";
-		
+
 		@Override
 		public void open(Configuration config) {
-			
+
 			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
 			int sum = 0;
 			for(Integer i : ints) {
 				sum += i;
 			}
 			f2Replace = sum+"";
-			
+
 		}
 
 		@Override
@@ -483,5 +478,5 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 			return out;
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
index c7ca37d..8cc54aa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.test.javaApiOperators;
 
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.io.ReplicatingInputFormat;
@@ -32,11 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.NumberSequenceIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -51,23 +49,6 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-
-	private String expectedResult;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath); // 500500 = 0+1+2+3+...+999+1000
-	}
-
 	@Test
 	public void testReplicatedSourceToJoin() throws Exception {
 		/*
@@ -85,11 +66,11 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
 				.projectFirst(0)
 				.sum(0);
 
-		pairs.writeAsText(resultPath);
-		env.execute();
+		List<Tuple> result = pairs.collect();
 
-		expectedResult = "(500500)";
+		String expectedResult = "(500500)";
 
+		compareResultAsText(result, expectedResult);
 	}
 
 	@Test
@@ -120,11 +101,11 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
 				})
 				.sum(0);
 
-		pairs.writeAsText(resultPath);
-		env.execute();
+		List<Tuple1<Long>> result = pairs.collect();
 
-		expectedResult = "(500500)";
+		String expectedResult = "(500500)";
 
+		compareResultAsText(result, expectedResult);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
index d961f3a..2b7226b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
@@ -31,16 +31,13 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.Serializable;
 import java.util.Iterator;
+import java.util.List;
 
 @RunWith(Parameterized.class)
 public class SortPartitionITCase extends MultipleProgramsTestBase {
@@ -49,22 +46,6 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testSortPartitionByKeyField() throws Exception {
 		/*
@@ -75,16 +56,15 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		env.setParallelism(4);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		ds
+		List result = ds
 				.map(new IdMapper()).setParallelism(4) // parallelize input
 				.sortPartition(1, Order.DESCENDING)
 				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
-				.distinct()
-				.writeAsText(resultPath);
+				.distinct().collect();
 
-		env.execute();
+		String expected = "(true)\n";
 
-		expected = "(true)\n";
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -97,19 +77,19 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		env.setParallelism(2);
 
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		ds
+		List result = ds
 				.map(new IdMapper()).setParallelism(2) // parallelize input
 				.sortPartition(4, Order.ASCENDING)
 				.sortPartition(2, Order.DESCENDING)
 				.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
-				.distinct()
-				.writeAsText(resultPath);
+				.distinct().collect();
 
-		env.execute();
+		String expected = "(true)\n";
 
-		expected = "(true)\n";
+		compareResultAsText(result, expected);
 	}
 
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testSortPartitionByFieldExpression() throws Exception {
 		/*
@@ -120,16 +100,15 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		env.setParallelism(4);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		ds
+		List result = ds
 				.map(new IdMapper()).setParallelism(4) // parallelize input
 				.sortPartition("f1", Order.DESCENDING)
 				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
-				.distinct()
-				.writeAsText(resultPath);
+				.distinct().collect();
 
-		env.execute();
+		String expected = "(true)\n";
 
-		expected = "(true)\n";
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -142,17 +121,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		env.setParallelism(2);
 
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		ds
+		List result = ds
 				.map(new IdMapper()).setParallelism(2) // parallelize input
 				.sortPartition("f4", Order.ASCENDING)
 				.sortPartition("f2", Order.DESCENDING)
 				.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
-				.distinct()
-				.writeAsText(resultPath);
+				.distinct().collect();
 
-		env.execute();
+		String expected = "(true)\n";
 
-		expected = "(true)\n";
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -165,17 +143,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		env.setParallelism(3);
 
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
-		ds
+		List result = ds
 				.map(new IdMapper()).setParallelism(3) // parallelize input
 				.sortPartition("f0.f1", Order.ASCENDING)
 				.sortPartition("f1", Order.DESCENDING)
 				.mapPartition(new OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(new NestedTupleChecker()))
-				.distinct()
-				.writeAsText(resultPath);
+				.distinct().collect();
 
-		env.execute();
+		String expected = "(true)\n";
 
-		expected = "(true)\n";
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -188,17 +165,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		env.setParallelism(3);
 
 		DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
-		ds
+		List result = ds
 				.map(new IdMapper()).setParallelism(1) // parallelize input
 				.sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
 				.sortPartition("number", Order.DESCENDING)
 				.mapPartition(new OrderCheckMapper<POJO>(new PojoChecker()))
-				.distinct()
-				.writeAsText(resultPath);
+				.distinct().collect();
 
-		env.execute();
+		String expected = "(true)\n";
 
-		expected = "(true)\n";
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -211,15 +187,14 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		env.setParallelism(3);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		ds
+		List result = ds
 				.sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism
 				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
-				.distinct()
-				.writeAsText(resultPath);
+				.distinct().collect();
 
-		env.execute();
+		String expected = "(true)\n";
 
-		expected = "(true)\n";
+		compareResultAsText(result, expected);
 	}
 
 	public static interface OrderChecker<T> extends Serializable {
@@ -237,7 +212,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 	public static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
 		@Override
 		public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1,
-								Tuple5<Integer, Long, Integer, String, Long> t2) {
+				Tuple5<Integer, Long, Integer, String, Long> t2) {
 			return t1.f4 < t2.f4 || t1.f4 == t2.f4 && t1.f2 >= t2.f2;
 		}
 	}
@@ -245,19 +220,18 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 	public static class NestedTupleChecker implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
 		@Override
 		public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1,
-								Tuple2<Tuple2<Integer, Integer>, String> t2) {
+				Tuple2<Tuple2<Integer, Integer>, String> t2) {
 			return t1.f0.f1 < t2.f0.f1 ||
 					t1.f0.f1 == t2.f0.f1 && t1.f1.compareTo(t2.f1) >= 0;
- 		}
+		}
 	}
 
 	public static class PojoChecker implements OrderChecker<POJO> {
 		@Override
-		public boolean inOrder(POJO t1,
-							   POJO t2) {
+		public boolean inOrder(POJO t1, POJO t2) {
 			return t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) < 0 ||
 					t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) == 0 &&
-							t1.number >= t2.number;
+					t1.number >= t2.number;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
index e6367c3..e5bdc19 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
+
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -25,11 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -40,22 +38,6 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testSumMaxAndProject() throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -66,10 +48,11 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
 				.andMax(1)
 				.project(0, 1);
 
-		sumDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, Long>> result = sumDs.collect();
+
+		String expected = "231,6\n";
 
-		expected = "231,6\n";
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -85,15 +68,16 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
 				.sum(0)
 				.project(1, 0);
 
-		aggregateDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Long, Integer>> result = aggregateDs.collect();
 
-		expected = "1,1\n" +
+		String expected = "1,1\n" +
 				"2,5\n" +
 				"3,15\n" +
 				"4,34\n" +
 				"5,65\n" +
 				"6,111\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -110,9 +94,10 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
 				.min(0)
 				.project(0);
 
-		aggregateDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple1<Integer>> result = aggregateDs.collect();
+
+		String expected = "1\n";
 
-		expected = "1\n";
+		compareResultAsTuples(result, expected);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
index 350227a..a2c10bc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -44,25 +45,14 @@ public class TypeHintITCase extends JavaProgramTestBase {
 	private static int NUM_PROGRAMS = 3;
 
 	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
 
 	public TypeHintITCase(Configuration config) {
 		super(config);
 	}
 
 	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-	@Override
 	protected void testProgram() throws Exception {
-		expectedResult = TypeHintProgs.runProgram(curProgId, resultPath);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+		TypeHintProgs.runProgram(curProgId);
 	}
 
 	@Parameters
@@ -81,7 +71,7 @@ public class TypeHintITCase extends JavaProgramTestBase {
 
 	private static class TypeHintProgs {
 
-		public static String runProgram(int progId, String resultPath) throws Exception {
+		public static void runProgram(int progId) throws Exception {
 			switch(progId) {
 			// Test identity map with missing types and string type hint
 			case 1: {
@@ -91,13 +81,14 @@ public class TypeHintITCase extends JavaProgramTestBase {
 				DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
 						.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
 						.returns("Tuple3<Integer, Long, String>");
-				identityMapDs.writeAsText(resultPath);
-				env.execute();
+				List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
 
-				// return expected result
-				return "(2,2,Hello)\n" +
-				"(3,2,Hello world)\n" +
-				"(1,1,Hi)\n";
+				String expectedResult = "(2,2,Hello)\n" +
+						"(3,2,Hello world)\n" +
+						"(1,1,Hi)\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
 			}
 			// Test identity map with missing types and type information type hint
 			case 2: {
@@ -108,32 +99,34 @@ public class TypeHintITCase extends JavaProgramTestBase {
 						// all following generics get erased during compilation
 						.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
 						.returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-				identityMapDs.writeAsText(resultPath);
-				env.execute();
+				List<Tuple3<Integer, Long, String>> result = identityMapDs
+						.collect();
+
+				String expectedResult = "(2,2,Hello)\n" +
+						"(3,2,Hello world)\n" +
+						"(1,1,Hi)\n";
 
-				// return expected result
-				return "(2,2,Hello)\n" +
-				"(3,2,Hello world)\n" +
-				"(1,1,Hi)\n";
+				compareResultAsText(result, expectedResult);
+				break;
 			}
 			// Test flat map with class type hint
 			case 3: {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				@SuppressWarnings({ "rawtypes", "unchecked" })
 				DataSet<Integer> identityMapDs = ds.
 				flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
-				.returns((Class) Integer.class);
-				identityMapDs.writeAsText(resultPath);
-				env.execute();
-
-				// return expected result
-				return "2\n" +
-				"3\n" +
-				"1\n";
+				.returns(Integer.class);
+				List<Integer> result = identityMapDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
 			}
-			default: 
+			default:
 				throw new IllegalArgumentException("Invalid program id");
 			}
 		}
@@ -150,7 +143,7 @@ public class TypeHintITCase extends JavaProgramTestBase {
 			return (V) value;
 		}
 	}
-	
+
 	public static class FlatMapper<T, V> implements FlatMapFunction<T, V> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 2e7ae9c..7ab2764 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -18,15 +18,13 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -61,22 +59,6 @@ public class UnionITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testUnion2IdenticalDataSets() throws Exception {
 		/*
@@ -87,10 +69,11 @@ public class UnionITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
 
-		unionDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = unionDs.collect();
+
+		String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
 
-		expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -107,11 +90,13 @@ public class UnionITCase extends MultipleProgramsTestBase {
 				.union(CollectionDataSets.get3TupleDataSet(env))
 				.union(CollectionDataSets.get3TupleDataSet(env));
 
-		unionDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = unionDs.collect();
 
-		expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
+		String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
+				+ FULL_TUPLE_3_STRING +
 				FULL_TUPLE_3_STRING +	FULL_TUPLE_3_STRING;
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -128,10 +113,11 @@ public class UnionITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
 				.union(empty);
 
-		unionDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = unionDs.collect();
 
-		expected = FULL_TUPLE_3_STRING;
+		String expected = FULL_TUPLE_3_STRING;
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
@@ -142,5 +128,5 @@ public class UnionITCase extends MultipleProgramsTestBase {
 			return false;
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index a68fd82..1faf4c1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.HashMap;
@@ -313,6 +314,19 @@ public class CollectionDataSets {
 		}
 	}
 
+	public static class CustomTypeComparator implements Comparator<CustomType> {
+		@Override
+		public int compare(CustomType o1, CustomType o2) {
+			int diff = o1.myInt - o2.myInt;
+			if (diff != 0) {
+				return diff;
+			}
+			diff = (int) (o1.myLong - o2.myLong);
+			return diff != 0 ? diff : o1.myString.compareTo(o2.myString);
+		}
+
+	}
+
 	public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
 		List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>();
 		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(1, "First", 10, 100, 1000L, "One", 10000L));