Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:50 UTC

[41/44] flink git commit: [FLINK-6617] [table] Restructuring of tests

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala
new file mode 100644
index 0000000..8bfb61b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SingleRowJoinTest.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class SingleRowJoinTest extends TableTestBase {
+
+  @Test
+  def testSingleRowCrossJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Int)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, asum " +
+      "FROM A, (SELECT sum(a1) + sum(a2) AS asum FROM A)"
+
+    val expected =
+      binaryNode(
+        "DataSetSingleRowJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a1")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                batchTableNode(0),
+                tuples(List(null, null)),
+                term("values", "a1", "a2")
+              ),
+              term("union", "a1", "a2")
+            ),
+            term("select", "SUM(a1) AS $f0", "SUM(a2) AS $f1")
+          ),
+          term("select", "+($f0, $f1) AS asum")
+        ),
+        term("where", "true"),
+        term("join", "a1", "asum"),
+        term("joinType", "NestedLoopInnerJoin")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowEquiJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, a2 " +
+      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+      "WHERE a1 = cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        binaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
+              ),
+              term("union", "a1")
+            ),
+            term("select", "COUNT(a1) AS cnt")
+          ),
+          term("where", "=(CAST(a1), cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopInnerJoin")
+        ),
+        term("select", "a1", "a2")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowNotEquiJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, a2 " +
+      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+      "WHERE a1 < cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        binaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
+              ),
+              term("union", "a1")
+            ),
+            term("select", "COUNT(a1) AS cnt")
+          ),
+          term("where", "<(a1, cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopInnerJoin")
+        ),
+        term("select", "a1", "a2")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowJoinWithComplexPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long)]("A", 'a1, 'a2)
+    util.addTable[(Int, Long)]("B", 'b1, 'b2)
+
+    val query =
+      "SELECT a1, a2, b1, b2 " +
+        "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
+        "WHERE a1 < b1 AND a2 = b2"
+
+    val expected = binaryNode(
+      "DataSetSingleRowJoin",
+      batchTableNode(0),
+      unaryNode(
+        "DataSetAggregate",
+        unaryNode(
+          "DataSetUnion",
+          unaryNode(
+            "DataSetValues",
+            batchTableNode(1),
+            tuples(List(null, null)),
+            term("values", "b1", "b2")
+          ),
+          term("union", "b1", "b2")
+        ),
+        term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
+      ),
+      term("where", "AND(<(a1, b1)", "=(a2, b2))"),
+      term("join", "a1", "a2", "b1", "b2"),
+      term("joinType", "NestedLoopInnerJoin")
+    )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testRightSingleLeftJoinEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Int)]("A", 'a1, 'a2)
+    util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+    val queryLeftJoin =
+      "SELECT a2 " +
+        "FROM A " +
+        "  LEFT JOIN " +
+        "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+        "  ON a1 = cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          term("where", "=(a1, cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopLeftJoin")
+        ),
+        term("select", "a2")
+      ) + "\n" +
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        )
+
+    util.verifySql(queryLeftJoin, expected)
+  }
+
+  @Test
+  def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Int)]("A", 'a1, 'a2)
+    util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+    val queryLeftJoin =
+      "SELECT a2 " +
+        "FROM A " +
+        "  LEFT JOIN " +
+        "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+        "  ON a1 > cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          term("where", ">(a1, cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopLeftJoin")
+        ),
+        term("select", "a2")
+      ) + "\n" +
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        )
+
+    util.verifySql(queryLeftJoin, expected)
+  }
+
+  @Test
+  def testLeftSingleRightJoinEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Long)]("A", 'a1, 'a2)
+    util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+    val queryRightJoin =
+      "SELECT a1 " +
+        "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+        "  RIGHT JOIN " +
+        "A " +
+        "  ON cnt = a2"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          "",
+          term("where", "=(cnt, a2)"),
+          term("join", "cnt", "a1", "a2"),
+          term("joinType", "NestedLoopRightJoin")
+        ),
+        term("select", "a1")
+      ) + unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        ) + "\n" +
+        batchTableNode(0)
+
+    util.verifySql(queryRightJoin, expected)
+  }
+
+  @Test
+  def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Long)]("A", 'a1, 'a2)
+    util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+    val queryRightJoin =
+      "SELECT a1 " +
+        "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+        "  RIGHT JOIN " +
+        "A " +
+        "  ON cnt < a2"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          "",
+          term("where", "<(cnt, a2)"),
+          term("join", "cnt", "a1", "a2"),
+          term("joinType", "NestedLoopRightJoin")
+        ),
+        term("select", "a1")
+      ) +
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        ) + "\n" +
+        batchTableNode(0)
+
+    util.verifySql(queryRightJoin, expected)
+  }
+
+  @Test
+  def testSingleRowJoinInnerJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Int)]("A", 'a1, 'a2)
+    val query =
+      "SELECT a2, sum(a1) " +
+        "FROM A " +
+        "GROUP BY a2 " +
+        "HAVING sum(a1) > (SELECT sum(a1) * 0.1 FROM A)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          unaryNode(
+            "DataSetAggregate",
+            batchTableNode(0),
+            term("groupBy", "a2"),
+            term("select", "a2", "SUM(a1) AS EXPR$1")
+          ),
+          term("where", ">(EXPR$1, EXPR$0)"),
+          term("join", "a2", "EXPR$1", "EXPR$0"),
+          term("joinType", "NestedLoopInnerJoin")
+        ),
+        term("select", "a2", "EXPR$1")
+      ) + "\n" +
+        unaryNode(
+          "DataSetCalc",
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)), term("values", "a1")
+              ),
+              term("union", "a1")
+            ),
+            term("select", "SUM(a1) AS $f0")
+          ),
+          term("select", "*($f0, 0.1) AS EXPR$0")
+        )
+
+    util.verifySql(query, expected)
+  }
+}
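
All of the plans above hinge on DataSetSingleRowJoin: when one join input is
guaranteed to produce exactly one row (here, a global aggregate), the optimizer
broadcasts that row and performs a nested-loop join, which is also why
non-equi predicates are accepted. A minimal sketch of running such a query end
to end with the Flink 1.3-era batch API (names and data are illustrative, not
part of this commit):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    object SingleRowJoinExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        tEnv.registerDataSet("A", env.fromElements((1, 10), (2, 20)), 'a1, 'a2)
        // The subquery yields a single row, so it can be broadcast to all
        // partitions of A instead of being repartitioned for a hash join.
        val result = tEnv.sql(
          "SELECT a1, asum FROM A, (SELECT SUM(a1) + SUM(a2) AS asum FROM A)")
        result.toDataSet[Row].print()
      }
    }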

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
new file mode 100644
index 0000000..9aada9a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class CalcValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidFields(): Unit = {
+
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, foo FROM MyTable"
+
+    util.tableEnv.sql(sqlQuery)
+  }
+}
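
For reference, a sketch of how this surfaces outside the test harness,
assuming a table MyTable with fields a, b, c registered on a tEnv as in the
previous example; the unknown field is rejected during validation, before any
job is built:

    import org.apache.flink.table.api.ValidationException

    try {
      tEnv.sql("SELECT a, foo FROM MyTable") // 'foo' is not a field of MyTable
    } catch {
      case e: ValidationException =>
        println(s"rejected at validation time: ${e.getMessage}")
    }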

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
new file mode 100644
index 0000000..4272170
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testHopWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testSessionWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testVariableWindowSize(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sql = "SELECT COUNT(*) " +
+      "FROM T " +
+      "GROUP BY TUMBLE(ts, b * INTERVAL '1' MINUTE)"
+    util.verifySql(sql, "n/a")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testTumbleWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val weightedAvg = new WeightedAvgWithMerge
+    util.tableEnv.registerFunction("weightedAvg", weightedAvg)
+
+    val sql = "SELECT weightedAvg(c, a) AS wAvg " +
+      "FROM T " +
+      "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
+    util.verifySql(sql, "n/a")
+  }
+}
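
For contrast, hedged sketches of variants that do validate under the same
table setup: the unsupported window offsets are removed, and the user-defined
aggregate is applied to argument types it accepts (WeightedAvgWithMerge is
assumed to take a numeric value and weight, as in Flink's test utilities):

    // Fixed-size tumbling window with numeric UDAGG arguments:
    val validTumble =
      "SELECT weightedAvg(b, a) AS wAvg " +
        "FROM T " +
        "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"

    // Hopping window without the unsupported TIME '10:00:00' offset:
    val validHop =
      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
        "FROM T " +
        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '2' HOUR)"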

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..90bcfec
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class JoinValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinNonExistingKey(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
+
+    util.tableEnv.sql(sqlQuery)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testJoinNonMatchingKeyTypes(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithAmbiguousFields(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testJoinNoEqualityPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testCrossJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, String)]("Table4", 'a1, 'b1, 'c1)
+
+    val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithLocalPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithLocalPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFullOuterJoinWithLocalPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+}
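
The common thread in these cases: at the time of this commit, the batch
planner translated outer joins only for conjunctions of equality predicates
referencing both inputs, and inner joins needed at least one equi-predicate.
Sketches of accepted shapes under the same table setup:

    // Inner join with an equality predicate between the two inputs:
    val innerOk = "SELECT c, g FROM Table3, Table5 WHERE a = d"

    // Outer join restricted to equi-predicates on both inputs:
    val outerOk = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e"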

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..7e72a21
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+
+  /**
+    * The [[OverAgg0]] aggregate function requires an OVER clause.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidOverAggregation(): Unit = {
+
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+
+    util.addFunction("overAgg", new OverAgg0)
+
+    val sqlQuery = "SELECT overAgg(b, a) FROM T"
+    util.tableEnv.sql(sqlQuery)
+  }
+
+  /**
+    * The [[OverAgg0]] aggregate function requires an OVER clause.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidOverAggregation2(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+    util.addFunction("overAgg", new OverAgg0)
+
+    val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
+
+    util.tableEnv.sql(sqlQuery)
+  }
+}
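
OverAgg0 fails here because it declares that it may only be used inside an
OVER window. A hedged sketch of how such a function can be written
(accumulator and logic are illustrative; the real OverAgg0 lives in
JavaUserDefinedAggFunctions):

    import org.apache.flink.table.functions.AggregateFunction

    class SumAcc { var sum: Long = 0L }

    class OverOnlySum extends AggregateFunction[Long, SumAcc] {
      override def createAccumulator(): SumAcc = new SumAcc
      override def getValue(acc: SumAcc): Long = acc.sum
      def accumulate(acc: SumAcc, a: Long, b: Long): Unit = acc.sum += a + b
      // Declaring requiresOver makes the validator reject any call that is
      // not wrapped in an OVER clause, as the two tests above verify.
      override def requiresOver: Boolean = true
    }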

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
new file mode 100644
index 0000000..d3f9b9f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class SortValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testLimitWithoutOrder(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
+
+    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+  }
+}
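
For contrast, the accepted form at this point required an explicit sort:

    val sqlQueryOk = "SELECT * FROM MyTable ORDER BY a LIMIT 5"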

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
new file mode 100644
index 0000000..af001be
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Tests for aggregate plans.
+  */
+class AggregateTest extends TableTestBase {
+
+  @Test
+  def testGroupAggregateWithFilter(): Unit = {
+
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.groupBy('a)
+      .select('a, 'a.avg, 'b.sum, 'c.count)
+      .where('a === 1)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      calcNode,
+      term("groupBy", "a"),
+      term("select",
+        "a",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregate(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+    val resultTable = sourceTable.select('a.avg, 'b.sum, 'c.count)
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      batchTableNode(0),
+      tuples(List(null, null, null)),
+      term("values", "a", "b", "c")
+    )
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregateWithFilter(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.select('a, 'b, 'c).where('a === 1)
+      .select('a.avg, 'b.sum, 'c.count)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      // ReduceExpressionsRule forcibly adds a CAST to the Project node
+      // when the Project node's input contains a constant expression.
+      term("select", "CAST(1) AS a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      calcNode,
+      tuples(List(null, null, null)),
+      term("values", "a", "b", "c")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+}
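
A note on the DataSetValues / DataSetUnion pair in the non-grouped plans
above: a global aggregate must emit exactly one row even for empty input, so
the planner unions a single null-valued row into the input before
aggregating. A short sketch of the query under test, assuming the env/tEnv
setup from the first example:

    val t = env.fromElements((1, 2L, 3)).toTable(tEnv, 'a, 'b, 'c)
    // Even over an empty source, AVG/SUM/COUNT would still produce one
    // result row, thanks to the null row unioned into the plan.
    val agg = t.select('a.avg, 'b.sum, 'c.count)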

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
new file mode 100644
index 0000000..ee05547
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, TestCaseClass, WC, giveMeCaseClass}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Upper
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class CalcTest extends TableTestBase {
+
+  @Test
+  def testMultipleFlatteningsTable(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table.select('a.flatten(), 'c, 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "c",
+        "b._1 AS b$_1",
+        "b._2 AS b$_2"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testNestedFlattening(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
+
+    val result = table.select('a.flatten(), 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "b"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testScalarFunctionAccess(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[(String, Int)]("MyTable", 'a, 'b)
+
+    val result = table.select(
+      giveMeCaseClass().get("my"),
+      giveMeCaseClass().get("clazz"),
+      giveMeCaseClass().flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        s"${giveMeCaseClass.functionIdentifier}().my AS _c0",
+        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1",
+        s"${giveMeCaseClass.functionIdentifier}().my AS _c2",
+        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Tests for all situations in which field projection can be applied, such as
+  // selecting a few fields from a source with many fields.
+  // ----------------------------------------------------------------------------------------------
+
+  @Test
+  def testSimpleSelect(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable = sourceTable.select('a, 'b)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectAllFields(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable1 = sourceTable.select('*)
+    val resultTable2 = sourceTable.select('a, 'b, 'c, 'd)
+
+    val expected = batchTableNode(0)
+
+    util.verifyTable(resultTable1, expected)
+    util.verifyTable(resultTable2, expected)
+  }
+
+  @Test
+  def testSelectAggregation(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable = sourceTable.select('a.sum, 'b.max)
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      binaryNode(
+        "DataSetUnion",
+        values(
+          "DataSetValues",
+          tuples(List(null, null)),
+          term("values", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        term("union", "a", "b")
+      ),
+      term("select", "SUM(a) AS TMP_0", "MAX(b) AS TMP_1")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectFunction(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+
+    util.tableEnv.registerFunction("hashCode", MyHashCode)
+
+    val resultTable = sourceTable.select("hashCode(c), b")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", s"${MyHashCode.functionIdentifier}(c) AS _c0", "b")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectFromGroupedTable(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable = sourceTable.groupBy('a, 'c).select('a)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetDistinct",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "c")
+        ),
+        term("distinct", "a", "c")
+      ),
+      term("select", "a")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectAllFieldsFromGroupedTable(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
+
+    val expected = unaryNode(
+      "DataSetDistinct",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "a", "c")
+      ),
+      term("distinct", "a", "c")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectAggregationFromGroupedTable(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable = sourceTable.groupBy('c).select('a.sum)
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "a", "c")
+          ),
+          term("groupBy", "c"),
+          term("select", "c", "SUM(a) AS TMP_0")
+        ),
+        term("select", "TMP_0")
+      )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable = sourceTable.groupBy(Upper('c) as 'k).select('a.sum)
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "a", "c", "UPPER(c) AS k")
+          ),
+          term("groupBy", "k"),
+          term("select", "k", "SUM(a) AS TMP_0")
+        ),
+        term("select", "TMP_0")
+      )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectFromGroupedTableWithFunctionKey(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val resultTable = sourceTable.groupBy(MyHashCode('c) as 'k).select('a.sum)
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "a", "c", s"${MyHashCode.functionIdentifier}(c) AS k")
+          ),
+          term("groupBy", "k"),
+          term("select", "k", "SUM(a) AS TMP_0")
+        ),
+        term("select", "TMP_0")
+      )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectFromAggregatedPojoTable(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency)
+    val resultTable = sourceTable
+      .groupBy('word)
+      .select('word, 'frequency.sum as 'frequency)
+      .filter('frequency === 2)
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetAggregate",
+          batchTableNode(0),
+          term("groupBy", "word"),
+          term("select", "word", "SUM(frequency) AS TMP_0")
+        ),
+        term("select", "word", "TMP_0 AS frequency"),
+        term("where", "=(TMP_0, 2)")
+      )
+
+    util.verifyTable(resultTable, expected)
+  }
+}
+
+object CalcTest {
+
+  case class TestCaseClass(my: String, clazz: Int)
+
+  object giveMeCaseClass extends ScalarFunction {
+    def eval(): TestCaseClass = {
+      TestCaseClass("hello", 42)
+    }
+
+    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+      createTypeInformation[TestCaseClass]
+    }
+  }
+
+  object MyHashCode extends ScalarFunction {
+    def eval(s: String): Int = s.hashCode()
+  }
+
+  case class WC(word: String, frequency: Long)
+}
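
A usage sketch for the helpers in the companion object, assuming the same
env/tEnv setup as before; scalar functions and composite-type accesses
compose directly inside select():

    import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, giveMeCaseClass}

    val words = env.fromElements(("hello", 1L)).toTable(tEnv, 'word, 'n)
    val r = words.select(
      MyHashCode('word),           // scalar function call
      giveMeCaseClass().get("my"), // field access on a composite result
      giveMeCaseClass().flatten()) // expand all fields of the composite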

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
new file mode 100644
index 0000000..63ce267
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
+import org.junit.Test
+
+class CorrelateTest extends TableTestBase {
+
+  @Test
+  def testCrossJoin(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result1 = table.join(function('c) as 's).select('c, 's)
+
+    val expected1 = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"${function.functionIdentifier}($$2)"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result1, expected1)
+
+    // test overloading
+
+    val result2 = table.join(function('c, "$") as 's).select('c, 's)
+
+    val expected2 = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result2, expected2)
+  }
+
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"${function.functionIdentifier}($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result, expected)
+  }
+}
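
TableFunc1 comes from the shared test utilities; a hedged sketch of what such
a table function can look like (details may differ from the real class),
including the overloaded eval that the second half of testCrossJoin exercises:

    import org.apache.flink.table.functions.TableFunction

    class SplitFunc extends TableFunction[String] {
      def eval(str: String): Unit =
        str.split(" ").foreach(collect)

      // Overload used as function('c, "$"): prefixes each emitted token.
      def eval(str: String, prefix: String): Unit =
        str.split(" ").foreach(s => collect(prefix + s))
    }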

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
new file mode 100644
index 0000000..e441203
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.WindowReference
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowTest extends TableTestBase {
+
+  //===============================================================================================
+  // Common tests
+  //===============================================================================================
+
+  @Test
+  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeTumblingGroupWindowOverTimeWithUdAgg(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, myWeightedAvg('long, 'int))
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "int", "long")
+      ),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "int", "long")
+      ),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  //===============================================================================================
+  // Sliding Windows
+  //===============================================================================================
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window",
+        SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window",
+        SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeWithUdAgg(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, myWeightedAvg('long, 'int))
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window",
+        SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "int", "long")
+      ),
+      term("window",
+        SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "int", "long")
+      ),
+      term("window",
+        SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  //===============================================================================================
+  // Session Windows
+  //===============================================================================================
+
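+  // A session window closes once no element arrives within the configured
+  // gap (7 ms below); its boundaries are therefore data-driven.
+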
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTimeWithUdAgg(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, myWeightedAvg('long, 'int))
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala
new file mode 100644
index 0000000..e148b47
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/AggregateStringExpressionTest.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class AggregateStringExpressionTest extends TableTestBase {
+
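+  // Each test builds the same query twice, once with Scala expressions and
+  // once with Java string expressions, and asserts via verifyTableEquals
+  // that both variants translate to the identical logical plan.
+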
+  @Test
+  def testAggregationTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
+
+    val t1 = t.select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
+    val t2 = t.select("_1.sum, _1.sum0, _1.min, _1.max, _1.count, _1.avg")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testWorkingAggregationDataTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
+
+    val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+    val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testProjection(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Byte, Short)]("Table2")
+
+    val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
+    val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testAggregationWithArithmetic(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2")
+
+    val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5)
+    val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testAggregationWithTwoCount(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2")
+
+    val t1 = t.select('_1.count, '_2.count)
+    val t2 = t.select("_1.count, _2.count")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testAggregationAfterProjection(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
+
+    val t1 = t.select('_1, '_2, '_3)
+      .select('_1.avg, '_2.sum, '_3.count)
+
+    val t2 = t.select("_1, _2, _3")
+      .select("_1.avg, _2.sum, _3.count")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testDistinct(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val distinct = ds.select('b).distinct()
+    val distinct2 = ds.select("b").distinct()
+
+    verifyTableEquals(distinct, distinct2)
+  }
+
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
+
+    val distinct = ds.groupBy('a, 'e).select('e).distinct()
+    val distinct2 = ds.groupBy("a, e").select("e").distinct()
+
+    verifyTableEquals(distinct, distinct2)
+  }
+
+  @Test
+  def testGroupedAggregate(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.groupBy('b).select('b, 'a.sum)
+    val t2 = t.groupBy("b").select("b, a.sum")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.groupBy('b).select('a.sum)
+    val t2 = t.groupBy("b").select("a.sum")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testGroupNoAggregation(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t
+      .groupBy('b)
+      .select('a.sum as 'd, 'b)
+      .groupBy('b, 'd)
+      .select('b)
+
+    val t2 = t
+      .groupBy("b")
+      .select("a.sum as d, b")
+      .groupBy("b, d")
+      .select("b")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant1(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.select('a, 4 as 'four, 'b)
+      .groupBy('four, 'a)
+      .select('four, 'b.sum)
+
+    val t2 = t.select("a, 4 as four, b")
+      .groupBy("four, a")
+      .select("four, b.sum")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant2(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.select('b, 4 as 'four, 'a)
+      .groupBy('b, 'four)
+      .select('four, 'a.sum)
+    val t2 = t.select("b, 4 as four, a")
+      .groupBy("b, four")
+      .select("four, a.sum")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testGroupedAggregateWithExpression(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
+
+    val t1 = t.groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
+    val t2 = t.groupBy("e, b % 3")
+      .select("c.min, e, a.avg, d.count")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testGroupedAggregateWithFilter(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.groupBy('b)
+      .select('b, 'a.sum)
+      .where('b === 2)
+    val t2 = t.groupBy("b")
+      .select("b, a.sum")
+      .where("b = 2")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testAnalyticAggregation(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, Float, Double)]('_1, '_2, '_3, '_4)
+
+    val resScala = t.select(
+      '_1.stddevPop, '_2.stddevPop, '_3.stddevPop, '_4.stddevPop,
+      '_1.stddevSamp, '_2.stddevSamp, '_3.stddevSamp, '_4.stddevSamp,
+      '_1.varPop, '_2.varPop, '_3.varPop, '_4.varPop,
+      '_1.varSamp, '_2.varSamp, '_3.varSamp, '_4.varSamp)
+    val resJava = t.select("""
+      _1.stddevPop, _2.stddevPop, _3.stddevPop, _4.stddevPop,
+      _1.stddevSamp, _2.stddevSamp, _3.stddevSamp, _4.stddevSamp,
+      _1.varPop, _2.varPop, _3.varPop, _4.varPop,
+      _1.varSamp, _2.varSamp, _3.varSamp, _4.varSamp""")
+
+    verifyTableEquals(resScala, resJava)
+  }
+
+  @Test
+  def testAggregateWithUDAGG(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val myCnt = new CountAggFunction
+    util.tableEnv.registerFunction("myCnt", myCnt)
+    val myWeightedAvg = new WeightedAvgWithMergeAndReset
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+
+    val t1 = t.select(myCnt('a) as 'aCnt, myWeightedAvg('b, 'a) as 'wAvg)
+    val t2 = t.select("myCnt(a) as aCnt, myWeightedAvg(b, a) as wAvg")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testGroupedAggregateWithUDAGG(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val myCnt = new CountAggFunction
+    util.tableEnv.registerFunction("myCnt", myCnt)
+    val myWeightedAvg = new WeightedAvgWithMergeAndReset
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+
+    val t1 = t.groupBy('b)
+      .select('b, myCnt('a) + 9 as 'aCnt, myWeightedAvg('b, 'a) * 2 as 'wAvg, myWeightedAvg('a, 'a))
+    val t2 = t.groupBy("b")
+      .select("b, myCnt(a) + 9 as aCnt, myWeightedAvg(b, a) * 2 as wAvg, myWeightedAvg(a, a)")
+
+    verifyTableEquals(t1, t2)
+  }
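+
+  // A minimal end-to-end sketch (illustrative only, not part of this commit)
+  // of exercising such a UDAGG through SQL; it assumes a
+  // BatchTableEnvironment `tEnv` with a table "Table3" registered:
+  //
+  //   val myWeightedAvg = new WeightedAvgWithMergeAndReset
+  //   tEnv.registerFunction("wAvg", myWeightedAvg)
+  //   val result = tEnv.sql("SELECT b, wAvg(b, a) FROM Table3 GROUP BY b")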
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala
new file mode 100644
index 0000000..901b2f4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CalcStringExpressionTest.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.stringexpr
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class CalcStringExpressionTest extends TableTestBase {
+
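+  // As in AggregateStringExpressionTest, every test verifies that the Scala
+  // expression API and the Java string expression API produce the same plan.
+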
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.select('a, 'b, 'c)
+    val t2 = t.select("a, b, c")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
+
+    val t1 = t
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val t2 = t
+      .select("_1 as a, _2 as b, _1 as c")
+      .select("a, b")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testSimpleSelectRenameAll(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
+
+    val t1 = t
+      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
+      .select('a, 'b)
+
+    val t2 = t
+      .select("_1 as a, _2 as b, _3 as c")
+      .select("a, b")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testSelectStar(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.select('*)
+    val t2 = t.select("*")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = ds.filter( Literal(false) )
+    val t2 = ds.filter("faLsE")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = ds.filter( Literal(true) )
+    val t2 = ds.filter("trUe")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testFilterOnStringTupleField(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'c.like("%world%") )
+    val t2 = ds.filter("c.like('%world%')")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'a % 2 === 0 )
+    val t2 = ds.filter( "a % 2 = 0" )
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'a % 2 !== 0 )
+    val t2 = ds.filter("a % 2 <> 0")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testDisjunctivePredicate(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'a < 2 || 'a > 20)
+    val t2 = ds.filter("a < 2 || a > 20")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testConsecutiveFilters(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
+    val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testFilterBasicType(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[String]("Table3", 'a)
+
+    val t1 = ds.filter( 'a.like("H%") )
+    val t2 = ds.filter( "a.like('H%')" )
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testFilterOnCustomType(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[CustomType]("Table3", 'myInt as 'i, 'myLong as 'l, 'myString as 's)
+
+    val t1 = t.filter( 's.like("%a%") )
+    val t2 = t.filter("s.like('%a%')")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testSimpleCalc(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
+
+    val t1 = t.select('_1, '_2, '_3)
+      .where('_1 < 7)
+      .select('_1, '_3)
+
+    val t2 = t.select("_1, _2, _3")
+      .where("_1 < 7")
+      .select("_1, _3")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testCalcWithTwoFilters(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
+
+    val t1 = t.select('_1, '_2, '_3)
+      .where('_1 < 7 && '_2 === 3)
+      .select('_1, '_3)
+      .where('_1 === 4)
+      .select('_1)
+
+    val t2 = t.select("_1, _2, _3")
+      .where("_1 < 7 && _2 = 3")
+      .select("_1, _3")
+      .where("_1 === 4")
+      .select("_1")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testCalcWithAggregation(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
+
+    val t1 = t.select('_1, '_2, '_3)
+      .where('_1 < 15)
+      .groupBy('_2)
+      .select('_1.min, '_2.count as 'cnt)
+      .where('cnt > 3)
+
+    val t2 = t.select("_1, _2, _3")
+      .where("_1 < 15")
+      .groupBy("_2")
+      .select("_1.min, _2.count as cnt")
+      .where("cnt > 3")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testCalcJoin(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
+      .where('b > 1).select('a, 'd).where('d === 2)
+    val t2 = ds1.select("a, b").join(ds2).where("b = e").select("a, b, d, e, f")
+      .where("b > 1").select("a, d").where("d = 2")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testAdvancedDataTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util
+      .addTable[(BigDecimal, BigDecimal, Date, Time, Timestamp)]("Table5", 'a, 'b, 'c, 'd, 'e)
+
+    val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
+        "1984-07-12".cast(Types.SQL_DATE), "14:34:24".cast(Types.SQL_TIME),
+        "1984-07-12 14:34:24".cast(Types.SQL_TIMESTAMP))
+    val t2 = t.select("a, b, c, d, e, 11.2p, 11.2p," +
+      "'1984-07-12'.toDate, '14:34:24'.toTime," +
+      "'1984-07-12 14:34:24'.toTimestamp")
+
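+    // The "11.2p" literal in the string variant is the expression parser's
+    // BigDecimal notation, mirroring BigDecimal("11.2") in the Scala variant.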
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testIntegerBiggerThan128(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val t1 = t.filter('a === 300)
+    val t2 = t.filter("a = 300")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testNumericAutoCastInArithmetic() {
+    val util = batchTestUtil()
+    val table = util.addTable[(Byte, Short, Int, Long, Float, Double, Long, Double)](
+      "Table",
+      '_1, '_2, '_3, '_4, '_5, '_6, '_7, '_8)
+
+    val t1 = table.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
+      '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
+    val t2 = table.select("_1 + 1, _2 +" +
+      " 1, _3 + 1L, _4 + 1.0f, _5 + 1.0d, _6 + 1, _7 + 1.0d, _8 + _1")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  @throws[Exception]
+  def testNumericAutoCastInComparison() {
+    val util = batchTestUtil()
+    val table = util.addTable[(Byte, Short, Int, Long, Float, Double)](
+      "Table",
+      'a, 'b, 'c, 'd, 'e, 'f)
+
+    val t1 = table.filter('a > 1 && 'b > 1 && 'c > 1L &&
+      'd > 1.0f && 'e > 1.0d && 'f > 1)
+    val t2 = table
+      .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCasting() {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Double, Long, Boolean)](
+      "Table",
+      '_1, '_2, '_3, '_4)
+
+    val t1 = table.select(
+      // * -> String
+      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
+      // NUMERIC TYPE -> Boolean
+      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
+      // NUMERIC TYPE -> NUMERIC TYPE
+      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
+      // Boolean -> NUMERIC TYPE
+      '_4.cast(DOUBLE),
+      // identity casting
+      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
+    val t2 = table.select(
+      // * -> String
+      "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," +
+        // NUMERIC TYPE -> Boolean
+        "_1.cast(BOOLEAN), _2.cast(BOOLEAN), _3.cast(BOOLEAN)," +
+        // NUMERIC TYPE -> NUMERIC TYPE
+        "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," +
+        // Boolean -> NUMERIC TYPE
+        "_4.cast(DOUBLE)," +
+        // identity casting
+        "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOLEAN)")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCastFromString() {
+    val util = batchTestUtil()
+    val table = util.addTable[(String, String, String)](
+      "Table",
+      '_1, '_2, '_3)
+
+    val t1 = table.select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
+        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
+    val t2 = table.select(
+      "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " +
+        "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOLEAN)")
+
+    verifyTableEquals(t1, t2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
new file mode 100644
index 0000000..6ef46af
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.stringexpr
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.utils._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class CorrelateStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testCorrelateJoins(): Unit = {
+    val util = batchTestUtil()
+
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
+    val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")
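+    // sTab is built with the Scala expression API; jTab mimics the Java API,
+    // where table functions and projections are supplied as parsed strings.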
+
+    // test cross join
+    val func1 = new TableFunc1
+    util.javaTableEnv.registerFunction("func1", func1)
+    var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
+    var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test left outer join
+    scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    util.javaTableEnv.registerFunction("func2", func2)
+    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    util.javaTableEnv.registerFunction("hierarchy", hierarchy)
+    scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    util.javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+  }
+}