You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:18 UTC

[09/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA logical plans consistent test

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
deleted file mode 100644
index 3d1704e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class UnionITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "Hi", "Hello", "Hello world", "Hi", "Hello", "Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
-
-    val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("Hi", "Hallo")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionFieldsNameNotOverlap1(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionFieldsNameNotOverlap2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
deleted file mode 100644
index 0e33d8b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.junit.Test
-
-class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testSort(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.join(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.union(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersect(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.intersect(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectAll(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.intersectAll(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinus(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.minus(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinusAll(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.minusAll(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLimit(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.limit(0,5)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
index 3edfd8c..bb9877b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -18,257 +18,14 @@
 package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils._
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
-import org.junit.Assert.{assertTrue, fail}
 import org.junit.Test
-import org.mockito.Mockito._
 
 class UserDefinedTableFunctionTest extends TableTestBase {
 
   @Test
-  def testJavaScalaTableAPIEquality(): Unit = {
-    // mock
-    val ds = mock(classOf[DataStream[Row]])
-    val jDs = mock(classOf[JDataStream[Row]])
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    when(ds.javaStream).thenReturn(jDs)
-    when(jDs.getType).thenReturn(typeInfo)
-
-    // Scala environment
-    val env = mock(classOf[ScalaExecutionEnv])
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
-    // Java environment
-    val javaEnv = mock(classOf[JavaExecutionEnv])
-    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
-
-    // test cross join
-    val func1 = new TableFunc1
-    javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
-    var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test left outer join
-    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "func1(c)").as("s")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'len, 'adult)
-    javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.join(pojo('c))
-      .select('c, 'name, 'age)
-    javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
-      .select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = in1.join(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = in1.join(func1('c.substring(2)) as 's)
-      .select('a, 'c, 's)
-    javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
-      .select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // check scala object is forbidden
-    expectExceptionThrown(
-      tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
-    expectExceptionThrown(
-      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
-    expectExceptionThrown(in1.join(ObjectTableFunction('a, 1)), "Scala object")
-
-  }
-
-  @Test
-  def testInvalidTableFunctions(): Unit = {
-    // mock
-    val ds = mock(classOf[DataStream[Row]])
-    val jDs = mock(classOf[JDataStream[Row]])
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    when(ds.javaStream).thenReturn(jDs)
-    when(jDs.getType).thenReturn(typeInfo)
-
-    // Scala environment
-    val env = mock(classOf[ScalaExecutionEnv])
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
-    // Java environment
-    val javaEnv = mock(classOf[JavaExecutionEnv])
-    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
-
-    val func1 = new TableFunc1
-    javaTableEnv.registerFunction("func1", func1)
-
-    // table function call select
-    expectExceptionThrown(
-      func1('c).select("f0"),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call select
-    expectExceptionThrown(
-      func1('c).select('f0),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call writeToSink
-    expectExceptionThrown(
-      func1('c).writeToSink(null),
-      "Cannot translate a query with an unbounded table function call."
-    )
-
-    // table function call distinct
-    expectExceptionThrown(
-      func1('c).distinct(),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call filter
-    expectExceptionThrown(
-      func1('c).filter('f0 === "?"),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call filter
-    expectExceptionThrown(
-      func1('c).filter("f0 = '?'"),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call limit
-    expectExceptionThrown(
-      func1('c).orderBy('f0).limit(3),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call limit
-    expectExceptionThrown(
-      func1('c).orderBy('f0).limit(0, 3),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call orderBy
-    expectExceptionThrown(
-      func1('c).orderBy("f0"),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call orderBy
-    expectExceptionThrown(
-      func1('c).orderBy('f0),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call where
-    expectExceptionThrown(
-      func1('c).where("f0 = '?'"),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-    // table function call where
-    expectExceptionThrown(
-      func1('c).where('f0 === "?"),
-      "TableFunction can only be used in join and leftOuterJoin."
-    )
-
-  }
-
-  @Test
-  def testInvalidTableFunction(): Unit = {
-    // mock
-    val util = streamTestUtil()
-    val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val tEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
-
-    //=================== check scala object is forbidden =====================
-    // Scala table environment register
-    expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
-    // Java table environment register
-    expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
-    // Scala Table API directly call
-    expectExceptionThrown(t.join(ObjectTableFunction('a, 1)), "Scala object")
-
-
-    //============ throw exception when table function is not registered =========
-    // Java Table API call
-    expectExceptionThrown(
-      t.join(new Table(util.tEnv, "nonexist(a)")
-      ), "Undefined function: NONEXIST")
-    // SQL API call
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
-      "No match found for function signature nonexist(<NUMERIC>)")
-
-
-    //========= throw exception when the called function is a scalar function ====
-    util.tEnv.registerFunction("func0", Func0)
-
-    // Java Table API call
-    expectExceptionThrown(
-      t.join(new Table(util.tEnv, "func0(a)")),
-      "only accept String that define table function",
-      classOf[TableException])
-    // SQL API call
-    // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
-      null,
-      classOf[AssertionError])
-
-    //========== throw exception when the parameters is not correct ===============
-    // Java Table API call
-    util.addFunction("func2", new TableFunc2)
-    expectExceptionThrown(
-      t.join(new Table(util.tEnv, "func2(c, c)")),
-      "Given parameters of function 'FUNC2' do not match any signature")
-    // SQL API call
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
-      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
-  }
-
-  @Test
   def testCrossJoin(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -459,25 +216,4 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     util.verifyTable(result, expected)
   }
 
-  // ----------------------------------------------------------------------------------------------
-
-  private def expectExceptionThrown(
-      function: => Unit,
-      keywords: String,
-      clazz: Class[_ <: Throwable] = classOf[ValidationException])
-    : Unit = {
-    try {
-      function
-      fail(s"Expected a $clazz, but no exception is thrown.")
-    } catch {
-      case e if e.getClass == clazz =>
-        if (keywords != null) {
-          assertTrue(
-            s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
-            e.getMessage.contains(keywords))
-        }
-      case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala
new file mode 100644
index 0000000..b558193
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class CalcStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testSimpleSelect(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]()
+
+    val resScala = t.select('_1, '_2)
+    val resJava = t.select("_1, _2")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testSelectStar(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t.select('*)
+    val resJava = t.select("*")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testSelectWithWhere(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+    val resScala = t.where('string === "true").select('int)
+    val resJava = t.where("string === 'true'").select("int")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t.select('int, 'string)
+    val resJava = t.select("int, string")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testSimpleSelectWithAlias(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+    val resScala = t.select('int as 'myInt, 'string as 'myString)
+    val resJava = t.select("int as myInt, string as myString")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testSimpleFilter(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t.filter('int === 3).select('int as 'myInt, 'string)
+    val resJava = t.filter("int === 3").select("int as myInt, string")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t.filter(Literal(false)).select('int as 'myInt, 'string)
+    val resJava = t.filter("false").select("int as myInt, string")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t.filter(Literal(true)).select('int as 'myInt, 'string)
+    val resJava = t.filter("true").select("int as myInt, string")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testNotEqualsFilter(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t.filter('int !== 2).filter('string.like("%world%")).select('int, 'string)
+    val resJava = t.filter("int !== 2").filter("string.like('%world%')").select("int, string")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testFilterWithExpression(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t.filter('int % 2 === 0).select('int, 'string)
+    val resJava = t.filter("int % 2 === 0").select("int, string")
+    verifyTableEquals(resJava, resScala)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala
new file mode 100644
index 0000000..fb23f09
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
+import org.junit.Test
+
+class GroupAggregationsStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testGroupedAggregate(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .groupBy('string)
+      .select('int.count as 'cnt, weightAvgFun('long, 'int))
+
+    // String / Java API
+    val resJava = t
+      .groupBy("string")
+      .select("int.count as cnt, weightAvgFun(long, int)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testNonGroupedAggregate(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Expression / Scala API
+    val resScala = t.select('int.count as 'cnt, 'long.sum)
+
+    // String / Java API
+    val resJava = t.select("int.count as cnt, long.sum")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index 1cc156e..bb0b121 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -18,9 +18,9 @@
 package org.apache.flink.table.api.scala.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.api.java.{Slide => JSlide}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.api.java.{Slide => JSlide, Session => JSession, Tumble => JTumble}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
@@ -29,14 +29,14 @@ import org.junit.Test
 class GroupWindowStringExpressionTest extends TableTestBase {
 
   @Test
-  def testJavaScalaTableAPIEquality(): Unit = {
+  def testRowTimeSlide(): Unit = {
     val util = streamTestUtil()
     val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime)
 
     val myCountFun = new CountAggFunction
-    util.tEnv.registerFunction("myCountFun", myCountFun)
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
     val weightAvgFun = new WeightedAvg
-    util.tEnv.registerFunction("weightAvgFun", weightAvgFun)
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     // Expression / Scala API
     val resScala = t
@@ -66,4 +66,193 @@ class GroupWindowStringExpressionTest extends TableTestBase {
 
     verifyTableEquals(resJava, resScala)
   }
+
+  @Test
+  def testRowTimeTumble(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, Long, String)]('int, 'long, 'rowtime.rowtime, 'string)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 4.hours on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select(
+        'string,
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("4.hours").on("rowtime").as("w"))
+      .groupBy("w, string")
+      .select(
+        "string, " +
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testRowTimeSession(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Session withGap 4.hours on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select(
+        'string,
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start)
+
+    // String / Java API
+    val resJava = t
+      .window(JSession.withGap("4.hours").on("rowtime").as("w"))
+      .groupBy("w, string")
+      .select(
+        "string, " +
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)"
+      )
+
+    verifyTableEquals(resJava, resScala)
+  }
+  @Test
+  def testProcTimeSlide(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'proctime.proctime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Slide over 4.hours every 2.hours on 'proctime as 'w)
+      .groupBy('w)
+      .select(
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JSlide.over("4.hours").every("2.hours").on("proctime").as("w"))
+      .groupBy("w")
+      .select(
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testProcTimeTumble(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long,'string, 'proctime.proctime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 4.hours on 'proctime as 'w)
+      .groupBy('w)
+      .select(
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("4.hours").on("proctime").as("w"))
+      .groupBy("w")
+      .select(
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testProcTimeSession(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'proctime.proctime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Session withGap 4.hours on 'proctime as 'w)
+      .groupBy('w)
+      .select(
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JSession.withGap("4.hours").on("proctime").as("w"))
+      .groupBy("w")
+      .select(
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w), " +
+        "end(w)"
+      )
+
+    verifyTableEquals(resJava, resScala)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
index 4c95916..9430514 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.api.java.{Over => JOver}
 import org.apache.flink.table.api.scala.{Over => SOver, _}
 import org.apache.flink.table.expressions.utils.Func1
@@ -33,12 +34,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w as 'cnt, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w as cnt, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -48,12 +52,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(JOver.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -63,12 +70,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(JOver.partitionBy("a, d").orderBy("rowtime").preceding("10.rows").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -78,12 +88,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(JOver.orderBy("rowtime").preceding("10.rows").following("current_row").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -93,12 +106,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -108,13 +124,16 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(
         JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -124,12 +143,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(JOver.partitionBy("a, c").orderBy("rowtime").preceding("10.minutes").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -139,12 +161,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
     val resScala = t
       .window(SOver orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w)
-      .select('a, 'b.sum over 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(JOver.orderBy("rowtime").preceding("4.hours").following("current_range").as("w"))
-      .select("a, SUM(b) OVER w")
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
   }
@@ -181,6 +206,4 @@ class OverWindowStringExpressionTest extends TableTestBase {
 
     verifyTableEquals(resScala, resJava)
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala
new file mode 100644
index 0000000..dc2e92f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UnionStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testUnionAll(): Unit = {
+    val util = streamTestUtil()
+    val t1 = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+    val t2 = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val resScala = t1.unionAll(t2).select('int)
+    val resJava = t1.unionAll(t2).select("int")
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testUnionAllWithFilter(): Unit = {
+    val util = streamTestUtil()
+    val t1 = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+    val t2 = util.addTable[(Int, Long, Double, String)]('int, 'long, 'double, 'string)
+
+    val resScala = t1.unionAll(t2.select('int, 'long, 'string)).filter('int < 2).select('int)
+    val resJava = t1.unionAll(t2.select("int, long, string")).filter("int < 2").select("int")
+    verifyTableEquals(resJava, resScala)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
new file mode 100644
index 0000000..2f0c57c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.utils._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class UserDefinedTableFunctionStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testJoin(): Unit = {
+
+    val util = streamTestUtil()
+    val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c)
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    val jTab = util.addJavaTable[Row](typeInfo,"MyTab","a, b, c")
+
+    // test cross join
+    val func1 = new TableFunc1
+    util.javaTableEnv.registerFunction("func1", func1)
+    var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
+    var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test left outer join
+    scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "func1(c)").as("s")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    util.javaTableEnv.registerFunction("func2", func2)
+    scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    util.javaTableEnv.registerFunction("hierarchy", hierarchy)
+    scalaTable = sTab.join(hierarchy('c) as ('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    util.javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len).filter('len > 2)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala
new file mode 100644
index 0000000..879fca3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+class CalcValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testAsWithToManyFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithAmbiguousFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testOnlyFieldRefInAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala
new file mode 100644
index 0000000..5cec6fc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupAggregationsValidationTest extends TableTestBase {
+
+  /**
+    * OVER clause is necessary for [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testOverAggregation(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+    val overAgg = new OverAgg0
+    table.select(overAgg('a, 'b))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnNonExistentField(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val ds = table
+      // must fail. '_foo is not a valid field
+      .groupBy('_foo)
+      .select('a.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingInvalidSelection(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val ds = table
+      .groupBy('a, 'b)
+      // must fail. 'c is not a grouping key or aggregation
+      .select('c)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala
new file mode 100644
index 0000000..e046986
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowAggregationsValidationTest extends TableTestBase {
+
+  /**
+    * OVER clause is necessary for [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testOverAggregation(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val overAgg = new OverAgg0
+    table
+      .window(Tumble over 2.rows on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select(overAgg('long, 'int))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowProperty(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .select('string, 'string.start) // property in non windowed table
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupByWithoutWindowAlias(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowTimeRef(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int  does not exist in input.
+      .groupBy('w2)
+      .select('string)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      .window(Tumble over "WRONG" on 'long as 'w) // string is not a valid interval
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testTumbleUdAggWithInvalidArgs(): Unit = {
+    val util = streamTestUtil()
+    val weightedAvg = new WeightedAvgWithMerge
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 2.hours on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSize(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      .window(Slide over "WRONG" every "WRONG" on 'long as 'w) // string is not a valid interval
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSlide(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      // row and time intervals may not be mixed
+      .window(Slide over 12.rows every 1.minute on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSlideUdAggWithInvalidArgs(): Unit = {
+    val util = streamTestUtil()
+    val weightedAvg = new WeightedAvgWithMerge
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Slide over 2.hours every 30.minutes on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSessionGap(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      // row interval is not valid for session windows
+      .window(Session withGap 10.rows on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias1(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      .window(Session withGap 100.milli on 'long as 1 + 1) // expression instead of a symbol
+      .groupBy('string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias2(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    table
+      // field name "string" is already present
+      .window(Session withGap 100.milli on 'long as 'string)
+      .groupBy('string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSessionUdAggWithInvalidArgs(): Unit = {
+    val util = streamTestUtil()
+    val weightedAvg = new WeightedAvgWithMerge
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    table
+      .window(Session withGap 2.hours on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowPropertyOnRowCountsTumblingWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    table
+    .window(Tumble over 2.rows on 'proctime as 'w)
+    .groupBy('w, 'string)
+    .select('string, 'w.start, 'w.end) // invalid start/end on rows-count window
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowPropertyOnRowCountsSlidingWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    table
+    .window(Slide over 10.rows every 5.rows on 'proctime as 'w)
+    .groupBy('w, 'string)
+    .select('string, 'w.start, 'w.end) // invalid start/end on rows-count window
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..cb84ae4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable",
+    'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'b.count over 'x)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testOrderBy(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPrecedingAndFollowingUsingIsLiteral(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPrecedingAndFollowingUsingSameType(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPartitionByWithUnresolved(): Unit = {
+    val result = table
+      .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPartitionByWithNotKeyType(): Unit = {
+    val table2 = streamUtil.addTable[(Int, String, Either[Long, String])]("MyTable2", 'a, 'b, 'c)
+
+    val result = table2
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPrecedingValue(): Unit = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding -1.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFollowingValue(): Unit = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUdAggWithInvalidArgs(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+      .select('c, weightedAvg('b, 'a) over 'w)
+    streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+  }
+
+  @Test
+  def testAccessesWindowProperties(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("Window start and end properties are not available for Over windows.")
+
+    table
+    .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+    .select('c, 'a.count over 'w, 'w.start, 'w.end)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala
new file mode 100644
index 0000000..72fb0b6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.table.{TestAppendSink, TestUpsertSink}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class TableSinksValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testAppendSinkOnUpdatingTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
+
+    t.groupBy('text)
+    .select('text, 'id.count, 'num.sum)
+    .writeToSink(new TestAppendSink)
+
+    // must fail because table is not append-only
+    env.execute()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text)
+
+    t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+    .groupBy('len, 'cTrue)
+    .select('len, 'id.count, 'num.sum)
+    .writeToSink(new TestUpsertSink(Array("len", "cTrue"), false))
+
+    // must fail because table is updating table without full key
+    env.execute()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala
new file mode 100644
index 0000000..bcfa146
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.{TestProctimeSource, TestRowtimeSource}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class TableSourceValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testRowtimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+
+    val t = util.tableEnv.scan("rowTimeT")
+            .select('id)
+
+    util.tableEnv.optimize(t.getRelNode, false)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testProctimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
+
+    val t = util.tableEnv.scan("procTimeT")
+            .select('id)
+
+    util.tableEnv.optimize(t.getRelNode, false)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala
new file mode 100644
index 0000000..7a684d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala.{Tumble, _}
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.runtime.datastream.table.TimeAttributesITCase._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class TimeAttributesValidationTest extends TableTestBase {
+
+  val data = List(
+    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+  @Test(expected = classOf[TableException])
+  def testInvalidTimeCharacteristic(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val stream = env
+                 .fromCollection(data)
+                 .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidUseOfRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+    .select('rowtime.rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidUseOfRowtime2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    stream
+    .toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+    .window(Tumble over 2.millis on 'rowtime as 'w)
+    .groupBy('w)
+    .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window reference
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala
new file mode 100644
index 0000000..1767d9f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+class UnionValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionFieldsNameNotOverlap1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    assertEquals(true, StreamITCase.testResults.isEmpty)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionFieldsNameNotOverlap2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    assertEquals(true, StreamITCase.testResults.isEmpty)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionTablesFromDifferentEnv(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.unionAll(ds2)
+  }
+}