You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/24 11:16:32 UTC
[2/2] flink git commit: [FLINK-2956] [tests] Migrate integration tests for Table API
[FLINK-2956] [tests] Migrate integration tests for Table API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89b1d233
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89b1d233
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89b1d233
Branch: refs/heads/master
Commit: 89b1d233254de024942c06700cedd86fff55970e
Parents: db2a964
Author: gallenvara <ga...@126.com>
Authored: Thu Nov 12 14:44:40 2015 +0800
Committer: Chiwan Park <ch...@apache.org>
Committed: Tue Nov 24 19:15:49 2015 +0900
----------------------------------------------------------------------
.../api/java/table/test/AggregationsITCase.java | 84 ++++++--------------
.../flink/api/java/table/test/AsITCase.java | 71 +++++------------
.../api/java/table/test/CastingITCase.java | 67 +++++-----------
.../api/java/table/test/ExpressionsITCase.java | 83 ++++++-------------
.../flink/api/java/table/test/FilterITCase.java | 65 +++++----------
.../table/test/GroupedAggregationsITCase.java | 47 +++--------
.../flink/api/java/table/test/JoinITCase.java | 79 ++++++------------
.../api/java/table/test/PojoGroupingITCase.java | 6 +-
.../flink/api/java/table/test/SelectITCase.java | 71 +++++------------
.../table/test/StringExpressionsITCase.java | 71 +++++------------
.../scala/table/test/AggregationsITCase.scala | 62 +++++----------
.../flink/api/scala/table/test/AsITCase.scala | 63 +++++----------
.../api/scala/table/test/CastingITCase.scala | 48 ++++-------
.../scala/table/test/ExpressionsITCase.scala | 71 ++++++-----------
.../api/scala/table/test/FilterITCase.scala | 64 +++++----------
.../table/test/GroupedAggreagationsITCase.scala | 48 ++++-------
.../flink/api/scala/table/test/JoinITCase.scala | 83 +++++++------------
.../api/scala/table/test/SelectITCase.scala | 69 ++++++----------
.../table/test/StringExpressionsITCase.scala | 48 ++++-------
.../api/scala/table/test/UnionITCase.scala | 12 +--
20 files changed, 375 insertions(+), 837 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 0eb6a40..3e0147c 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -44,17 +44,14 @@ import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class AggregationsITCase extends MultipleProgramsTestBase {
@@ -63,22 +60,6 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testAggregationTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -89,11 +70,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "231,1,21,21,11";
+ List<Row> results = ds.collect();
+ String expected = "231,1,21,21,11";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -108,11 +87,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
table.select("foo.avg");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test
@@ -122,8 +99,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
env.fromElements(
- new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
- new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
+ new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+ new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
Table table =
tableEnv.fromDataSet(input);
@@ -132,11 +109,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,1,1,1,1.5,1.5,2";
+ List<Row> results = ds.collect();
+ String expected = "1,1,1,1,1.5,1.5,2";
+ compareResultAsText(results, expected);
}
@Test
@@ -146,8 +121,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
DataSource<Tuple2<Float, String>> input =
env.fromElements(
- new Tuple2<Float, String>(1f, "Hello"),
- new Tuple2<Float, String>(2f, "Ciao"));
+ new Tuple2<>(1f, "Hello"),
+ new Tuple2<>(2f, "Ciao"));
Table table =
tableEnv.fromDataSet(input);
@@ -157,11 +132,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "5.5,2 THE COUNT";
+ List<Row> results = ds.collect();
+ String expected = "5.5,2 THE COUNT";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -169,8 +142,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f,
- "Hello"));
+ DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
Table table =
tableEnv.fromDataSet(input);
@@ -180,11 +152,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -192,7 +162,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
+ DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
Table table =
tableEnv.fromDataSet(input);
@@ -202,11 +172,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index 4bb5dec..f6ab54e 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.table.Row;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class AsITCase extends MultipleProgramsTestBase {
@@ -43,22 +40,6 @@ public class AsITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testAs() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -68,16 +49,14 @@ public class AsITCase extends MultipleProgramsTestBase {
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ List<Row> results = ds.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -89,11 +68,9 @@ public class AsITCase extends MultipleProgramsTestBase {
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -105,11 +82,9 @@ public class AsITCase extends MultipleProgramsTestBase {
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -121,11 +96,9 @@ public class AsITCase extends MultipleProgramsTestBase {
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -137,11 +110,9 @@ public class AsITCase extends MultipleProgramsTestBase {
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -154,11 +125,9 @@ public class AsITCase extends MultipleProgramsTestBase {
" c");
DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index 0636dfa..aa4ef2a 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -26,16 +26,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class CastingITCase extends MultipleProgramsTestBase {
@@ -44,30 +41,13 @@ public class CastingITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testAutoCastToString() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
- env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
- (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+ env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
Table table =
tableEnv.fromDataSet(input);
@@ -76,11 +56,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
"f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\"");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1b,1s,1i,1L,1.0f,1.0d";
+ List<Row> results = ds.collect();
+ String expected = "1b,1s,1i,1L,1.0f,1.0d";
+ compareResultAsText(results, expected);
}
@Test
@@ -89,8 +67,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
- env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
- (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+ env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
Table table =
tableEnv.fromDataSet(input);
@@ -99,11 +76,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
" 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "2,2,2,2.0,2.0,2.0";
+ List<Row> results = ds.collect();
+ String expected = "2,2,2,2.0,2.0,2.0";
+ compareResultAsText(results, expected);
}
@Test
@@ -113,8 +88,8 @@ public class CastingITCase extends MultipleProgramsTestBase {
DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
env.fromElements(
- new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
- new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello"));
+ new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+ new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello"));
Table table =
tableEnv.fromDataSet(input, "a,b,c,d,e,f,g");
@@ -123,11 +98,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "2,2,2,2,2.0,2.0,Hello";
+ List<Row> results = ds.collect();
+ String expected = "2,2,2,2,2.0,2.0,Hello";
+ compareResultAsText(results, expected);
}
@Test
@@ -136,7 +109,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple3<String, String, String>> input =
- env.fromElements(new Tuple3<String, String, String>("1", "true", "2.0"));
+ env.fromElements(new Tuple3<>("1", "true", "2.0"));
Table table =
tableEnv.fromDataSet(input);
@@ -145,11 +118,9 @@ public class CastingITCase extends MultipleProgramsTestBase {
"f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,1,1,1,2.0,2.0,true\n";
+ List<Row> results = ds.collect();
+ String expected = "1,1,1,1,2.0,2.0,true\n";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 853da87..c9bba62 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -27,16 +27,13 @@ import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class ExpressionsITCase extends MultipleProgramsTestBase {
@@ -45,29 +42,13 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testArithmetic() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple2<Integer, Integer>> input =
- env.fromElements(new Tuple2<Integer, Integer>(5, 10));
+ env.fromElements(new Tuple2<>(5, 10));
Table table =
tableEnv.fromDataSet(input, "a, b");
@@ -76,11 +57,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
"a - 5, a + 5, a / 2, a * 2, a % 2, -a");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "0,10,2,10,1,-5";
+ List<Row> results = ds.collect();
+ String expected = "0,10,2,10,1,-5";
+ compareResultAsText(results, expected);
}
@Test
@@ -89,7 +68,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple2<Integer, Boolean>> input =
- env.fromElements(new Tuple2<Integer, Boolean>(5, true));
+ env.fromElements(new Tuple2<>(5, true));
Table table =
tableEnv.fromDataSet(input, "a, b");
@@ -98,11 +77,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
"b && true, b && false, b || false, !b");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "true,false,true,false";
+ List<Row> results = ds.collect();
+ String expected = "true,false,true,false";
+ compareResultAsText(results, expected);
}
@Test
@@ -111,7 +88,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple3<Integer, Integer, Integer>> input =
- env.fromElements(new Tuple3<Integer, Integer, Integer>(5, 5, 4));
+ env.fromElements(new Tuple3<>(5, 5, 4));
Table table =
tableEnv.fromDataSet(input, "a, b, c");
@@ -120,11 +97,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
"a > c, a >= b, a < c, a.isNull, a.isNotNull");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "true,true,false,false,true";
+ List<Row> results = ds.collect();
+ String expected = "true,true,false,false,true";
+ compareResultAsText(results, expected);
}
@Test
@@ -133,7 +108,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple2<Byte, Byte>> input =
- env.fromElements(new Tuple2<Byte, Byte>((byte) 3, (byte) 5));
+ env.fromElements(new Tuple2<>((byte) 3, (byte) 5));
Table table =
tableEnv.fromDataSet(input, "a, b");
@@ -142,11 +117,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
"a & b, a | b, a ^ b, ~a");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,7,6,-4";
+ List<Row> results = ds.collect();
+ String expected = "1,7,6,-4";
+ compareResultAsText(results, expected);
}
@Test
@@ -155,7 +128,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple2<Integer, Byte>> input =
- env.fromElements(new Tuple2<Integer, Byte>(3, (byte) 5));
+ env.fromElements(new Tuple2<>(3, (byte) 5));
Table table =
tableEnv.fromDataSet(input, "a, b");
@@ -164,11 +137,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
"a & b, a | b, a ^ b, ~a");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,7,6,-4";
+ List<Row> results = ds.collect();
+ String expected = "1,7,6,-4";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -177,7 +148,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSource<Tuple2<Float, Byte>> input =
- env.fromElements(new Tuple2<Float, Byte>(3.0f, (byte) 5));
+ env.fromElements(new Tuple2<>(3.0f, (byte) 5));
Table table =
tableEnv.fromDataSet(input, "a, b");
@@ -186,11 +157,9 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
table.select("a & b, a | b, a ^ b, ~a");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index 54d8f42..44e0def 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class FilterITCase extends MultipleProgramsTestBase {
@@ -43,22 +40,6 @@ public class FilterITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testAllRejectingFilter() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -73,11 +54,9 @@ public class FilterITCase extends MultipleProgramsTestBase {
.filter("false");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "\n";
+ List<Row> results = ds.collect();
+ String expected = "\n";
+ compareResultAsText(results, expected);
}
@Test
@@ -94,16 +73,14 @@ public class FilterITCase extends MultipleProgramsTestBase {
.filter("true");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ List<Row> results = ds.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
}
@Test
@@ -120,13 +97,11 @@ public class FilterITCase extends MultipleProgramsTestBase {
.filter(" a % 2 = 0 ");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+ List<Row> results = ds.collect();
+ String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+ compareResultAsText(results, expected);
}
@Test
@@ -143,13 +118,11 @@ public class FilterITCase extends MultipleProgramsTestBase {
.filter("!( a % 2 <> 0 ) ");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+ List<Row> results = ds.collect();
+ String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+ compareResultAsText(results, expected);
}
@Test
@@ -157,18 +130,16 @@ public class FilterITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<Integer, Long, String>(300, 1L, "Hello"));
+ DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
Table table = tableEnv.fromDataSet(input, "a, b, c");
Table result = table.filter("a = 300 ");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "300,1,Hello\n";
+ List<Row> results = ds.collect();
+ String expected = "300,1,Hello\n";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
index aa0b481..4abba3b 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -25,17 +25,14 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
@@ -44,22 +41,6 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test(expected = ExpressionException.class)
public void testGroupingOnNonExistentField() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -74,11 +55,9 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
.groupBy("foo").select("a.avg");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test
@@ -95,11 +74,9 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
.groupBy("b").select("b, a.sum");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+ List<Row> results = ds.collect();
+ String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+ compareResultAsText(results, expected);
}
@Test
@@ -120,11 +97,9 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
.groupBy("b").select("a.sum");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+ List<Row> results = ds.collect();
+ String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
index 58f5d23..428aec5 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
@@ -26,17 +26,14 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class JoinITCase extends MultipleProgramsTestBase {
@@ -45,22 +42,6 @@ public class JoinITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception {
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testJoin() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -75,11 +56,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
Table result = in1.join(in2).where("b === e").select("c, g");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+ List<Row> results = ds.collect();
+ String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+ compareResultAsText(results, expected);
}
@Test
@@ -96,11 +75,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "Hi,Hallo\n";
+ List<Row> results = ds.collect();
+ String expected = "Hi,Hallo\n";
+ compareResultAsText(results, expected);
}
@Test
@@ -117,12 +94,10 @@ public class JoinITCase extends MultipleProgramsTestBase {
Table result = in1.join(in2).where("a === d && b === h").select("c, g");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+ List<Row> results = ds.collect();
+ String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -139,11 +114,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
Table result = in1.join(in2).where("foo === e").select("c, g");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -161,11 +134,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
.join(in2).where("a === g").select("c, g");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -183,11 +154,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
.join(in2).where("a === d").select("c, g");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = ds.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test
@@ -205,11 +174,9 @@ public class JoinITCase extends MultipleProgramsTestBase {
.join(in2).where("a === d").select("g.count");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "6";
+ List<Row> results = ds.collect();
+ String expected = "6";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
index 9c62a51..d61912b 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -43,9 +43,9 @@ public class PojoGroupingITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Double, String>> data = env.fromElements(
- new Tuple3<String, Double, String>("A", 23.0, "Z"),
- new Tuple3<String, Double, String>("A", 24.0, "Y"),
- new Tuple3<String, Double, String>("B", 1.0, "Z"));
+ new Tuple3<>("A", 23.0, "Z"),
+ new Tuple3<>("A", 24.0, "Y"),
+ new Tuple3<>("B", 1.0, "Z"));
TableEnvironment tableEnv = new TableEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index 5385abd..9e42f53 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -25,17 +25,14 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class SelectITCase extends MultipleProgramsTestBase {
@@ -44,22 +41,6 @@ public class SelectITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception {
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testSimpleSelectAllWithAs() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -73,16 +54,14 @@ public class SelectITCase extends MultipleProgramsTestBase {
.select("a, b, c");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ List<Row> results = resultSet.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
}
@@ -100,13 +79,11 @@ public class SelectITCase extends MultipleProgramsTestBase {
.select("a, b");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ List<Row> results = resultSet.collect();
+ String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -119,11 +96,9 @@ public class SelectITCase extends MultipleProgramsTestBase {
Table in = tableEnv.fromDataSet(ds, "a, b");
DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = " sorry dude ";
+ List<Row> results = resultSet.collect();
+ String expected = " sorry dude ";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -136,11 +111,9 @@ public class SelectITCase extends MultipleProgramsTestBase {
Table in = tableEnv.fromDataSet(ds, "a, b, c, d");
DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = " sorry dude ";
+ List<Row> results = resultSet.collect();
+ String expected = " sorry dude ";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -153,11 +126,9 @@ public class SelectITCase extends MultipleProgramsTestBase {
Table in = tableEnv.fromDataSet(ds, "a, b, c, b");
DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = " today's not your day ";
+ List<Row> results = resultSet.collect();
+ String expected = " today's not your day ";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -170,10 +141,8 @@ public class SelectITCase extends MultipleProgramsTestBase {
Table in = tableEnv.fromDataSet(ds, "a, b as c, d");
DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "sorry bro";
+ List<Row> results = resultSet.collect();
+ String expected = "sorry bro";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 6fe7e8c..7936f8c 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -25,16 +25,13 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class StringExpressionsITCase extends MultipleProgramsTestBase {
@@ -43,30 +40,14 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected = "";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception {
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testSubstring() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
DataSet<Tuple2<String, Integer>> ds = env.fromElements(
- new Tuple2<String, Integer>("AAAA", 2),
- new Tuple2<String, Integer>("BBBB", 1));
+ new Tuple2<>("AAAA", 2),
+ new Tuple2<>("BBBB", 1));
Table in = tableEnv.fromDataSet(ds, "a, b");
@@ -74,11 +55,9 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
.select("a.substring(0, b)");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "AA\nB";
+ List<Row> results = resultSet.collect();
+ String expected = "AA\nB";
+ compareResultAsText(results, expected);
}
@Test
@@ -87,8 +66,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSet<Tuple2<String, Integer>> ds = env.fromElements(
- new Tuple2<String, Integer>("ABCD", 2),
- new Tuple2<String, Integer>("ABCD", 1));
+ new Tuple2<>("ABCD", 2),
+ new Tuple2<>("ABCD", 1));
Table in = tableEnv.fromDataSet(ds, "a, b");
@@ -96,11 +75,9 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
.select("a.substring(b)");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "CD\nBCD";
+ List<Row> results = resultSet.collect();
+ String expected = "CD\nBCD";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -109,8 +86,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSet<Tuple2<String, Float>> ds = env.fromElements(
- new Tuple2<String, Float>("ABCD", 2.0f),
- new Tuple2<String, Float>("ABCD", 1.0f));
+ new Tuple2<>("ABCD", 2.0f),
+ new Tuple2<>("ABCD", 1.0f));
Table in = tableEnv.fromDataSet(ds, "a, b");
@@ -118,11 +95,9 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
.select("a.substring(0, b)");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = resultSet.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
@Test(expected = ExpressionException.class)
@@ -131,8 +106,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
TableEnvironment tableEnv = new TableEnvironment();
DataSet<Tuple2<String, String>> ds = env.fromElements(
- new Tuple2<String, String>("ABCD", "a"),
- new Tuple2<String, String>("ABCD", "b"));
+ new Tuple2<>("ABCD", "a"),
+ new Tuple2<>("ABCD", "b"));
Table in = tableEnv.fromDataSet(ds, "a, b");
@@ -140,10 +115,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
.select("a.substring(b, 15)");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- expected = "";
+ List<Row> results = resultSet.collect();
+ String expected = "";
+ compareResultAsText(results, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 7ac8eef..08ad1f4 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -22,32 +22,16 @@ import org.apache.flink.api.table.{Row, ExpressionException}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
-import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import scala.collection.JavaConverters._
+
@RunWith(classOf[Parameterized])
class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
- }
@Test
def testAggregationTypes(): Unit = {
@@ -55,10 +39,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "231,1,21,21,11"
+ val results = ds.collect()
+ val expected = "231,1,21,21,11"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -67,10 +50,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
.select('foo.avg).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
+ val expected = ""
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -82,10 +64,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
(2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
.toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,1,1,1.5,1.5,2"
+ val expected = "1,1,1,1,1.5,1.5,2"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -94,10 +75,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
.select(('_1 + 2).avg + 2, '_2.count + " THE COUNT").toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "5.5,2 THE COUNT"
+ val expected = "5.5,2 THE COUNT"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -106,10 +86,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("Hello", 1)).toTable
.select('_1.sum).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
+ val expected = ""
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -118,10 +97,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("Hello", 1)).toTable
.select('_2.sum.sum).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
+ val expected = ""
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 28d0e07..59573eb 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -22,48 +22,30 @@ import org.apache.flink.api.table.{Row, ExpressionException}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
-import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import scala.collection.JavaConverters._
+
@RunWith(classOf[Parameterized])
class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
- }
@Test
def testAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -71,10 +53,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
+ val expected = "no"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -82,10 +63,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
+ val expected = "no"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -93,10 +73,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
+ val expected = "no"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -105,10 +84,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
val env = ExecutionEnvironment.getExecutionEnvironment
// as can only have field references
val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
+ val expected = "no"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -117,9 +95,8 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
val env = ExecutionEnvironment.getExecutionEnvironment
// as can only have field references
val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
+ val expected = "no"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 1e34521..9c58b64 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -19,7 +19,6 @@
package org.apache.flink.api.scala.table.test
import org.junit._
-import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -27,28 +26,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
-import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import scala.collection.JavaConverters._
+
@RunWith(classOf[Parameterized])
class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
- }
@Test
def testAutoCastToString(): Unit = {
@@ -57,10 +41,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
.select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
.toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1b,1s,1i,1L,1.0f,1.0d"
+ val expected = "1b,1s,1i,1L,1.0f,1.0d"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -72,10 +55,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
.toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "2,2,2,2.0,2.0,2.0"
+ val expected = "2,2,2,2.0,2.0,2.0"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -89,10 +71,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
(2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
.filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
.toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "2,2,2,2,2.0,2.0"
+ val expected = "2,2,2,2,2.0,2.0"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -109,10 +90,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
'_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
'_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
.toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,1,1,2.0,2.0,true\n"
+ val expected = "1,1,1,1,2.0,2.0,true\n"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 5905d24..f5d7a4d 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -21,32 +21,16 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.table.{Row, ExpressionException}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
-import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import scala.collection.JavaConverters._
+
@RunWith(classOf[Parameterized])
class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
- }
@Test
def testArithmetic(): Unit = {
@@ -54,10 +38,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, 10)).as('a, 'b)
.select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "0,10,2,10,1,-5"
+ val expected = "0,10,2,10,1,-5"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -66,10 +49,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, true)).as('a, 'b)
.select('b && true, 'b && false, 'b || false, !'b).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "true,false,true,false"
+ val expected = "true,false,true,false"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -78,10 +60,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
.select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "true,true,false,false,true"
+ val expected = "true,true,false,false,true"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -91,10 +72,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,7,6,-4"
+ val expected = "1,7,6,-4"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -104,10 +84,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,7,6,-4"
+ val expected = "1,7,6,-4"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ExpressionException])
@@ -116,11 +95,10 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((3.0, 5)).as('a, 'b)
- .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,7,6,-4"
+ .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
+ val expected = "1,7,6,-4"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -130,10 +108,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
.groupBy("a").select("a, a.count As cnt").toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "3,1"
+ val expected = "3,1"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 75cd728..c0b86f8 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -18,38 +18,21 @@
package org.apache.flink.api.scala.table.test
-import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
-import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import scala.collection.JavaConverters._
+
@RunWith(classOf[Parameterized])
class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = null
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
- }
@Test
def testAllRejectingFilter(): Unit = {
@@ -60,10 +43,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val filterDs = ds.filter( Literal(false) )
-
- filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "\n"
+ val expected = "\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -75,15 +57,14 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val filterDs = ds.filter( Literal(true) )
-
- filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -94,9 +75,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env)
val filterDs = ds.filter( _._3.contains("world") )
- filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+ val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -108,12 +89,12 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val filterDs = ds.filter( 'a % 2 === 0 )
-
- filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
- "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+ "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+ "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
// These two not yet done, but are planned
@@ -129,10 +110,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val ds = CollectionDataSets.getStringDataSet(env)
val filterDs = ds.filter( _.startsWith("H") )
-
- filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+ val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Ignore
@@ -144,9 +124,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.getCustomTypeDataSet(env)
val filterDs = ds.filter( _.myString.contains("a") )
- filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+ val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89b1d233/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 0269f07..f04faf2 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -22,32 +22,16 @@ import org.apache.flink.api.table.{Row, ExpressionException}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
-import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import scala.collection.JavaConverters._
+
@RunWith(classOf[Parameterized])
class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
- }
@Test(expected = classOf[ExpressionException])
def testGroupingOnNonExistentField(): Unit = {
@@ -56,10 +40,9 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
.groupBy('_foo)
.select('a.avg).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
+ val expected = ""
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -72,10 +55,9 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
.groupBy('b)
.select('b, 'a.sum).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+ val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -88,10 +70,9 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
.groupBy('b)
.select('a.sum).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+ val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
@@ -109,9 +90,8 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
|Avg ( a ) as d1, a.avg as d2,
|Count(a) as e1, a.count as e2
""".stripMargin).toDataSet[Row]
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "231,231,1,1,21,21,11,11,21,21"
+ val expected = "231,231,1,1,21,21,11,11,21,21"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}