You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/29 13:47:49 UTC
[2/3] flink git commit: [FLINK-3656] [table] Consolidate ITCases
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
deleted file mode 100644
index 581c8ed..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class SelectITCase extends TableProgramsTestBase {
-
- public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) {
- super(mode, configMode);
- }
-
- @Test
- public void testSimpleSelectAllWithAs() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
- Table result = in
- .select("a, b, c");
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
- "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
- "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
- "20,6,Comment#14\n" + "21,6,Comment#15\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testSimpleSelectWithNaming() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- Table in = tableEnv.fromDataSet(ds);
-
- Table result = in
- .select("f0 as a, f1 as b")
- .select("a, b");
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
- "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
- "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testSimpleSelectRenameAll() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- Table in = tableEnv.fromDataSet(ds);
-
- Table result = in
- .select("f0 as a, f1 as b, f2 as c")
- .select("a, b");
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
- "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
- "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
- compareResultAsText(results, expected);
- }
-
- @Test(expected = ValidationException.class)
- public void testSelectInvalidField() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
- tableEnv.fromDataSet(ds, "a, b, c")
- // Must fail. Field foo does not exist
- .select("a + 1, foo + 2");
- }
-
- @Test(expected = ValidationException.class)
- public void testSelectAmbiguousFieldNames() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
- tableEnv.fromDataSet(ds, "a, b, c")
- // Must fail. Field foo does not exist
- .select("a + 1 as foo, b + 2 as foo");
- }
-
- @Test
- public void testSelectStar() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
- Table result = in
- .select("*");
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
- "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
- "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
- "20,6,Comment#14\n" + "21,6,Comment#15\n";
- compareResultAsText(results, expected);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
index c33e1ef..2d82dbc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
@@ -18,6 +18,8 @@
package org.apache.flink.api.scala.batch
+import java.util
+
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
@@ -140,4 +142,131 @@ class TableEnvironmentITCase(
// Must fail. Table is bound to different TableEnvironment.
tEnv2.registerTable("MyTable", t1)
}
+
+ @Test
+ def testToTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 'b, 'c)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "Peter,28,4000.0,Sales\n" +
+ "Anna,56,10000.0,Engineering\n" +
+ "Lucy,42,6000.0,HR\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromAndToCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
+ "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
+ "SomeCaseClass(Lucy,42,6000.0,HR)\n"
+ val results = t.toDataSet[SomeCaseClass].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithToFewFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Number of fields does not match.
+ .toTable(tEnv, 'a, 'b)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithToManyFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Number of fields does not match.
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithAmbiguousFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Field names not unique.
+ .toTable(tEnv, 'a, 'b, 'b)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithNonFieldReference1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // Must fail. toTable() accepts only plain field references; 'a + 1 is an arithmetic expression.
+ CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a + 1, 'b, 'c)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithNonFieldReference2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // Must fail. toTable() accepts only plain field references; 'a as 'foo is an alias expression.
+ CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a as 'foo, 'b, 'c)
+ }
+}
+
+object TableEnvironmentITCase {
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT)).asJava
+ }
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+ def this() { this("", 0, 0.0, "") }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
index 407fa4c..d7e99d4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
@@ -20,14 +20,15 @@ package org.apache.flink.api.scala.batch
import java.io.File
-import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.table.sinks.CsvTableSink
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -35,8 +36,9 @@ import org.junit.runners.Parameterized
@RunWith(classOf[Parameterized])
class TableSinkITCase(
- mode: TestExecutionMode)
- extends MultipleProgramsTestBase(mode) {
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
@Test
def testBatchTableSink(): Unit = {
@@ -46,7 +48,7 @@ class TableSinkITCase(
val path = tmpFile.toURI.toString
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
env.setParallelism(4)
val input = CollectionDataSets.get3TupleDataSet(env)
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
index 6fd0d13..08bee72 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
@@ -39,14 +39,16 @@ import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
-class TableSourceITCase(mode: TestExecutionMode)
- extends MultipleProgramsTestBase(mode) {
+class TableSourceITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
@Test
def testBatchTableSourceTableAPI(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
val results = tEnv
@@ -65,7 +67,7 @@ class TableSourceITCase(mode: TestExecutionMode)
def testBatchTableSourceSQL(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
val results = tEnv.sql(
@@ -100,7 +102,7 @@ class TableSourceITCase(mode: TestExecutionMode)
tmpWriter.close()
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val csvTable = new CsvTableSource(
tempFile.getAbsolutePath,
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
new file mode 100644
index 0000000..49a97e3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import java.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class CalcITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testSelectStarFromTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectStarFromDataSet(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSimpleSelectAll(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, b, c FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectWithNaming(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidFields(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, foo FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ tEnv.sql(sqlQuery)
+ }
+
+ @Test
+ def testAllRejectingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE false"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAllPassingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE true"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnString(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnInteger(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+ "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+ "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDisjunctivePredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterWithAnd(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+ "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+ "19,6,Comment#13\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
+
+object CalcITCase {
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala
deleted file mode 100644
index cc4da38..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- @Test
- def testAllRejectingFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE false"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAllPassingFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE true"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterOnString(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterOnInteger(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
- "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
- "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
- "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testDisjunctivePredicate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterWithAnd(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
- "9,4,Comment#3\n" + "17,6,Comment#11\n" +
- "19,6,Comment#13\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
deleted file mode 100644
index 07b802d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- @Test
- def testSelectStarFromTable(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSelectStarFromDataSet(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSimpleSelectAll(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT a, b, c FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSelectWithNaming(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
- "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
- "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidFields(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT a, foo FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- tEnv.sql(sqlQuery)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
index 7c0cdff..16c8ece 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
@@ -19,12 +19,14 @@
package org.apache.flink.api.scala.batch.table
import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -32,13 +34,16 @@ import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
-class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+class AggregationsITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
@Test
def testAggregationTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
@@ -52,7 +57,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationOnNonExistingField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
// Must fail. Field 'foo does not exist.
@@ -63,7 +68,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements(
(1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
@@ -79,7 +84,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testProjection(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements(
(1: Byte, 1: Short),
@@ -95,7 +100,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationWithArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
.select(('_1 + 2).avg + 2, '_2.count + 5)
@@ -109,7 +114,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationWithTwoCount(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
.select('_1.count, '_2.count)
@@ -123,7 +128,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationAfterProjection(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements(
(1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
@@ -140,7 +145,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testNonWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements(("Hello", 1)).toTable(tEnv)
// Must fail. Field '_1 is not a numeric type.
@@ -153,7 +158,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testNoNestedAggregations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements(("Hello", 1)).toTable(tEnv)
// Must fail. Sum aggregation can not be chained.
@@ -164,7 +169,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testSQLStyleAggregations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.select(
@@ -184,7 +189,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testPojoAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val input = env.fromElements(
MyWC("hello", 1),
@@ -204,5 +209,196 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
}
+ @Test
+ def testDistinct(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val distinct = ds.select('b).distinct()
+
+ val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ val results = distinct.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDistinctAfterAggregate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ val distinct = ds.groupBy('a, 'e).select('e).distinct()
+
+ val expected = "1\n" + "2\n" + "3\n"
+ val results = distinct.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingOnNonExistentField(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ // must fail. '_foo not a valid field
+ .groupBy('_foo)
+ .select('a.avg)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingInvalidSelection(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('a, 'b)
+ // must fail. 'c is not a grouping key or aggregation
+ .select('c)
+ }
+
+ @Test
+ def testGroupedAggregate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum)
+
+ val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupingKeyForwardIfNotUsed(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('a.sum)
+
+ val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupNoAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('a.sum as 'd, 'b)
+ .groupBy('b, 'd)
+ .select('b)
+
+ val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithLongKeys(): Unit = {
+ // This uses very long keys to force serialized comparison.
+ // With short keys, the normalized key is sufficient.
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = env.fromElements(
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
+ .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('a, 'b)
+ .select('c.sum)
+
+ val expected = "10\n" + "8\n"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithConstant1(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 4 as 'four, 'b)
+ .groupBy('four, 'a)
+ .select('four, 'b.sum)
+
+ val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
+ "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
+ "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
+ val results = t.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithConstant2(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('b, 4 as 'four, 'a)
+ .groupBy('b, 'four)
+ .select('four, 'a.sum)
+
+ val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithExpression(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .groupBy('e, 'b % 3)
+ .select('c.min, 'e, 'a.avg, 'd.count)
+
+ val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
+ "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum)
+ .where('b === 2)
+
+ val expected = "2,5\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
index d64e414..4ffda87 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
@@ -18,14 +18,18 @@
package org.apache.flink.api.scala.batch.table
+import java.util
+
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -39,6 +43,299 @@ class CalcITCase(
extends TableProgramsTestBase(mode, configMode) {
@Test
+ def testSimpleSelectAll(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSimpleSelectAllWithAs(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSimpleSelectWithNaming(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+ .select('a, 'b)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSimpleSelectRenameAll(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
+ .select('a, 'b)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testSelectInvalidFieldFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ // must fail. Field 'foo does not exist
+ .select('a, 'foo)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testSelectAmbiguousRenaming(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ // must fail. 'a and 'b are both renamed to 'foo
+ .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testSelectAmbiguousRenaming2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ // must fail. 'a and 'b are both renamed to 'a
+ .select('a, 'b as 'a).toDataSet[Row].print()
+ }
+
+ @Test
+ def testSelectStar(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAliasStarException(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ try {
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
+ fail("ValidationException expected")
+ } catch {
+ case _: ValidationException => //ignore
+ }
+
+ try {
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+ fail("ValidationException expected")
+ } catch {
+ case _: ValidationException => //ignore
+ }
+
+ try {
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
+ fail("ValidationException expected")
+ } catch {
+ case _: ValidationException => //ignore
+ }
+
+ try {
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
+ fail("ValidationException expected")
+ } catch {
+ case _: ValidationException => //ignore
+ }
+ }
+
+ @Test
+ def testAllRejectingFilter(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( Literal(false) )
+
+ val expected = "\n"
+ val results = filterDs.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAllPassingFilter(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( Literal(true) )
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = filterDs.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnStringTupleField(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val filterDs = ds.filter( 'c.like("%world%") )
+
+ val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+ val results = filterDs.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnIntegerTupleField(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( 'a % 2 === 0 )
+
+ val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+ "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+ "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+ val results = filterDs.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testNotEquals(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( 'a % 2 !== 0)
+ val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
+ "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" +
+ "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" +
+ "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDisjunctivePredicate(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( 'a < 2 || 'a > 20)
+ val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testConsecutiveFilters(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
+ val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+ "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+ "19,6,Comment#13\n" + "21,6,Comment#15\n"
+ val results = filterDs.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterBasicType(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.getStringDataSet(env)
+
+ val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
+
+ val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+ val results = filterDs.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnCustomType(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.getCustomTypeDataSet(env)
+ val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
+ .filter( 's.like("%a%") )
+
+ val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+ val results = filterDs.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testFilterInvalidFieldName(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+ // must fail. Field 'foo does not exist
+ ds.filter( 'foo === 2 )
+ }
+
+ @Test
def testSimpleCalc(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -104,3 +401,13 @@ class CalcITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
+
+object CalcITCase {
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala
deleted file mode 100644
index 55c7944..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
- @Test
- def testDistinct(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val distinct = ds.select('b).distinct()
-
- val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
- val results = distinct.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testDistinctAfterAggregate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
- val distinct = ds.groupBy('a, 'e).select('e).distinct()
-
- val expected = "1\n" + "2\n" + "3\n"
- val results = distinct.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
deleted file mode 100644
index ee0356f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- @Test
- def testAllRejectingFilter(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(false) )
-
- val expected = "\n"
- val results = filterDs.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAllPassingFilter(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(true) )
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = filterDs.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterOnStringTupleField(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val filterDs = ds.filter( 'c.like("%world%") )
-
- val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
- val results = filterDs.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterOnIntegerTupleField(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 === 0 )
-
- val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
- "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
- "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
- "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
- val results = filterDs.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testNotEquals(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 !== 0)
- val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
- "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" +
- "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" +
- "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n"
- val results = filterDs.collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testDisjunctivePredicate(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a < 2 || 'a > 20)
- val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
- val results = filterDs.collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testConsecutiveFilters(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
- val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
- "9,4,Comment#3\n" + "17,6,Comment#11\n" +
- "19,6,Comment#13\n" + "21,6,Comment#15\n"
- val results = filterDs.collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterBasicType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.getStringDataSet(env)
-
- val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
-
- val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
- val results = filterDs.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterOnCustomType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
- .filter( 's.like("%a%") )
-
- val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
- val results = filterDs.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test(expected = classOf[ValidationException])
- def testFilterInvalidFieldName(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
- // must fail. Field 'foo does not exist
- ds.filter( 'foo === 2 )
- }
-
-}