You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:50 UTC
[41/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala
new file mode 100644
index 0000000..8bfb61b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class SingleRowJoinTest extends TableTestBase {
+
+ @Test
+ def testSingleRowCrossJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Int)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, asum " +
+ "FROM A, (SELECT sum(a1) + sum(a2) AS asum FROM A)"
+
+ val expected =
+ binaryNode(
+ "DataSetSingleRowJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ batchTableNode(0),
+ tuples(List(null, null)),
+ term("values", "a1", "a2")
+ ),
+ term("union","a1","a2")
+ ),
+ term("select", "SUM(a1) AS $f0", "SUM(a2) AS $f1")
+ ),
+ term("select", "+($f0, $f1) AS asum")
+ ),
+ term("where", "true"),
+ term("join", "a1", "asum"),
+ term("joinType", "NestedLoopInnerJoin")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowEquiJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, a2 " +
+ "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+ "WHERE a1 = cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
+ ),
+ term("union","a1")
+ ),
+ term("select", "COUNT(a1) AS cnt")
+ ),
+ term("where", "=(CAST(a1), cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopInnerJoin")
+ ),
+ term("select", "a1", "a2")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowNotEquiJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, a2 " +
+ "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+ "WHERE a1 < cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
+ ),
+ term("union", "a1")
+ ),
+ term("select", "COUNT(a1) AS cnt")
+ ),
+ term("where", "<(a1, cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopInnerJoin")
+ ),
+ term("select", "a1", "a2")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowJoinWithComplexPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long)]("A", 'a1, 'a2)
+ util.addTable[(Int, Long)]("B", 'b1, 'b2)
+
+ val query =
+ "SELECT a1, a2, b1, b2 " +
+ "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
+ "WHERE a1 < b1 AND a2 = b2"
+
+ val expected = binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ batchTableNode(1),
+ tuples(List(null, null)),
+ term("values", "b1", "b2")
+ ),
+ term("union","b1","b2")
+ ),
+ term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
+ ),
+ term("where", "AND(<(a1, b1)", "=(a2, b2))"),
+ term("join", "a1", "a2", "b1", "b2"),
+ term("joinType", "NestedLoopInnerJoin")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Int)]("A", 'a1, 'a2)
+ util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+ val queryLeftJoin =
+ "SELECT a2 " +
+ "FROM A " +
+ " LEFT JOIN " +
+ "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+ " ON a1 = cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ term("where", "=(a1, cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopLeftJoin")
+ ),
+ term("select", "a2")
+ ) + "\n" +
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ )
+
+ util.verifySql(queryLeftJoin, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Int)]("A", 'a1, 'a2)
+ util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+ val queryLeftJoin =
+ "SELECT a2 " +
+ "FROM A " +
+ " LEFT JOIN " +
+ "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+ " ON a1 > cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ term("where", ">(a1, cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopLeftJoin")
+ ),
+ term("select", "a2")
+ ) + "\n" +
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ )
+
+ util.verifySql(queryLeftJoin, expected)
+ }
+
+ @Test
+ def testLeftSingleRightJoinEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Long)]("A", 'a1, 'a2)
+ util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+ val queryRightJoin =
+ "SELECT a1 " +
+ "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+ " RIGHT JOIN " +
+ "A " +
+ " ON cnt = a2"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ "",
+ term("where", "=(cnt, a2)"),
+ term("join", "cnt", "a1", "a2"),
+ term("joinType", "NestedLoopRightJoin")
+ ),
+ term("select", "a1")
+ ) + unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ ) + "\n" +
+ batchTableNode(0)
+
+ util.verifySql(queryRightJoin, expected)
+ }
+
+ @Test
+ def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Long)]("A", 'a1, 'a2)
+ util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+ val queryRightJoin =
+ "SELECT a1 " +
+ "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+ " RIGHT JOIN " +
+ "A " +
+ " ON cnt < a2"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ "",
+ term("where", "<(cnt, a2)"),
+ term("join", "cnt", "a1", "a2"),
+ term("joinType", "NestedLoopRightJoin")
+ ),
+ term("select", "a1")
+ ) +
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ ) + "\n" +
+ batchTableNode(0)
+
+ util.verifySql(queryRightJoin, expected)
+ }
+
+ @Test
+ def testSingleRowJoinInnerJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Int)]("A", 'a1, 'a2)
+ val query =
+ "SELECT a2, sum(a1) " +
+ "FROM A " +
+ "GROUP BY a2 " +
+ "HAVING sum(a1) > (SELECT sum(a1) * 0.1 FROM A)"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ unaryNode(
+ "DataSetAggregate",
+ batchTableNode(0),
+ term("groupBy", "a2"),
+ term("select", "a2", "SUM(a1) AS EXPR$1")
+ ),
+ term("where", ">(EXPR$1, EXPR$0)"),
+ term("join", "a2", "EXPR$1", "EXPR$0"),
+ term("joinType", "NestedLoopInnerJoin")
+ ),
+ term("select", "a2", "EXPR$1")
+ ) + "\n" +
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)), term("values", "a1")
+ ),
+ term("union", "a1")
+ ),
+ term("select", "SUM(a1) AS $f0")
+ ),
+ term("select", "*($f0, 0.1) AS EXPR$0")
+ )
+
+ util.verifySql(query, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
new file mode 100644
index 0000000..9aada9a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class CalcValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidFields(): Unit = {
+
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, foo FROM MyTable"
+
+ util.tableEnv.sql(sqlQuery)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
new file mode 100644
index 0000000..4272170
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testHopWindowNoOffset(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+ val sqlQuery =
+ "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+ "FROM T " +
+ "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+ util.verifySql(sqlQuery, "n/a")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testSessionWindowNoOffset(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+ val sqlQuery =
+ "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+ "FROM T " +
+ "GROUP BY SESSION(ts, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+ util.verifySql(sqlQuery, "n/a")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testVariableWindowSize() = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+ val sql = "SELECT COUNT(*) " +
+ "FROM T " +
+ "GROUP BY TUMBLE(ts, b * INTERVAL '1' MINUTE)"
+ util.verifySql(sql, "n/a")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testTumbleWindowWithInvalidUdAggArgs() = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+ val weightedAvg = new WeightedAvgWithMerge
+ util.tableEnv.registerFunction("weightedAvg", weightedAvg)
+
+ val sql = "SELECT weightedAvg(c, a) AS wAvg " +
+ "FROM T " +
+ "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
+ util.verifySql(sql, "n/a")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..90bcfec
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class JoinValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testJoinNonExistingKey(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
+
+ util.tableEnv.sql(sqlQuery)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testJoinNonMatchingKeyTypes(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testJoinWithAmbiguousFields(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testJoinNoEqualityPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testCrossJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, String)]("Table4", 'a1, 'b1, 'c1)
+
+ val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRightOuterJoinWithLocalPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testLeftOuterJoinWithLocalPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testFullOuterJoinWithLocalPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..7e72a21
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+
+ /**
+ * OVER clause is necessary for [[OverAgg0]] window function.
+ */
+ @Test(expected = classOf[ValidationException])
+ def testInvalidOverAggregation(): Unit = {
+
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+
+ util.addFunction("overAgg", new OverAgg0)
+
+ val sqlQuery = "SELECT overAgg(b, a) FROM T"
+ util.tableEnv.sql(sqlQuery)
+ }
+
+ /**
+ * OVER clause is necessary for [[OverAgg0]] window function.
+ */
+ @Test(expected = classOf[ValidationException])
+ def testInvalidOverAggregation2(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+ util.addFunction("overAgg", new OverAgg0)
+
+ val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
+
+ util.tableEnv.sql(sqlQuery)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
new file mode 100644
index 0000000..d3f9b9f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class SortValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testLimitWithoutOrder(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
+
+ util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
new file mode 100644
index 0000000..af001be
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+ * Test for testing aggregate plans.
+ */
+class AggregateTest extends TableTestBase {
+
+ @Test
+ def testGroupAggregateWithFilter(): Unit = {
+
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val resultTable = sourceTable.groupBy('a)
+ .select('a, 'a.avg, 'b.sum, 'c.count)
+ .where('a === 1)
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", "=(a, 1)")
+ )
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ calcNode,
+ term("groupBy", "a"),
+ term("select",
+ "a",
+ "AVG(a) AS TMP_0",
+ "SUM(b) AS TMP_1",
+ "COUNT(c) AS TMP_2")
+ )
+
+ util.verifyTable(resultTable,expected)
+ }
+
+ @Test
+ def testAggregate(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+ val resultTable = sourceTable.select('a.avg,'b.sum,'c.count)
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ batchTableNode(0),
+ tuples(List(null,null,null)),
+ term("values","a","b","c")
+ )
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c")
+ )
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS TMP_0",
+ "SUM(b) AS TMP_1",
+ "COUNT(c) AS TMP_2")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testAggregateWithFilter(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
+ .select('a.avg,'b.sum,'c.count)
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ // ReduceExpressionsRule will add cast for Project node by force
+ // if the input of the Project node has constant expression.
+ term("select", "CAST(1) AS a", "b", "c"),
+ term("where", "=(a, 1)")
+ )
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ calcNode,
+ tuples(List(null,null,null)),
+ term("values","a","b","c")
+ )
+
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c")
+ )
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS TMP_0",
+ "SUM(b) AS TMP_1",
+ "COUNT(c) AS TMP_2")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
new file mode 100644
index 0000000..ee05547
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, TestCaseClass, WC, giveMeCaseClass}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Upper
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class CalcTest extends TableTestBase {
+
+ @Test
+ def testMultipleFlatteningsTable(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table.select('a.flatten(), 'c, 'b.flatten())
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "a._1 AS a$_1",
+ "a._2 AS a$_2",
+ "c",
+ "b._1 AS b$_1",
+ "b._2 AS b$_2"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testNestedFlattening(): Unit = {
+ val util = batchTestUtil()
+ val table = util
+ .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
+
+ val result = table.select('a.flatten(), 'b.flatten())
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "a._1 AS a$_1",
+ "a._2 AS a$_2",
+ "b"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testScalarFunctionAccess(): Unit = {
+ val util = batchTestUtil()
+ val table = util
+ .addTable[(String, Int)]("MyTable", 'a, 'b)
+
+ val result = table.select(
+ giveMeCaseClass().get("my"),
+ giveMeCaseClass().get("clazz"),
+ giveMeCaseClass().flatten())
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ s"${giveMeCaseClass.functionIdentifier}().my AS _c0",
+ s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1",
+ s"${giveMeCaseClass.functionIdentifier}().my AS _c2",
+ s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // Tests for all the situations when we can do fields projection. Like selecting few fields
+ // from a large field count source.
+ // ----------------------------------------------------------------------------------------------
+
+ @Test
+ def testSimpleSelect(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable = sourceTable.select('a, 'b)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectAllFields(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable1 = sourceTable.select('*)
+ val resultTable2 = sourceTable.select('a, 'b, 'c, 'd)
+
+ val expected = batchTableNode(0)
+
+ util.verifyTable(resultTable1, expected)
+ util.verifyTable(resultTable2, expected)
+ }
+
+ @Test
+ def testSelectAggregation(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable = sourceTable.select('a.sum, 'b.max)
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ binaryNode(
+ "DataSetUnion",
+ values(
+ "DataSetValues",
+ tuples(List(null, null)),
+ term("values", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ term("union", "a", "b")
+ ),
+ term("select", "SUM(a) AS TMP_0", "MAX(b) AS TMP_1")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectFunction(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+
+ util.tableEnv.registerFunction("hashCode", MyHashCode)
+
+ val resultTable = sourceTable.select("hashCode(c), b")
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", s"${MyHashCode.functionIdentifier}(c) AS _c0", "b")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectFromGroupedTable(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable = sourceTable.groupBy('a, 'c).select('a)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetDistinct",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "c")
+ ),
+ term("distinct", "a", "c")
+ ),
+ term("select", "a")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectAllFieldsFromGroupedTable(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
+
+ val expected = unaryNode(
+ "DataSetDistinct",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "c")
+ ),
+ term("distinct", "a", "c")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectAggregationFromGroupedTable(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable = sourceTable.groupBy('c).select('a.sum)
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "c")
+ ),
+ term("groupBy", "c"),
+ term("select", "c", "SUM(a) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable = sourceTable.groupBy(Upper('c) as 'k).select('a.sum)
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "c", "UPPER(c) AS k")
+ ),
+ term("groupBy", "k"),
+ term("select", "k", "SUM(a) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectFromGroupedTableWithFunctionKey(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val resultTable = sourceTable.groupBy(MyHashCode('c) as 'k).select('a.sum)
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "c", s"${MyHashCode.functionIdentifier}(c) AS k")
+ ),
+ term("groupBy", "k"),
+ term("select", "k", "SUM(a) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectFromAggregatedPojoTable(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency)
+ val resultTable = sourceTable
+ .groupBy('word)
+ .select('word, 'frequency.sum as 'frequency)
+ .filter('frequency === 2)
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ batchTableNode(0),
+ term("groupBy", "word"),
+ term("select", "word", "SUM(frequency) AS TMP_0")
+ ),
+ term("select", "word, TMP_0 AS frequency"),
+ term("where", "=(TMP_0, 2)")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+}
+
+object CalcTest {
+
+ case class TestCaseClass(my: String, clazz: Int)
+
+ object giveMeCaseClass extends ScalarFunction {
+ def eval(): TestCaseClass = {
+ TestCaseClass("hello", 42)
+ }
+
+ override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+ createTypeInformation[TestCaseClass]
+ }
+ }
+
+ object MyHashCode extends ScalarFunction {
+ def eval(s: String): Int = s.hashCode()
+ }
+
+ case class WC(word: String, frequency: Long)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
new file mode 100644
index 0000000..63ce267
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
+import org.junit.Test
+
+class CorrelateTest extends TableTestBase {
+
+ @Test
+ def testCrossJoin(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val function = util.addFunction("func1", new TableFunc1)
+
+ val result1 = table.join(function('c) as 's).select('c, 's)
+
+ val expected1 = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", s"${function.functionIdentifier}($$2)"),
+ term("function", function),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "s")
+ )
+
+ util.verifyTable(result1, expected1)
+
+ // test overloading
+
+ val result2 = table.join(function('c, "$") as 's).select('c, 's)
+
+ val expected2 = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
+ term("function", function),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "s")
+ )
+
+ util.verifyTable(result2, expected2)
+ }
+
+ @Test
+ def testLeftOuterJoin(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val function = util.addFunction("func1", new TableFunc1)
+
+ val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", s"${function.functionIdentifier}($$2)"),
+ term("function", function),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+ term("joinType", "LEFT")
+ ),
+ term("select", "c", "s")
+ )
+
+ util.verifyTable(result, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
new file mode 100644
index 0000000..e441203
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.WindowReference
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowTest extends TableTestBase {
+
+ //===============================================================================================
+ // Common test
+ //===============================================================================================
+
+ @Test
+ def testEventTimeTumblingGroupWindowOverCount(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 2.rows on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+ term("select", "string", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testEventTimeTumblingGroupWindowOverTimeWithUdAgg(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, myWeightedAvg('long, 'int))
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+ term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testEventTimeTumblingGroupWindowOverTime(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+ term("select", "string", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "int", "long")
+ ),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+ term("select", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 2.rows on 'long as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "int", "long")
+ ),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+ term("select", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ //===============================================================================================
+ // Sliding Windows
+ //===============================================================================================
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTime(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Slide over 8.milli every 10.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window",
+ SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+ term("select", "string", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window",
+ SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+ term("select", "string", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeWithUdAgg(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ val windowedTable = table
+ .window(Slide over 8.milli every 10.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, myWeightedAvg('long, 'int))
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window",
+ SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+ term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Slide over 8.milli every 10.milli on 'long as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "int", "long")
+ ),
+ term("window",
+ SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+ term("select", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows on 'long as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "int", "long")
+ ),
+ term("window",
+ SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+ term("select", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ //===============================================================================================
+ // Session Windows
+ //===============================================================================================
+
+ @Test
+ def testEventTimeSessionGroupWindowOverTime(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Session withGap 7.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+ term("select", "string", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
+ def testEventTimeSessionGroupWindowOverTimeWithUdAgg(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ val windowedTable = table
+ .window(Session withGap 7.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, myWeightedAvg('long, 'int))
+
+ val expected = unaryNode(
+ "DataSetWindowAggregate",
+ batchTableNode(0),
+ term("groupBy", "string"),
+ term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+ term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala
new file mode 100644
index 0000000..e148b47
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class AggregateStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testAggregationTypes(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
+
+ val t1 = t.select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
+ val t2 = t.select("_1.sum, _1.sum0, _1.min, _1.max, _1.count, _1.avg")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testWorkingAggregationDataTypes(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
+
+ val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+ val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testProjection(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Byte, Short)]("Table2")
+
+ val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
+ val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testAggregationWithArithmetic(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Long, String)]("Table2")
+
+ val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5)
+ val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testAggregationWithTwoCount(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Long, String)]("Table2")
+
+ val t1 = t.select('_1.count, '_2.count)
+ val t2 = t.select("_1.count, _2.count")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testAggregationAfterProjection(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
+
+ val t1 = t.select('_1, '_2, '_3)
+ .select('_1.avg, '_2.sum, '_3.count)
+
+ val t2 = t.select("_1, _2, _3")
+ .select("_1.avg, _2.sum, _3.count")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testDistinct(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val distinct = ds.select('b).distinct()
+ val distinct2 = ds.select("b").distinct()
+
+ verifyTableEquals(distinct, distinct2)
+ }
+
+ @Test
+ def testDistinctAfterAggregate(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
+
+ val distinct = ds.groupBy('a, 'e).select('e).distinct()
+ val distinct2 = ds.groupBy("a, e").select("e").distinct()
+
+ verifyTableEquals(distinct, distinct2)
+ }
+
+ @Test
+ def testGroupedAggregate(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ val t1 = t.groupBy('b).select('b, 'a.sum)
+ val t2 = t.groupBy("b").select("b, a.sum")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testGroupingKeyForwardIfNotUsed(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ val t1 = t.groupBy('b).select('a.sum)
+ val t2 = t.groupBy("b").select("a.sum")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testGroupNoAggregation(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ val t1 = t
+ .groupBy('b)
+ .select('a.sum as 'd, 'b)
+ .groupBy('b, 'd)
+ .select('b)
+
+ val t2 = t
+ .groupBy("b")
+ .select("a.sum as d, b")
+ .groupBy("b, d")
+ .select("b")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testGroupedAggregateWithConstant1(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ val t1 = t.select('a, 4 as 'four, 'b)
+ .groupBy('four, 'a)
+ .select('four, 'b.sum)
+
+ val t2 = t.select("a, 4 as four, b")
+ .groupBy("four, a")
+ .select("four, b.sum")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testGroupedAggregateWithConstant2(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ val t1 = t.select('b, 4 as 'four, 'a)
+ .groupBy('b, 'four)
+ .select('four, 'a.sum)
+ val t2 = t.select("b, 4 as four, a")
+ .groupBy("b, four")
+ .select("four, a.sum")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testGroupedAggregateWithExpression(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
+
+ val t1 = t.groupBy('e, 'b % 3)
+ .select('c.min, 'e, 'a.avg, 'd.count)
+ val t2 = t.groupBy("e, b % 3")
+ .select("c.min, e, a.avg, d.count")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testGroupedAggregateWithFilter(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ val t1 = t.groupBy('b)
+ .select('b, 'a.sum)
+ .where('b === 2)
+ val t2 = t.groupBy("b")
+ .select("b, a.sum")
+ .where("b = 2")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testAnalyticAggregation(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, Float, Double)]('_1, '_2, '_3, '_4)
+
+ val resScala = t.select(
+ '_1.stddevPop, '_2.stddevPop, '_3.stddevPop, '_4.stddevPop,
+ '_1.stddevSamp, '_2.stddevSamp, '_3.stddevSamp, '_4.stddevSamp,
+ '_1.varPop, '_2.varPop, '_3.varPop, '_4.varPop,
+ '_1.varSamp, '_2.varSamp, '_3.varSamp, '_4.varSamp)
+ val resJava = t.select("""
+ _1.stddevPop, _2.stddevPop, _3.stddevPop, _4.stddevPop,
+ _1.stddevSamp, _2.stddevSamp, _3.stddevSamp, _4.stddevSamp,
+ _1.varPop, _2.varPop, _3.varPop, _4.varPop,
+ _1.varSamp, _2.varSamp, _3.varSamp, _4.varSamp""")
+
+ verifyTableEquals(resScala, resJava)
+ }
+
+ @Test
+ def testAggregateWithUDAGG(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ val myCnt = new CountAggFunction
+ util.tableEnv.registerFunction("myCnt", myCnt)
+ val myWeightedAvg = new WeightedAvgWithMergeAndReset
+ util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+
+ val t1 = t.select(myCnt('a) as 'aCnt, myWeightedAvg('b, 'a) as 'wAvg)
+ val t2 = t.select("myCnt(a) as aCnt, myWeightedAvg(b, a) as wAvg")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testGroupedAggregateWithUDAGG(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+
+ val myCnt = new CountAggFunction
+ util.tableEnv.registerFunction("myCnt", myCnt)
+ val myWeightedAvg = new WeightedAvgWithMergeAndReset
+ util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+
+ val t1 = t.groupBy('b)
+ .select('b, myCnt('a) + 9 as 'aCnt, myWeightedAvg('b, 'a) * 2 as 'wAvg, myWeightedAvg('a, 'a))
+ val t2 = t.groupBy("b")
+ .select("b, myCnt(a) + 9 as aCnt, myWeightedAvg(b, a) * 2 as wAvg, myWeightedAvg(a, a)")
+
+ verifyTableEquals(t1, t2)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala
new file mode 100644
index 0000000..901b2f4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.stringexpr
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class CalcStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testSimpleSelectAllWithAs(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = t.select('a, 'b, 'c)
+ val t2 = t.select("a, b, c")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testSimpleSelectWithNaming(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
+
+ val t1 = t
+ .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+ .select('a, 'b)
+
+ val t2 = t
+ .select("_1 as a, _2 as b, _1 as c")
+ .select("a, b")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testSimpleSelectRenameAll(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
+
+ val t1 = t
+ .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
+ .select('a, 'b)
+
+ val t2 = t
+ .select("_1 as a, _2 as b, _3 as c")
+ .select("a, b")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testSelectStar(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = t.select('*)
+ val t2 = t.select("*")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testAllRejectingFilter(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = ds.filter( Literal(false) )
+ val t2 = ds.filter("faLsE")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testAllPassingFilter(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = ds.filter( Literal(true) )
+ val t2 = ds.filter("trUe")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testFilterOnStringTupleField(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = ds.filter( 'c.like("%world%") )
+ val t2 = ds.filter("c.like('%world%')")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testFilterOnIntegerTupleField(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = ds.filter( 'a % 2 === 0 )
+ val t2 = ds.filter( "a % 2 = 0 ")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testNotEquals(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = ds.filter( 'a % 2 !== 0 )
+ val t2 = ds.filter("a % 2 <> 0")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testDisjunctivePredicate(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = ds.filter( 'a < 2 || 'a > 20)
+ val t2 = ds.filter("a < 2 || a > 20")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testConsecutiveFilters(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
+ val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testFilterBasicType(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[String]("Table3",'a)
+
+ val t1 = ds.filter( 'a.like("H%") )
+ val t2 = ds.filter( "a.like('H%')" )
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testFilterOnCustomType(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[CustomType]("Table3",'myInt as 'i, 'myLong as 'l, 'myString as 's)
+
+ val t1 = t.filter( 's.like("%a%") )
+ val t2 = t.filter("s.like('%a%')")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testSimpleCalc(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
+
+ val t1 = t.select('_1, '_2, '_3)
+ .where('_1 < 7)
+ .select('_1, '_3)
+
+ val t2 = t.select("_1, _2, _3")
+ .where("_1 < 7")
+ .select("_1, _3")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testCalcWithTwoFilters(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
+
+ val t1 = t.select('_1, '_2, '_3)
+ .where('_1 < 7 && '_2 === 3)
+ .select('_1, '_3)
+ .where('_1 === 4)
+ .select('_1)
+
+ val t2 = t.select("_1, _2, _3")
+ .where("_1 < 7 && _2 = 3")
+ .select("_1, _3")
+ .where("_1 === 4")
+ .select("_1")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testCalcWithAggregation(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
+
+ val t1 = t.select('_1, '_2, '_3)
+ .where('_1 < 15)
+ .groupBy('_2)
+ .select('_1.min, '_2.count as 'cnt)
+ .where('cnt > 3)
+
+
+ val t2 = t.select("_1, _2, _3")
+ .where("_1 < 15")
+ .groupBy("_2")
+ .select("_1.min, _2.count as cnt")
+ .where("cnt > 3")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testCalcJoin(): Unit = {
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+ val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
+ .where('b > 1).select('a, 'd).where('d === 2)
+ val t2 = ds1.select("a, b").join(ds2).where("b = e").select("a, b, d, e, f")
+ .where("b > 1").select("a, d").where("d = 2")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testAdvancedDataTypes(): Unit = {
+ val util = batchTestUtil()
+ val t = util
+ .addTable[(BigDecimal, BigDecimal, Date, Time, Timestamp)]("Table5", 'a, 'b, 'c, 'd, 'e)
+
+ val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
+ "1984-07-12".cast(Types.SQL_DATE), "14:34:24".cast(Types.SQL_TIME),
+ "1984-07-12 14:34:24".cast(Types.SQL_TIMESTAMP))
+ val t2 = t.select("a, b, c, d, e, 11.2p, 11.2p," +
+ "'1984-07-12'.toDate, '14:34:24'.toTime," +
+ "'1984-07-12 14:34:24'.toTimestamp")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testIntegerBiggerThan128(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+ val t1 = t.filter('a === 300)
+ val t2 = t.filter("a = 300")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ def testNumericAutoCastInArithmetic() {
+ val util = batchTestUtil()
+ val table = util.addTable[(Byte, Short, Int, Long, Float, Double, Long, Double)](
+ "Table",
+ '_1, '_2, '_3, '_4, '_5, '_6, '_7, '_8)
+
+ val t1 = table.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
+ '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
+ val t2 = table.select("_1 + 1, _2 +" +
+ " 1, _3 + 1L, _4 + 1.0f, _5 + 1.0d, _6 + 1, _7 + 1.0d, _8 + _1")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ @throws[Exception]
+ def testNumericAutoCastInComparison() {
+ val util = batchTestUtil()
+ val table = util.addTable[(Byte, Short, Int, Long, Float, Double)](
+ "Table",
+ 'a, 'b, 'c, 'd, 'e, 'f)
+
+ val t1 = table.filter('a > 1 && 'b > 1 && 'c > 1L &&
+ 'd > 1.0f && 'e > 1.0d && 'f > 1)
+ val t2 = table
+ .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ @throws[Exception]
+ def testCasting() {
+ val util = batchTestUtil()
+ val table = util.addTable[(Int, Double, Long, Boolean)](
+ "Table",
+ '_1, '_2, '_3, '_4)
+
+ val t1 = table .select(
+ // * -> String
+ '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
+ // NUMERIC TYPE -> Boolean
+ '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
+ // NUMERIC TYPE -> NUMERIC TYPE
+ '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
+ // Boolean -> NUMERIC TYPE
+ '_4.cast(DOUBLE), // identity casting
+ '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
+ val t2 = table.select(
+ // * -> String
+ "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," +
+ // NUMERIC TYPE -> Boolean
+ "_1.cast(BOOLEAN), _2.cast(BOOLEAN), _3.cast(BOOLEAN)," +
+ // NUMERIC TYPE -> NUMERIC TYPE
+ "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," +
+ // Boolean -> NUMERIC TYPE
+ "_4.cast(DOUBLE)," +
+ // identity casting
+ "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOLEAN)")
+
+ verifyTableEquals(t1, t2)
+ }
+
+ @Test
+ @throws[Exception]
+ def testCastFromString() {
+ val util = batchTestUtil()
+ val table = util.addTable[(String, String, String)](
+ "Table",
+ '_1, '_2, '_3)
+
+ val t1 = table .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
+ '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
+ val t2 = table.select(
+ "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " +
+ "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOLEAN)")
+
+ verifyTableEquals(t1, t2)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
new file mode 100644
index 0000000..6ef46af
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.stringexpr
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
+import org.apache.flink.table.utils._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class CorrelateStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testCorrelateJoins(): Unit = {
+ val util = batchTestUtil()
+
+ val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+ val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
+ val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")
+
+ // test cross join
+ val func1 = new TableFunc1
+ util.javaTableEnv.registerFunction("func1", func1)
+ var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
+ var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test left outer join
+ scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
+ javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test overloading
+ scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test custom result type
+ val func2 = new TableFunc2
+ util.javaTableEnv.registerFunction("func2", func2)
+ scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len)
+ javaTable = jTab.join(
+ new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test hierarchy generic type
+ val hierarchy = new HierarchyTableFunction
+ util.javaTableEnv.registerFunction("hierarchy", hierarchy)
+ scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
+ .select("c, name, len, adult")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test pojo type
+ val pojo = new PojoTableFunc
+ util.javaTableEnv.registerFunction("pojo", pojo)
+ scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test with filter
+ scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
+ .select("c, name, len").filter("len > 2")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test with scalar function
+ scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+ javaTable = jTab.join(
+ new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
+ verifyTableEquals(scalaTable, javaTable)
+ }
+}