You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/24 11:16:32 UTC

[2/2] flink git commit: [FLINK-2956] [tests] Migrate integration tests for Table API

[FLINK-2956] [tests] Migrate integration tests for Table API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89b1d233
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89b1d233
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89b1d233

Branch: refs/heads/master
Commit: 89b1d233254de024942c06700cedd86fff55970e
Parents: db2a964
Author: gallenvara <ga...@126.com>
Authored: Thu Nov 12 14:44:40 2015 +0800
Committer: Chiwan Park <ch...@apache.org>
Committed: Tue Nov 24 19:15:49 2015 +0900

----------------------------------------------------------------------
 .../api/java/table/test/AggregationsITCase.java | 84 ++++++--------------
 .../flink/api/java/table/test/AsITCase.java     | 71 +++++------------
 .../api/java/table/test/CastingITCase.java      | 67 +++++-----------
 .../api/java/table/test/ExpressionsITCase.java  | 83 ++++++-------------
 .../flink/api/java/table/test/FilterITCase.java | 65 +++++----------
 .../table/test/GroupedAggregationsITCase.java   | 47 +++--------
 .../flink/api/java/table/test/JoinITCase.java   | 79 ++++++------------
 .../api/java/table/test/PojoGroupingITCase.java |  6 +-
 .../flink/api/java/table/test/SelectITCase.java | 71 +++++------------
 .../table/test/StringExpressionsITCase.java     | 71 +++++------------
 .../scala/table/test/AggregationsITCase.scala   | 62 +++++----------
 .../flink/api/scala/table/test/AsITCase.scala   | 63 +++++----------
 .../api/scala/table/test/CastingITCase.scala    | 48 ++++-------
 .../scala/table/test/ExpressionsITCase.scala    | 71 ++++++-----------
 .../api/scala/table/test/FilterITCase.scala     | 64 +++++----------
 .../table/test/GroupedAggreagationsITCase.scala | 48 ++++-------
 .../flink/api/scala/table/test/JoinITCase.scala | 83 +++++++------------
 .../api/scala/table/test/SelectITCase.scala     | 69 ++++++----------
 .../table/test/StringExpressionsITCase.scala    | 48 ++++-------
 .../api/scala/table/test/UnionITCase.scala      | 12 +--
 20 files changed, 375 insertions(+), 837 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 0eb6a40..3e0147c 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -44,17 +44,14 @@ import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class AggregationsITCase extends MultipleProgramsTestBase {
 
@@ -63,22 +60,6 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testAggregationTypes() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -89,11 +70,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "231,1,21,21,11";
+		List<Row> results = ds.collect();
+		String expected = "231,1,21,21,11";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -108,11 +87,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 				table.select("foo.avg");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -122,8 +99,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 
 		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
 				env.fromElements(
-						new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
-						new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
+						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
 
 		Table table =
 				tableEnv.fromDataSet(input);
@@ -132,11 +109,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,1,1,1,1.5,1.5,2";
+		List<Row> results = ds.collect();
+		String expected = "1,1,1,1,1.5,1.5,2";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -146,8 +121,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 
 		DataSource<Tuple2<Float, String>> input =
 				env.fromElements(
-						new Tuple2<Float, String>(1f, "Hello"),
-						new Tuple2<Float, String>(2f, "Ciao"));
+						new Tuple2<>(1f, "Hello"),
+						new Tuple2<>(2f, "Ciao"));
 
 		Table table =
 				tableEnv.fromDataSet(input);
@@ -157,11 +132,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "5.5,2 THE COUNT";
+		List<Row> results = ds.collect();
+		String expected = "5.5,2 THE COUNT";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -169,8 +142,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f,
-				"Hello"));
+		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
 
 		Table table =
 				tableEnv.fromDataSet(input);
@@ -180,11 +152,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -192,7 +162,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
+		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
 
 		Table table =
 				tableEnv.fromDataSet(input);
@@ -202,11 +172,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index 4bb5dec..f6ab54e 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class AsITCase extends MultipleProgramsTestBase {
 
@@ -43,22 +40,6 @@ public class AsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testAs() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -68,16 +49,14 @@ public class AsITCase extends MultipleProgramsTestBase {
 				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
 
 		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
 				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
 				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
 				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
 				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
 				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -89,11 +68,9 @@ public class AsITCase extends MultipleProgramsTestBase {
 				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
 
 		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -105,11 +82,9 @@ public class AsITCase extends MultipleProgramsTestBase {
 				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
 
 		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -121,11 +96,9 @@ public class AsITCase extends MultipleProgramsTestBase {
 				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
 
 		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -137,11 +110,9 @@ public class AsITCase extends MultipleProgramsTestBase {
 				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
 
 		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -154,11 +125,9 @@ public class AsITCase extends MultipleProgramsTestBase {
 						" c");
 
 		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index 0636dfa..aa4ef2a 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -26,16 +26,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class CastingITCase extends MultipleProgramsTestBase {
 
@@ -44,30 +41,13 @@ public class CastingITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testAutoCastToString() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
-				env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
-						(byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+				env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
 
 		Table table =
 				tableEnv.fromDataSet(input);
@@ -76,11 +56,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
 				"f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\"");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1b,1s,1i,1L,1.0f,1.0d";
+		List<Row> results = ds.collect();
+		String expected = "1b,1s,1i,1L,1.0f,1.0d";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -89,8 +67,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
-				env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
-						(byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+				env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
 
 		Table table =
 				tableEnv.fromDataSet(input);
@@ -99,11 +76,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
 				" 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "2,2,2,2.0,2.0,2.0";
+		List<Row> results = ds.collect();
+		String expected = "2,2,2,2.0,2.0,2.0";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -113,8 +88,8 @@ public class CastingITCase extends MultipleProgramsTestBase {
 
 		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
 				env.fromElements(
-						new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
-						new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello"));
+						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello"));
 
 		Table table =
 				tableEnv.fromDataSet(input, "a,b,c,d,e,f,g");
@@ -123,11 +98,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
 				.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "2,2,2,2,2.0,2.0,Hello";
+		List<Row> results = ds.collect();
+		String expected = "2,2,2,2,2.0,2.0,Hello";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -136,7 +109,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple3<String, String, String>> input =
-				env.fromElements(new Tuple3<String, String, String>("1", "true", "2.0"));
+				env.fromElements(new Tuple3<>("1", "true", "2.0"));
 
 		Table table =
 				tableEnv.fromDataSet(input);
@@ -145,11 +118,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
 				"f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,1,1,1,2.0,2.0,true\n";
+		List<Row> results = ds.collect();
+		String expected = "1,1,1,1,2.0,2.0,true\n";
+		compareResultAsText(results, expected);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 853da87..c9bba62 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -27,16 +27,13 @@ import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class ExpressionsITCase extends MultipleProgramsTestBase {
 
@@ -45,29 +42,13 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testArithmetic() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple2<Integer, Integer>> input =
-				env.fromElements(new Tuple2<Integer, Integer>(5, 10));
+				env.fromElements(new Tuple2<>(5, 10));
 
 		Table table =
 				tableEnv.fromDataSet(input, "a, b");
@@ -76,11 +57,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 				"a - 5, a + 5, a / 2, a * 2, a % 2, -a");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "0,10,2,10,1,-5";
+		List<Row> results = ds.collect();
+		String expected = "0,10,2,10,1,-5";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -89,7 +68,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple2<Integer, Boolean>> input =
-				env.fromElements(new Tuple2<Integer, Boolean>(5, true));
+				env.fromElements(new Tuple2<>(5, true));
 
 		Table table =
 				tableEnv.fromDataSet(input, "a, b");
@@ -98,11 +77,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 				"b && true, b && false, b || false, !b");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "true,false,true,false";
+		List<Row> results = ds.collect();
+		String expected = "true,false,true,false";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -111,7 +88,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple3<Integer, Integer, Integer>> input =
-				env.fromElements(new Tuple3<Integer, Integer, Integer>(5, 5, 4));
+				env.fromElements(new Tuple3<>(5, 5, 4));
 
 		Table table =
 				tableEnv.fromDataSet(input, "a, b, c");
@@ -120,11 +97,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 				"a > c, a >= b, a < c, a.isNull, a.isNotNull");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "true,true,false,false,true";
+		List<Row> results = ds.collect();
+		String expected = "true,true,false,false,true";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -133,7 +108,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple2<Byte, Byte>> input =
-				env.fromElements(new Tuple2<Byte, Byte>((byte) 3, (byte) 5));
+				env.fromElements(new Tuple2<>((byte) 3, (byte) 5));
 
 		Table table =
 				tableEnv.fromDataSet(input, "a, b");
@@ -142,11 +117,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 				"a & b, a | b, a ^ b, ~a");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,7,6,-4";
+		List<Row> results = ds.collect();
+		String expected = "1,7,6,-4";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -155,7 +128,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple2<Integer, Byte>> input =
-				env.fromElements(new Tuple2<Integer, Byte>(3, (byte) 5));
+				env.fromElements(new Tuple2<>(3, (byte) 5));
 
 		Table table =
 				tableEnv.fromDataSet(input, "a, b");
@@ -164,11 +137,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 				"a & b, a | b, a ^ b, ~a");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,7,6,-4";
+		List<Row> results = ds.collect();
+		String expected = "1,7,6,-4";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -177,7 +148,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSource<Tuple2<Float, Byte>> input =
-				env.fromElements(new Tuple2<Float, Byte>(3.0f, (byte) 5));
+				env.fromElements(new Tuple2<>(3.0f, (byte) 5));
 
 		Table table =
 				tableEnv.fromDataSet(input, "a, b");
@@ -186,11 +157,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
 				table.select("a & b, a | b, a ^ b, ~a");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index 54d8f42..44e0def 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class FilterITCase extends MultipleProgramsTestBase {
 
@@ -43,22 +40,6 @@ public class FilterITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testAllRejectingFilter() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -73,11 +54,9 @@ public class FilterITCase extends MultipleProgramsTestBase {
 				.filter("false");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "\n";
+		List<Row> results = ds.collect();
+		String expected = "\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -94,16 +73,14 @@ public class FilterITCase extends MultipleProgramsTestBase {
 				.filter("true");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
 				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
 				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
 				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
 				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
 				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -120,13 +97,11 @@ public class FilterITCase extends MultipleProgramsTestBase {
 				.filter(" a % 2 = 0 ");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
 				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
 				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -143,13 +118,11 @@ public class FilterITCase extends MultipleProgramsTestBase {
 				.filter("!( a % 2 <> 0 ) ");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
 				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
 				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -157,18 +130,16 @@ public class FilterITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<Integer, Long, String>(300, 1L, "Hello"));
+		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
 
 		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table.filter("a = 300 ");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "300,1,Hello\n";
+		List<Row> results = ds.collect();
+		String expected = "300,1,Hello\n";
+		compareResultAsText(results, expected);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
index aa0b481..4abba3b 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -25,17 +25,14 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 
@@ -44,22 +41,6 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test(expected = ExpressionException.class)
 	public void testGroupingOnNonExistentField() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -74,11 +55,9 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 				.groupBy("foo").select("a.avg");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -95,11 +74,9 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 				.groupBy("b").select("b, a.sum");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+		List<Row> results = ds.collect();
+		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -120,11 +97,9 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 				.groupBy("b").select("a.sum");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+		compareResultAsText(results, expected);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
index 58f5d23..428aec5 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
@@ -26,17 +26,14 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class JoinITCase extends MultipleProgramsTestBase {
 
@@ -45,22 +42,6 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testJoin() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -75,11 +56,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		Table result = in1.join(in2).where("b === e").select("c, g");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -96,11 +75,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "Hi,Hallo\n";
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -117,12 +94,10 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		Table result = in1.join(in2).where("a === d && b === h").select("c, g");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
 				"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -139,11 +114,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		Table result = in1.join(in2).where("foo === e").select("c, g");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -161,11 +134,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				.join(in2).where("a === g").select("c, g");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -183,11 +154,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				.join(in2).where("a === d").select("c, g");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -205,11 +174,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				.join(in2).where("a === d").select("g.count");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "6";
+		List<Row> results = ds.collect();
+		String expected = "6";
+		compareResultAsText(results, expected);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
index 9c62a51..d61912b 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -43,9 +43,9 @@ public class PojoGroupingITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
-			new Tuple3<String, Double, String>("A", 23.0, "Z"),
-			new Tuple3<String, Double, String>("A", 24.0, "Y"),
-			new Tuple3<String, Double, String>("B", 1.0, "Z"));
+			new Tuple3<>("A", 23.0, "Z"),
+			new Tuple3<>("A", 24.0, "Y"),
+			new Tuple3<>("B", 1.0, "Z"));
 
 		TableEnvironment tableEnv = new TableEnvironment();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index 5385abd..9e42f53 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -25,17 +25,14 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class SelectITCase extends MultipleProgramsTestBase {
 
@@ -44,22 +41,6 @@ public class SelectITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testSimpleSelectAllWithAs() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -73,16 +54,14 @@ public class SelectITCase extends MultipleProgramsTestBase {
 				.select("a, b, c");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
 				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
 				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
 				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
 				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
 				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
 
 	}
 
@@ -100,13 +79,11 @@ public class SelectITCase extends MultipleProgramsTestBase {
 				.select("a, b");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
 				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
 				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -119,11 +96,9 @@ public class SelectITCase extends MultipleProgramsTestBase {
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = " sorry dude ";
+		List<Row> results = resultSet.collect();
+		String expected = " sorry dude ";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -136,11 +111,9 @@ public class SelectITCase extends MultipleProgramsTestBase {
 		Table in = tableEnv.fromDataSet(ds, "a, b, c, d");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = " sorry dude ";
+		List<Row> results = resultSet.collect();
+		String expected = " sorry dude ";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -153,11 +126,9 @@ public class SelectITCase extends MultipleProgramsTestBase {
 		Table in = tableEnv.fromDataSet(ds, "a, b, c, b");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = " today's not your day ";
+		List<Row> results = resultSet.collect();
+		String expected = " today's not your day ";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -170,10 +141,8 @@ public class SelectITCase extends MultipleProgramsTestBase {
 		Table in = tableEnv.fromDataSet(ds, "a, b as c, d");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "sorry bro";
+		List<Row> results = resultSet.collect();
+		String expected = "sorry bro";
+		compareResultAsText(results, expected);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 6fe7e8c..7936f8c 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -25,16 +25,13 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class StringExpressionsITCase extends MultipleProgramsTestBase {
 
@@ -43,30 +40,14 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected = "";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testSubstring() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-				new Tuple2<String, Integer>("AAAA", 2),
-				new Tuple2<String, Integer>("BBBB", 1));
+				new Tuple2<>("AAAA", 2),
+				new Tuple2<>("BBBB", 1));
 
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
@@ -74,11 +55,9 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 				.select("a.substring(0, b)");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "AA\nB";
+		List<Row> results = resultSet.collect();
+		String expected = "AA\nB";
+		compareResultAsText(results, expected);
 	}
 
 	@Test
@@ -87,8 +66,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-				new Tuple2<String, Integer>("ABCD", 2),
-				new Tuple2<String, Integer>("ABCD", 1));
+				new Tuple2<>("ABCD", 2),
+				new Tuple2<>("ABCD", 1));
 
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
@@ -96,11 +75,9 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 				.select("a.substring(b)");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "CD\nBCD";
+		List<Row> results = resultSet.collect();
+		String expected = "CD\nBCD";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -109,8 +86,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSet<Tuple2<String, Float>> ds = env.fromElements(
-				new Tuple2<String, Float>("ABCD", 2.0f),
-				new Tuple2<String, Float>("ABCD", 1.0f));
+				new Tuple2<>("ABCD", 2.0f),
+				new Tuple2<>("ABCD", 1.0f));
 
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
@@ -118,11 +95,9 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 				.select("a.substring(0, b)");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = resultSet.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = ExpressionException.class)
@@ -131,8 +106,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSet<Tuple2<String, String>> ds = env.fromElements(
-				new Tuple2<String, String>("ABCD", "a"),
-				new Tuple2<String, String>("ABCD", "b"));
+				new Tuple2<>("ABCD", "a"),
+				new Tuple2<>("ABCD", "b"));
 
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
@@ -140,10 +115,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 				.select("a.substring(b, 15)");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-		expected = "";
+		List<Row> results = resultSet.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 7ac8eef..08ad1f4 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -22,32 +22,16 @@ import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[Parameterized])
 class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
-  }
 
   @Test
   def testAggregationTypes(): Unit = {
@@ -55,10 +39,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
       .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "231,1,21,21,11"
+    val results = ds.collect()
+    val expected = "231,1,21,21,11"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -67,10 +50,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
       .select('foo.avg).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -82,10 +64,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
       .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
       .toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,1,1,1.5,1.5,2"
+    val expected = "1,1,1,1,1.5,1.5,2"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -94,10 +75,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
       .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT").toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "5.5,2 THE COUNT"
+    val expected = "5.5,2 THE COUNT"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -106,10 +86,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable
       .select('_1.sum).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -118,10 +97,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable
       .select('_2.sum.sum).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 28d0e07..59573eb 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -22,48 +22,30 @@ import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[Parameterized])
 class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
-  }
 
   @Test
   def testAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
       "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
       "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
       "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
       "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
       "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -71,10 +53,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -82,10 +63,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -93,10 +73,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -105,10 +84,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -117,9 +95,8 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 1e34521..9c58b64 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.api.scala.table.test
 
 import org.junit._
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -27,28 +26,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.Row
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[Parameterized])
 class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
-  }
 
   @Test
   def testAutoCastToString(): Unit = {
@@ -57,10 +41,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
       .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
       .toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1b,1s,1i,1L,1.0f,1.0d"
+    val expected = "1b,1s,1i,1L,1.0f,1.0d"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -72,10 +55,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
       .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
       .toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,2,2.0,2.0,2.0"
+    val expected = "2,2,2,2.0,2.0,2.0"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -89,10 +71,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
       .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
       .toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,2,2,2.0,2.0"
+    val expected = "2,2,2,2,2.0,2.0"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -109,10 +90,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
         '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
         '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
     .toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,1,1,2.0,2.0,true\n"
+    val expected = "1,1,1,1,2.0,2.0,true\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 5905d24..f5d7a4d 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -21,32 +21,16 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[Parameterized])
 class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
-  }
 
   @Test
   def testArithmetic(): Unit = {
@@ -54,10 +38,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 10)).as('a, 'b)
       .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "0,10,2,10,1,-5"
+    val expected = "0,10,2,10,1,-5"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -66,10 +49,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, true)).as('a, 'b)
       .select('b && true, 'b && false, 'b || false, !'b).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "true,false,true,false"
+    val expected = "true,false,true,false"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -78,10 +60,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
       .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "true,true,false,false,true"
+    val expected = "true,true,false,false,true"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -91,10 +72,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
 
     val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
       .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
+    val expected = "1,7,6,-4"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -104,10 +84,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
 
     val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
       .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
+    val expected = "1,7,6,-4"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ExpressionException])
@@ -116,11 +95,10 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val ds = env.fromElements((3.0, 5)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] 
+    val expected = "1,7,6,-4"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -130,10 +108,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
 
     val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
       .groupBy("a").select("a, a.count As cnt").toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3,1"
+    val expected = "3,1"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 75cd728..c0b86f8 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -18,38 +18,21 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.Row
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import scala.collection.JavaConverters._
+
 
 @RunWith(classOf[Parameterized])
 class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = null
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
-  }
 
   @Test
   def testAllRejectingFilter(): Unit = {
@@ -60,10 +43,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(false) )
-
-    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "\n"
+    val expected = "\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -75,15 +57,14 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(true) )
-
-    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
       "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
       "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
       "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
       "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
       "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -94,9 +75,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val filterDs = ds.filter( _._3.contains("world") )
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+    val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -108,12 +89,12 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 === 0 )
-
-    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-      "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
       "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   // These two not yet done, but are planned
@@ -129,10 +110,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val ds = CollectionDataSets.getStringDataSet(env)
 
     val filterDs = ds.filter( _.startsWith("H") )
-
-    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Ignore
@@ -144,9 +124,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.getCustomTypeDataSet(env)
     val filterDs = ds.filter( _.myString.contains("a") )
-    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 0269f07..f04faf2 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -22,32 +22,16 @@ import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[Parameterized])
 class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
-  }
 
   @Test(expected = classOf[ExpressionException])
   def testGroupingOnNonExistentField(): Unit = {
@@ -56,10 +40,9 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('_foo)
       .select('a.avg).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -72,10 +55,9 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('b)
       .select('b, 'a.sum).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -88,10 +70,9 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('b)
       .select('a.sum).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -109,9 +90,8 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
           |Avg ( a ) as d1, a.avg as d2,
           |Count(a) as e1, a.count as e2
         """.stripMargin).toDataSet[Row]
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "231,231,1,1,21,21,11,11,21,21"
+    val expected = "231,231,1,1,21,21,11,11,21,21"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }