You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/29 13:47:50 UTC
[3/3] flink git commit: [FLINK-3656] [table] Consolidate ITCases
[FLINK-3656] [table] Consolidate ITCases
Merge FilterIT/SelectIT to CalcITCases
Merge FromDataSet/ToTable to TableEnvironmentITCases
Merge aggregating ITCases
All batch ITCases use TableProgramsTestBase
This closes #2566.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7758571a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7758571a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7758571a
Branch: refs/heads/master
Commit: 7758571ae7a6a26859d91eb80e7b4df689e79c46
Parents: 8243138
Author: twalthr <tw...@apache.org>
Authored: Thu Sep 29 10:20:35 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Sep 29 15:43:37 2016 +0200
----------------------------------------------------------------------
.../api/java/batch/TableEnvironmentITCase.java | 473 +++++++++++++++++-
.../java/batch/table/AggregationsITCase.java | 204 +++++++-
.../flink/api/java/batch/table/CalcITCase.java | 290 +++++++++++
.../api/java/batch/table/CastingITCase.java | 2 -
.../api/java/batch/table/DistinctITCase.java | 76 ---
.../api/java/batch/table/FilterITCase.java | 170 -------
.../api/java/batch/table/FromDataSetITCase.java | 499 -------------------
.../batch/table/GroupedAggregationsITCase.java | 124 -----
.../flink/api/java/batch/table/JoinITCase.java | 15 +-
.../java/batch/table/PojoGroupingITCase.java | 90 ----
.../api/java/batch/table/SelectITCase.java | 153 ------
.../scala/batch/TableEnvironmentITCase.scala | 129 +++++
.../flink/api/scala/batch/TableSinkITCase.scala | 14 +-
.../api/scala/batch/TableSourceITCase.scala | 12 +-
.../flink/api/scala/batch/sql/CalcITCase.scala | 277 ++++++++++
.../api/scala/batch/sql/FilterITCase.scala | 158 ------
.../api/scala/batch/sql/SelectITCase.scala | 148 ------
.../scala/batch/table/AggregationsITCase.scala | 222 ++++++++-
.../api/scala/batch/table/CalcITCase.scala | 309 +++++++++++-
.../api/scala/batch/table/DistinctITCase.scala | 62 ---
.../api/scala/batch/table/FilterITCase.scala | 188 -------
.../batch/table/GroupedAggregationsITCase.scala | 200 --------
.../api/scala/batch/table/JoinITCase.scala | 55 +-
.../api/scala/batch/table/SelectITCase.scala | 190 -------
.../api/scala/batch/table/ToTableITCase.scala | 158 ------
.../batch/utils/TableProgramsTestBase.scala | 24 +-
.../api/scala/stream/table/CalcITCase.scala | 285 +++++++++++
.../api/scala/stream/table/FilterITCase.scala | 143 ------
.../api/scala/stream/table/SelectITCase.scala | 175 -------
29 files changed, 2214 insertions(+), 2631 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
index 8fdb2da..5e40724 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
@@ -18,20 +18,31 @@
package org.apache.flink.api.java.batch;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.*;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.ValidationException;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.util.List;
-
@RunWith(Parameterized.class)
public class TableEnvironmentITCase extends TableProgramsTestBase {
@@ -39,6 +50,14 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
super(mode, configMode);
}
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ { TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
+ { TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT() }
+ });
+ }
+
@Test
public void testSimpleRegister() throws Exception {
final String tableName = "MyTable";
@@ -145,4 +164,452 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
// Must fail. Table is bound to different TableEnvironment.
tableEnv2.registerTable("MyTable", t);
}
+
+ @Test
+ public void testAsFromTuple() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table table = tableEnv
+ .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+ .select("a, b, c");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsFromAndToTuple() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table table = tableEnv
+ .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+ .select("a, b, c");
+
+ TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ DataSet<?> ds = tableEnv.toDataSet(table, ti);
+ List<?> results = ds.collect();
+ String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
+ "(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
+ "(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
+ "(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
+ "(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
+ "(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
+ "(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsFromTupleToPojo() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
+ data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
+ data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
+ data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data), "a, b, c, d")
+ .select("a, b, c, d");
+
+ DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+ List<SmallPojo2> results = ds.collect();
+ String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsFromPojo() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<SmallPojo> data = new ArrayList<>();
+ data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
+ data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
+ data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data),
+ "department AS a, " +
+ "age AS b, " +
+ "salary AS c, " +
+ "name AS d")
+ .select("a, b, c, d");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected =
+ "Sales,28,4000.0,Peter\n" +
+ "Engineering,56,10000.0,Anna\n" +
+ "HR,42,6000.0,Lucy\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsFromPrivateFieldsPojo() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<PrivateSmallPojo> data = new ArrayList<>();
+ data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+ data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+ data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data),
+ "department AS a, " +
+ "age AS b, " +
+ "salary AS c, " +
+ "name AS d")
+ .select("a, b, c, d");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected =
+ "Sales,28,4000.0,Peter\n" +
+ "Engineering,56,10000.0,Anna\n" +
+ "HR,42,6000.0,Lucy\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsFromAndToPojo() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<SmallPojo> data = new ArrayList<>();
+ data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
+ data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
+ data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data),
+ "department AS a, " +
+ "age AS b, " +
+ "salary AS c, " +
+ "name AS d")
+ .select("a, b, c, d");
+
+ DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+ List<SmallPojo2> results = ds.collect();
+ String expected =
+ "Sales,28,4000.0,Peter\n" +
+ "Engineering,56,10000.0,Anna\n" +
+ "HR,42,6000.0,Lucy\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsFromAndToPrivateFieldPojo() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<PrivateSmallPojo> data = new ArrayList<>();
+ data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+ data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+ data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data),
+ "department AS a, " +
+ "age AS b, " +
+ "salary AS c, " +
+ "name AS d")
+ .select("a, b, c, d");
+
+ DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class);
+ List<PrivateSmallPojo2> results = ds.collect();
+ String expected =
+ "Sales,28,4000.0,Peter\n" +
+ "Engineering,56,10000.0,Anna\n" +
+ "HR,42,6000.0,Lucy\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsWithPojoAndGenericTypes() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<PojoWithGeneric> data = new ArrayList<>();
+ data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
+ HashMap<String, String> hm1 = new HashMap<>();
+ hm1.put("test1", "test1");
+ data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>()));
+ HashMap<String, String> hm2 = new HashMap<>();
+ hm2.put("abc", "cde");
+ data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>()));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data),
+ "name AS a, " +
+ "age AS b, " +
+ "generic AS c, " +
+ "generic2 AS d")
+ .select("a, b, c, c as c2, d")
+ .select("a, b, c, c === c2, d");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected =
+ "Peter,28,{},true,[]\n" +
+ "Anna,56,{test1=test1},true,[]\n" +
+ "Lucy,42,{abc=cde},true,[]\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test(expected = TableException.class)
+ public void testAsWithToFewFields() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail. Not enough field names specified.
+ tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
+ }
+
+ @Test(expected = TableException.class)
+ public void testAsWithToManyFields() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail. Too many field names specified.
+ tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
+ }
+
+ @Test(expected = TableException.class)
+ public void testAsWithAmbiguousFields() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail. Specified field names are not unique.
+ tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
+ }
+
+ @Test(expected = TableException.class)
+ public void testAsWithNonFieldReference1() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail. as() does only allow field name expressions
+ tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
+ }
+
+ @Test(expected = TableException.class)
+ public void testAsWithNonFieldReference2() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail. as() does only allow field name expressions
+ tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c");
+ }
+
+ @Test(expected = TableException.class)
+ public void testNonStaticClassInput() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail since class is not static
+ tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
+ }
+
+ @Test(expected = TableException.class)
+ public void testNonStaticClassOutput() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail since class is not static
+ Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
+ tableEnv.toDataSet(t, MyNonStatic.class);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public class MyNonStatic {
+ public int number;
+ }
+
+ @SuppressWarnings("unused")
+ public static class SmallPojo {
+
+ public SmallPojo() { }
+
+ public SmallPojo(String name, int age, double salary, String department) {
+ this.name = name;
+ this.age = age;
+ this.salary = salary;
+ this.department = department;
+ }
+
+ public String name;
+ public int age;
+ public double salary;
+ public String department;
+ }
+
+ @SuppressWarnings("unused")
+ public static class PojoWithGeneric {
+ public String name;
+ public int age;
+ public HashMap<String, String> generic;
+ public ArrayList<String> generic2;
+
+ public PojoWithGeneric() {
+ // default constructor
+ }
+
+ public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
+ ArrayList<String> generic2) {
+ this.name = name;
+ this.age = age;
+ this.generic = generic;
+ this.generic2 = generic2;
+ }
+
+ @Override
+ public String toString() {
+ return name + "," + age + "," + generic + "," + generic2;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public static class PrivateSmallPojo {
+
+ public PrivateSmallPojo() { }
+
+ public PrivateSmallPojo(String name, int age, double salary, String department) {
+ this.name = name;
+ this.age = age;
+ this.salary = salary;
+ this.department = department;
+ }
+
+ private String name;
+ private int age;
+ private double salary;
+ private String department;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+
+ public double getSalary() {
+ return salary;
+ }
+
+ public void setSalary(double salary) {
+ this.salary = salary;
+ }
+
+ public String getDepartment() {
+ return department;
+ }
+
+ public void setDepartment(String department) {
+ this.department = department;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public static class SmallPojo2 {
+
+ public SmallPojo2() { }
+
+ public SmallPojo2(String a, int b, double c, String d) {
+ this.a = a;
+ this.b = b;
+ this.c = c;
+ this.d = d;
+ }
+
+ public String a;
+ public int b;
+ public double c;
+ public String d;
+
+ @Override
+ public String toString() {
+ return a + "," + b + "," + c + "," + d;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public static class PrivateSmallPojo2 {
+
+ public PrivateSmallPojo2() { }
+
+ public PrivateSmallPojo2(String a, int b, double c, String d) {
+ this.a = a;
+ this.b = b;
+ this.c = c;
+ this.d = d;
+ }
+
+ private String a;
+ private int b;
+ private double c;
+ private String d;
+
+ public String getA() {
+ return a;
+ }
+
+ public void setA(String a) {
+ this.a = a;
+ }
+
+ public int getB() {
+ return b;
+ }
+
+ public void setB(int b) {
+ this.b = b;
+ }
+
+ public double getC() {
+ return c;
+ }
+
+ public void setC(double c) {
+ this.c = c;
+ }
+
+ public String getD() {
+ return d;
+ }
+
+ public void setD(String d) {
+ this.d = d;
+ }
+
+ @Override
+ public String toString() {
+ return a + "," + b + "," + c + "," + d;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
index 6bcac56..02f6e0b 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
@@ -17,37 +17,40 @@
*/
package org.apache.flink.api.java.batch.table;
+import java.io.Serializable;
+import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.api.table.ValidationException;
+import org.apache.flink.examples.java.WordCountTable.WC;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.apache.flink.examples.java.WordCountTable.WC;
-
-import java.util.List;
@RunWith(Parameterized.class)
-public class AggregationsITCase extends MultipleProgramsTestBase {
+public class AggregationsITCase extends TableProgramsTestBase {
- public AggregationsITCase(TestExecutionMode mode){
- super(mode);
+ public AggregationsITCase(TestExecutionMode mode, TableConfigMode configMode){
+ super(mode, configMode);
}
@Test
public void testAggregationTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
@@ -62,7 +65,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
@Test(expected = ValidationException.class)
public void testAggregationOnNonExistingField() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
Table table =
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
@@ -79,7 +82,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
@Test
public void testWorkingAggregationDataTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
env.fromElements(
@@ -100,7 +103,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
@Test
public void testAggregationWithArithmetic() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSource<Tuple2<Float, String>> input =
env.fromElements(
@@ -122,7 +125,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
@Test
public void testAggregationWithTwoCount() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSource<Tuple2<Float, String>> input =
env.fromElements(
@@ -144,7 +147,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
@Test(expected = ValidationException.class)
public void testNonWorkingDataTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
@@ -164,7 +167,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
@Test(expected = ValidationException.class)
public void testNoNestedAggregation() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
@@ -181,10 +184,90 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
compareResultAsText(results, expected);
}
+ @Test(expected = ValidationException.class)
+ public void testGroupingOnNonExistentField() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+ tableEnv
+ .fromDataSet(input, "a, b, c")
+ // must fail. Field foo is not in input
+ .groupBy("foo")
+ .select("a.avg");
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testGroupingInvalidSelection() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+ tableEnv
+ .fromDataSet(input, "a, b, c")
+ .groupBy("a, b")
+ // must fail. Field c is not a grouping key or aggregation
+ .select("c");
+ }
+
+ @Test
+ public void testGroupedAggregate() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .groupBy("b").select("b, a.sum");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testGroupingKeyForwardIfNotUsed() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .groupBy("b").select("a.sum");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testGroupNoAggregation() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
+ List<Row> results = ds.collect();
+ compareResultAsText(results, expected);
+ }
+
@Test
public void testPojoAggregation() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
@@ -208,5 +291,90 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
String expected = "Hello\n" + "Hola";
compareResultAsText(result, expected);
}
+
+ @Test
+ public void testPojoGrouping() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+ new Tuple3<>("A", 23.0, "Z"),
+ new Tuple3<>("A", 24.0, "Y"),
+ new Tuple3<>("B", 1.0, "Z"));
+
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table table = tableEnv
+ .fromDataSet(data, "groupMe, value, name")
+ .select("groupMe, value, name")
+ .where("groupMe != 'B'");
+
+ DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+ DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+ .sortGroup("value", Order.DESCENDING)
+ .first(1);
+
+ List<MyPojo> resultList = result.collect();
+ compareResultAsText(resultList, "A,24.0,Y");
+ }
+
+ @Test
+ public void testDistinct() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table distinct = table.select("b").distinct();
+
+ DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testDistinctAfterAggregate() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
+
+ Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
+
+ Table distinct = table.groupBy("a, e").select("e").distinct();
+
+ DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1\n" + "2\n" + "3\n";
+ compareResultAsText(results, expected);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class MyPojo implements Serializable {
+ private static final long serialVersionUID = 8741918940120107213L;
+
+ public String groupMe;
+ public double value;
+ public String name;
+
+ public MyPojo() {
+ // for serialization
+ }
+
+ public MyPojo(String groupMe, double value, String name) {
+ this.groupMe = groupMe;
+ this.value = value;
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return groupMe + "," + value + "," + name;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
new file mode 100644
index 0000000..fcdf2e1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.ValidationException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CalcITCase extends TableProgramsTestBase {
+
+ public CalcITCase(TestExecutionMode mode, TableConfigMode configMode){
+ super(mode, configMode);
+ }
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ { TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
+ { TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL() }
+ });
+ }
+
+ @Test
+ public void testSimpleSelectAllWithAs() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+ Table result = in
+ .select("a, b, c");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testSimpleSelectWithNaming() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table in = tableEnv.fromDataSet(ds);
+
+ Table result = in
+ .select("f0 as a, f1 as b")
+ .select("a, b");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testSimpleSelectRenameAll() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table in = tableEnv.fromDataSet(ds);
+
+ Table result = in
+ .select("f0 as a, f1 as b, f2 as c")
+ .select("a, b");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testSelectInvalidField() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ tableEnv.fromDataSet(ds, "a, b, c")
+ // Must fail. Field foo does not exist
+ .select("a + 1, foo + 2");
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testSelectAmbiguousFieldNames() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ tableEnv.fromDataSet(ds, "a, b, c")
+ // Must fail. Field foo does not exist
+ .select("a + 1 as foo, b + 2 as foo");
+ }
+
+ @Test
+ public void testSelectStar() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+ Table result = in
+ .select("*");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAllRejectingFilter() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .filter("false");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAllPassingFilter() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .filter("true");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testFilterOnIntegerTupleField() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .filter(" a % 2 = 0 ");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+ "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testNotEquals() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .filter("!( a % 2 <> 0 ) ");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+ "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testDisjunctivePreds() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .filter("a < 2 || a > 20");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testIntegerBiggerThan128() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ Table result = table
+ .filter("a = 300 ");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "300,1,Hello\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testFilterInvalidField() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+ table
+ // Must fail. Field foo does not exist.
+ .filter("foo = 17");
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
index 9646076..333953b 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
@@ -32,7 +31,6 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java
deleted file mode 100644
index 7f10433..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class DistinctITCase extends MultipleProgramsTestBase {
-
- public DistinctITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test
- public void testDistinct() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table distinct = table.select("b").distinct();
-
- DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
- List<Row> results = ds.collect();
- String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testDistinctAfterAggregate() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
-
- Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
-
- Table distinct = table.groupBy("a, e").select("e").distinct();
-
- DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
- List<Row> results = ds.collect();
- String expected = "1\n" + "2\n" + "3\n";
- compareResultAsText(results, expected);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
deleted file mode 100644
index 7a2bedf..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class FilterITCase extends TableProgramsTestBase {
-
- public FilterITCase(TestExecutionMode mode, TableConfigMode configMode){
- super(mode, configMode);
- }
-
- @Test
- public void testAllRejectingFilter() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .filter("false");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAllPassingFilter() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .filter("true");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
- "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
- "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
- "20,6,Comment#14\n" + "21,6,Comment#15\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testFilterOnIntegerTupleField() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .filter(" a % 2 = 0 ");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
- "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
- "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testNotEquals() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .filter("!( a % 2 <> 0 ) ");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
- "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
- "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testDisjunctivePreds() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .filter("a < 2 || a > 20");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testIntegerBiggerThan128() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .filter("a = 300 ");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "300,1,Hello\n";
- compareResultAsText(results, expected);
- }
-
- @Test(expected = ValidationException.class)
- public void testFilterInvalidField() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- table
- // Must fail. Field foo does not exist.
- .filter("foo = 17");
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
deleted file mode 100644
index e6b9226..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import java.util.HashMap;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class FromDataSetITCase extends TableProgramsTestBase {
-
- public FromDataSetITCase(TestExecutionMode mode, TableConfigMode configMode){
- super(mode, configMode);
- }
-
- @Test
- public void testAsFromTuple() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- Table table = tableEnv
- .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
- .select("a, b, c");
-
- DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- List<Row> results = ds.collect();
- String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
- "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
- "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
- "20,6,Comment#14\n" + "21,6,Comment#15\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAsFromAndToTuple() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- Table table = tableEnv
- .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
- .select("a, b, c");
-
- TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO);
-
- DataSet<?> ds = tableEnv.toDataSet(table, ti);
- List<?> results = ds.collect();
- String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
- "(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
- "(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
- "(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
- "(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
- "(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
- "(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAsFromTupleToPojo() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
- data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
- data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
- data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
-
- Table table = tableEnv
- .fromDataSet(env.fromCollection(data), "a, b, c, d")
- .select("a, b, c, d");
-
- DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
- List<SmallPojo2> results = ds.collect();
- String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAsFromPojo() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- List<SmallPojo> data = new ArrayList<>();
- data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
- data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
- data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
-
- Table table = tableEnv
- .fromDataSet(env.fromCollection(data),
- "department AS a, " +
- "age AS b, " +
- "salary AS c, " +
- "name AS d")
- .select("a, b, c, d");
-
- DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- List<Row> results = ds.collect();
- String expected =
- "Sales,28,4000.0,Peter\n" +
- "Engineering,56,10000.0,Anna\n" +
- "HR,42,6000.0,Lucy\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAsFromPrivateFieldsPojo() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- List<PrivateSmallPojo> data = new ArrayList<>();
- data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
- data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
- data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
-
- Table table = tableEnv
- .fromDataSet(env.fromCollection(data),
- "department AS a, " +
- "age AS b, " +
- "salary AS c, " +
- "name AS d")
- .select("a, b, c, d");
-
- DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- List<Row> results = ds.collect();
- String expected =
- "Sales,28,4000.0,Peter\n" +
- "Engineering,56,10000.0,Anna\n" +
- "HR,42,6000.0,Lucy\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAsFromAndToPojo() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- List<SmallPojo> data = new ArrayList<>();
- data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
- data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
- data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
-
- Table table = tableEnv
- .fromDataSet(env.fromCollection(data),
- "department AS a, " +
- "age AS b, " +
- "salary AS c, " +
- "name AS d")
- .select("a, b, c, d");
-
- DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
- List<SmallPojo2> results = ds.collect();
- String expected =
- "Sales,28,4000.0,Peter\n" +
- "Engineering,56,10000.0,Anna\n" +
- "HR,42,6000.0,Lucy\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAsFromAndToPrivateFieldPojo() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- List<PrivateSmallPojo> data = new ArrayList<>();
- data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
- data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
- data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
-
- Table table = tableEnv
- .fromDataSet(env.fromCollection(data),
- "department AS a, " +
- "age AS b, " +
- "salary AS c, " +
- "name AS d")
- .select("a, b, c, d");
-
- DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class);
- List<PrivateSmallPojo2> results = ds.collect();
- String expected =
- "Sales,28,4000.0,Peter\n" +
- "Engineering,56,10000.0,Anna\n" +
- "HR,42,6000.0,Lucy\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAsWithPojoAndGenericTypes() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- List<PojoWithGeneric> data = new ArrayList<>();
- data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
- HashMap<String, String> hm1 = new HashMap<>();
- hm1.put("test1", "test1");
- data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>()));
- HashMap<String, String> hm2 = new HashMap<>();
- hm2.put("abc", "cde");
- data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>()));
-
- Table table = tableEnv
- .fromDataSet(env.fromCollection(data),
- "name AS a, " +
- "age AS b, " +
- "generic AS c, " +
- "generic2 AS d")
- .select("a, b, c, c as c2, d")
- .select("a, b, c, c === c2, d");
-
- DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
- List<Row> results = ds.collect();
- String expected =
- "Peter,28,{},true,[]\n" +
- "Anna,56,{test1=test1},true,[]\n" +
- "Lucy,42,{abc=cde},true,[]\n";
- compareResultAsText(results, expected);
- }
-
- @Test(expected = TableException.class)
- public void testAsWithToFewFields() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail. Not enough field names specified.
- tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
- }
-
- @Test(expected = TableException.class)
- public void testAsWithToManyFields() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail. Too many field names specified.
- tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
- }
-
- @Test(expected = TableException.class)
- public void testAsWithAmbiguousFields() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail. Specified field names are not unique.
- tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
- }
-
- @Test(expected = TableException.class)
- public void testAsWithNonFieldReference1() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail. as() does only allow field name expressions
- tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
- }
-
- @Test(expected = TableException.class)
- public void testAsWithNonFieldReference2() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail. as() does only allow field name expressions
- tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c");
- }
-
- @Test(expected = TableException.class)
- public void testNonStaticClassInput() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail since class is not static
- tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
- }
-
- @Test(expected = TableException.class)
- public void testNonStaticClassOutput() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail since class is not static
- Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
- tableEnv.toDataSet(t, MyNonStatic.class);
- }
-
- // --------------------------------------------------------------------------------------------
-
- public class MyNonStatic {
- public int number;
- }
-
- @SuppressWarnings("unused")
- public static class SmallPojo {
-
- public SmallPojo() { }
-
- public SmallPojo(String name, int age, double salary, String department) {
- this.name = name;
- this.age = age;
- this.salary = salary;
- this.department = department;
- }
-
- public String name;
- public int age;
- public double salary;
- public String department;
- }
-
- @SuppressWarnings("unused")
- public static class PojoWithGeneric {
- public String name;
- public int age;
- public HashMap<String, String> generic;
- public ArrayList<String> generic2;
-
- public PojoWithGeneric() {
- // default constructor
- }
-
- public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
- ArrayList<String> generic2) {
- this.name = name;
- this.age = age;
- this.generic = generic;
- this.generic2 = generic2;
- }
-
- @Override
- public String toString() {
- return name + "," + age + "," + generic + "," + generic2;
- }
- }
-
- @SuppressWarnings("unused")
- public static class PrivateSmallPojo {
-
- public PrivateSmallPojo() { }
-
- public PrivateSmallPojo(String name, int age, double salary, String department) {
- this.name = name;
- this.age = age;
- this.salary = salary;
- this.department = department;
- }
-
- private String name;
- private int age;
- private double salary;
- private String department;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getAge() {
- return age;
- }
-
- public void setAge(int age) {
- this.age = age;
- }
-
- public double getSalary() {
- return salary;
- }
-
- public void setSalary(double salary) {
- this.salary = salary;
- }
-
- public String getDepartment() {
- return department;
- }
-
- public void setDepartment(String department) {
- this.department = department;
- }
- }
-
- @SuppressWarnings("unused")
- public static class SmallPojo2 {
-
- public SmallPojo2() { }
-
- public SmallPojo2(String a, int b, double c, String d) {
- this.a = a;
- this.b = b;
- this.c = c;
- this.d = d;
- }
-
- public String a;
- public int b;
- public double c;
- public String d;
-
- @Override
- public String toString() {
- return a + "," + b + "," + c + "," + d;
- }
- }
-
- @SuppressWarnings("unused")
- public static class PrivateSmallPojo2 {
-
- public PrivateSmallPojo2() { }
-
- public PrivateSmallPojo2(String a, int b, double c, String d) {
- this.a = a;
- this.b = b;
- this.c = c;
- this.d = d;
- }
-
- private String a;
- private int b;
- private double c;
- private String d;
-
- public String getA() {
- return a;
- }
-
- public void setA(String a) {
- this.a = a;
- }
-
- public int getB() {
- return b;
- }
-
- public void setB(int b) {
- this.b = b;
- }
-
- public double getC() {
- return c;
- }
-
- public void setC(double c) {
- this.c = c;
- }
-
- public String getD() {
- return d;
- }
-
- public void setD(String d) {
- this.d = d;
- }
-
- @Override
- public String toString() {
- return a + "," + b + "," + c + "," + d;
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
deleted file mode 100644
index 1906040..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
-
- public GroupedAggregationsITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test(expected = ValidationException.class)
- public void testGroupingOnNonExistentField() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
- tableEnv
- .fromDataSet(input, "a, b, c")
- // must fail. Field foo is not in input
- .groupBy("foo")
- .select("a.avg");
- }
-
- @Test(expected = ValidationException.class)
- public void testGroupingInvalidSelection() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
- tableEnv
- .fromDataSet(input, "a, b, c")
- .groupBy("a, b")
- // must fail. Field c is not a grouping key or aggregation
- .select("c");
- }
-
- @Test
- public void testGroupedAggregate() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .groupBy("b").select("b, a.sum");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testGroupingKeyForwardIfNotUsed() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .groupBy("b").select("a.sum");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testGroupNoAggregation() throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table
- .groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
- List<Row> results = ds.collect();
- compareResultAsText(results, expected);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
index e6db3b0..9676608 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
@@ -18,29 +18,28 @@
package org.apache.flink.api.java.batch.table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
+import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.api.table.ValidationException;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.util.List;
-
@RunWith(Parameterized.class)
-public class JoinITCase extends MultipleProgramsTestBase {
+public class JoinITCase extends TableProgramsTestBase {
- public JoinITCase(TestExecutionMode mode) {
- super(mode);
+ public JoinITCase(TestExecutionMode mode, TableConfigMode configMode){
+ super(mode, configMode);
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
deleted file mode 100644
index ba564bf..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PojoGroupingITCase extends MultipleProgramsTestBase {
-
- public PojoGroupingITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Test
- public void testPojoGrouping() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<String, Double, String>> data = env.fromElements(
- new Tuple3<>("A", 23.0, "Z"),
- new Tuple3<>("A", 24.0, "Y"),
- new Tuple3<>("B", 1.0, "Z"));
-
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- Table table = tableEnv
- .fromDataSet(data, "groupMe, value, name")
- .select("groupMe, value, name")
- .where("groupMe != 'B'");
-
- DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
-
- DataSet<MyPojo> result = myPojos.groupBy("groupMe")
- .sortGroup("value", Order.DESCENDING)
- .first(1);
-
- List<MyPojo> resultList = result.collect();
- compareResultAsText(resultList, "A,24.0,Y");
- }
-
- public static class MyPojo implements Serializable {
- private static final long serialVersionUID = 8741918940120107213L;
-
- public String groupMe;
- public double value;
- public String name;
-
- public MyPojo() {
- // for serialization
- }
-
- public MyPojo(String groupMe, double value, String name) {
- this.groupMe = groupMe;
- this.value = value;
- this.name = name;
- }
-
- @Override
- public String toString() {
- return groupMe + "," + value + "," + name;
- }
- }
-}