Posted to commits@flink.apache.org by tw...@apache.org on 2016/12/07 15:57:23 UTC

[4/5] flink git commit: [FLINK-4469] [table] Minor improvements

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..a9f3f7b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.batch.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.apache.flink.api.table.utils.{PojoTableFunc, TableFunc2, _}
+import org.apache.flink.api.table.{Row, TableEnvironment, Types}
+import org.junit.Test
+import org.mockito.Mockito._
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  @Test
+  def testJavaScalaTableAPIEquality(): Unit = {
+    // mock
+    val ds = mock(classOf[DataSet[Row]])
+    val jDs = mock(classOf[JDataSet[Row]])
+    val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
+    when(ds.javaSet).thenReturn(jDs)
+    when(jDs.getType).thenReturn(typeInfo)
+
+    // Scala environment
+    val env = mock(classOf[ScalaExecutionEnv])
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
+
+    // Java environment
+    val javaEnv = mock(classOf[JavaExecutionEnv])
+    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+    val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
+    javaTableEnv.registerTable("MyTable", in2)
+
+    // test cross apply
+    val func1 = new TableFunc1
+    javaTableEnv.registerFunction("func1", func1)
+    var scalaTable = in1.crossApply(func1('c) as 's).select('c, 's)
+    var javaTable = in2.crossApply("func1(c).as(s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test outer apply
+    scalaTable = in1.outerApply(func1('c) as 's).select('c, 's)
+    javaTable = in2.outerApply("as(func1(c), s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = in1.crossApply(func1('c, "$") as 's).select('c, 's)
+    javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    javaTableEnv.registerFunction("func2", func2)
+    scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = in2.crossApply("func2(c).as(name, len)").select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    javaTableEnv.registerFunction("hierarchy", hierarchy)
+    scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
+      .select('c, 'name, 'len, 'adult)
+    javaTable = in2.crossApply("AS(hierarchy(c), name, adult, len)")
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = in1.crossApply(pojo('c))
+      .select('c, 'name, 'age)
+    javaTable = in2.crossApply("pojo(c)")
+      .select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = in1.crossApply(func2('c) as ('name, 'len))
+      .select('c, 'name, 'len).filter('len > 2)
+    javaTable = in2.crossApply("func2(c) as (name, len)")
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    scalaTable = in1.crossApply(func1('c.substring(2)) as 's)
+      .select('a, 'c, 's)
+    javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
+      .select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+  }
+
+  @Test
+  def testCrossApply(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result1 = table.crossApply(function('c) as 's).select('c, 's)
+
+    val expected1 = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result1, expected1)
+
+    // test overloading
+
+    val result2 = table.crossApply(function('c, "$") as 's).select('c, 's)
+
+    val expected2 = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"$function($$2, '$$')"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result2, expected2)
+  }
+
+  @Test
+  def testOuterApply(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result = table.outerApply(function('c) as 's).select('c, 's)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result, expected)
+  }
+}
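
For readers skimming this patch, the plan assertions above boil down to: crossApply produces an INNER correlate (input rows for which the function emits nothing are dropped), while outerApply produces a LEFT correlate (such rows are kept with NULLs). A minimal self-contained sketch of that behavior; the Split class, example data, and field names are this sketch's own, modeled on the TableFunc1 used by the tests:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.functions.TableFunction
import org.apache.flink.api.table.{Row, TableEnvironment}

// Emits one row per "#"-separated token; strings without "#" emit nothing.
class Split extends TableFunction[String] {
  def eval(str: String): Unit =
    if (str.contains("#")) str.split("#").foreach(collect)
}

object CrossApplyExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val split = new Split

    val in = env.fromElements((1, "Jack#22"), (2, "nosharp")).toTable(tEnv).as('a, 'c)

    // INNER correlate: "nosharp" is dropped here; with outerApply it would
    // survive with a NULL value for 's.
    val result = in.crossApply(split('c) as 's).select('c, 's)
    result.toDataSet[Row].print()
  }
}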

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionITCase.scala
deleted file mode 100644
index f19f7f9..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionITCase.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.stream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.StreamITCase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils.{TableFunc0, TableFunc1}
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class UserDefinedTableFunctionITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSQLCrossApply(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    tEnv.registerFunction("split", new TableFunc0)
-
-    val sqlQuery = "SELECT MyTable.c, t.n, t.a FROM MyTable, LATERAL TABLE(split(c)) AS t(n,a)"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSQLOuterApply(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    tEnv.registerFunction("split", new TableFunc0)
-
-    val sqlQuery = "SELECT MyTable.c, t.n, t.a FROM MyTable " +
-      "LEFT JOIN LATERAL TABLE(split(c)) AS t(n,a) ON TRUE"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "nosharp,null,null", "Jack#22,Jack,22",
-      "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testTableAPICrossApply(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = t
-      .crossApply(func0('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testTableAPIOuterApply(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = t
-      .outerApply(func0('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "nosharp,null,null", "Jack#22,Jack,22",
-      "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testTableAPIWithFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = t
-      .crossApply(func0('c) as('name, 'age))
-      .select('c, 'name, 'age)
-      .filter('age > 20)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testTableAPIWithScalarFunction(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    val func1 = new TableFunc1
-
-    val result = t
-      .crossApply(func1('c.substring(2)) as 's)
-      .select('c, 's)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Jack#22,ack", "Jack#22,22", "John#19,ohn",
-                                       "John#19,19", "Anna#44,nna", "Anna#44,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  private def getSmall3TupleDataStream(
-    env: StreamExecutionEnvironment)
-  : DataStream[(Int, Long, String)] = {
-
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Jack#22"))
-    data.+=((2, 2L, "John#19"))
-    data.+=((3, 2L, "Anna#44"))
-    data.+=((4, 3L, "nosharp"))
-    env.fromCollection(data)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index bc01819..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.stream
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.apache.flink.api.table._
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
-import org.junit.Assert.{assertTrue, fail}
-import org.junit.Test
-import org.mockito.Mockito._
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testTableAPI(): Unit = {
-    // mock
-    val ds = mock(classOf[DataStream[Row]])
-    val jDs = mock(classOf[JDataStream[Row]])
-    val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
-    when(ds.javaStream).thenReturn(jDs)
-    when(jDs.getType).thenReturn(typeInfo)
-
-    // Scala environment
-    val env = mock(classOf[ScalaExecutionEnv])
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
-    // Java environment
-    val javaEnv = mock(classOf[JavaExecutionEnv])
-    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
-
-    // test cross apply
-    val func1 = new TableFunc1
-    javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.crossApply(func1('c) as ('s)).select('c, 's)
-    var javaTable = in2.crossApply("func1(c) as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test outer apply
-    scalaTable = in1.outerApply(func1('c) as ('s)).select('c, 's)
-    javaTable = in2.outerApply("func1(c) as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = in1.crossApply(func1('c, "$") as ('s)).select('c, 's)
-    javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.crossApply("func2(c) as (name, len)").select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'len, 'adult)
-    javaTable = in2.crossApply("hierarchy(c) as (name, adult, len)")
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.crossApply(pojo('c))
-      .select('c, 'name, 'age)
-    javaTable = in2.crossApply("pojo(c)")
-      .select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.crossApply("func2(c) as (name, len)")
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = in1.crossApply(func1('c.substring(2)) as ('s))
-      .select('a, 'c, 's)
-    javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
-      .select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // check that Scala objects are forbidden
-    expectExceptionThrown(
-      tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
-    expectExceptionThrown(
-      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
-    expectExceptionThrown(
-      in1.crossApply(ObjectTableFunction('a, 1)), "Scala object")
-
-  }
-
-
-  @Test
-  def testInvalidTableFunction(): Unit = {
-    // mock
-    val util = streamTestUtil()
-    val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val tEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
-
-    //=================== check that Scala objects are forbidden ===============
-    // Scala table environment register
-    expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
-    // Java table environment register
-    expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
-    // direct Scala Table API call
-    expectExceptionThrown(t.crossApply(ObjectTableFunction('a, 1)), "Scala object")
-
-
-    //============ throw exception when the table function is not registered =====
-    // Java Table API call
-    expectExceptionThrown(t.crossApply("nonexist(a)"), "Undefined function: NONEXIST")
-    // SQL API call
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
-      "No match found for function signature nonexist(<NUMERIC>)")
-
-
-    //========= throw exception when the called function is a scalar function ====
-    util.addFunction("func0", Func0)
-    // Java Table API call
-    expectExceptionThrown(
-      t.crossApply("func0(a)"),
-      "only accept TableFunction",
-      classOf[TableException])
-    // SQL API call
-    // NOTE: this throws an AssertionError rather than an exception, possibly a Calcite bug
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
-      null,
-      classOf[AssertionError])
-
-    //========== throw exception when the parameters are not correct ==============
-    // Java Table API call
-    util.addFunction("func2", new TableFunc2)
-    expectExceptionThrown(
-      t.crossApply("func2(c, c)"),
-      "Given parameters of function 'FUNC2' do not match any signature")
-    // SQL API call
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
-      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
-  }
-
-  private def expectExceptionThrown(
-      function: => Unit,
-      keywords: String,
-      clazz: Class[_ <: Throwable] = classOf[ValidationException])
-    : Unit = {
-    try {
-      function
-      fail(s"Expected a $clazz, but no exception is thrown.")
-    } catch {
-      case e if e.getClass == clazz =>
-        if (keywords != null) {
-          assertTrue(
-            s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
-            e.getMessage.contains(keywords))
-        }
-      case e: Throwable => fail(s"Expected ${clazz.getSimpleName} to be thrown, but got $e.")
-    }
-  }
-
-  @Test
-  def testSQLWithCrossApply(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-
-    // test overloading
-
-    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
-    val expected2 = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c, '$')"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery2, expected2)
-  }
-
-  @Test
-  def testSQLWithOuterApply(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithCustomType(): Unit = {
-    val util = streamTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithHierarchyType(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new HierarchyTableFunction
-    util.addFunction("hierarchy", function)
-
-    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "hierarchy($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithPojoType(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new PojoTableFunc
-    util.addFunction("pojo", function)
-
-    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "pojo($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " INTEGER age, VARCHAR(2147483647) name)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "name", "age")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithFilter(): Unit = {
-    val util = streamTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
-      "WHERE len > 2"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER"),
-        term("condition", ">($1, 2)")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-
-  @Test
-  def testSQLWithScalarFunction(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..c2ded28
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
+import org.apache.flink.api.table.utils._
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  @Test
+  def testCrossApply(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1($cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+
+    // test overloading
+
+    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
+
+    val expected2 = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1($cor0.c, '$')"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery2, expected2)
+  }
+
+  @Test
+  def testOuterApply(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1($cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testCustomType(): Unit = {
+    val util = streamTestUtil()
+    val func2 = new TableFunc2
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func2", func2)
+
+    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func2($cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+               "VARCHAR(2147483647) f0, INTEGER f1)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testHierarchyType(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new HierarchyTableFunction
+    util.addFunction("hierarchy", function)
+
+    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "hierarchy($cor0.c)"),
+        term("function", function.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPojoType(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new PojoTableFunc
+    util.addFunction("pojo", function)
+
+    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "pojo($cor0.c)"),
+        term("function", function.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+               " INTEGER age, VARCHAR(2147483647) name)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "name", "age")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testFilter(): Unit = {
+    val util = streamTestUtil()
+    val func2 = new TableFunc2
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func2", func2)
+
+    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
+      "WHERE len > 2"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func2($cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+               "VARCHAR(2147483647) f0, INTEGER f1)"),
+        term("joinType", "INNER"),
+        term("condition", ">($1, 2)")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testScalarFunction(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+}
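
The SQL forms verified above mirror the Table API's crossApply/outerApply: a comma join with LATERAL TABLE yields the INNER correlate, and LEFT JOIN LATERAL TABLE ... ON TRUE yields the LEFT correlate. A runnable sketch under the same assumptions as the batch sketch earlier (SplitFunc, the sample rows, and the registered names are illustrative, not part of this patch):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.functions.TableFunction
import org.apache.flink.api.table.{Row, TableEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Same guarded split semantics as the batch sketch above.
class SplitFunc extends TableFunction[String] {
  def eval(str: String): Unit =
    if (str.contains("#")) str.split("#").foreach(collect)
}

object LateralTableExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val t = env.fromElements((1, "Jack#22"), (2, "nosharp")).toTable(tEnv).as('a, 'c)
    tEnv.registerTable("MyTable", t)
    tEnv.registerFunction("split", new SplitFunc)

    // Cross apply: INNER correlate, "nosharp" produces no result row.
    val cross = tEnv.sql(
      "SELECT c, s FROM MyTable, LATERAL TABLE(split(c)) AS T(s)")
    // Outer apply: LEFT correlate, "nosharp" is kept with NULL for s.
    val outer = tEnv.sql(
      "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(split(c)) AS T(s) ON TRUE")

    cross.toDataStream[Row].print()
    outer.toDataStream[Row].print()
    env.execute()
  }
}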

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..bc28d67
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.utils._
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.apache.flink.api.table.utils._
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
+import org.junit.Assert.{assertTrue, fail}
+import org.junit.Test
+import org.mockito.Mockito._
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  @Test
+  def testJavaScalaTableAPIEquality(): Unit = {
+    // mock
+    val ds = mock(classOf[DataStream[Row]])
+    val jDs = mock(classOf[JDataStream[Row]])
+    val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
+    when(ds.javaStream).thenReturn(jDs)
+    when(jDs.getType).thenReturn(typeInfo)
+
+    // Scala environment
+    val env = mock(classOf[ScalaExecutionEnv])
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
+
+    // Java environment
+    val javaEnv = mock(classOf[JavaExecutionEnv])
+    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
+
+    // test cross apply
+    val func1 = new TableFunc1
+    javaTableEnv.registerFunction("func1", func1)
+    var scalaTable = in1.crossApply(func1('c) as 's).select('c, 's)
+    var javaTable = in2.crossApply("func1(c).as(s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test outer apply
+    scalaTable = in1.outerApply(func1('c) as 's).select('c, 's)
+    javaTable = in2.outerApply("as(func1(c), s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = in1.crossApply(func1('c, "$") as 's).select('c, 's)
+    javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    javaTableEnv.registerFunction("func2", func2)
+    scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = in2.crossApply("func2(c).as(name, len)").select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    javaTableEnv.registerFunction("hierarchy", hierarchy)
+    scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
+      .select('c, 'name, 'len, 'adult)
+    javaTable = in2.crossApply("AS(hierarchy(c), name, adult, len)")
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = in1.crossApply(pojo('c))
+      .select('c, 'name, 'age)
+    javaTable = in2.crossApply("pojo(c)")
+      .select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = in1.crossApply(func2('c) as ('name, 'len))
+      .select('c, 'name, 'len).filter('len > 2)
+    javaTable = in2.crossApply("func2(c) as (name, len)")
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    scalaTable = in1.crossApply(func1('c.substring(2)) as 's)
+      .select('a, 'c, 's)
+    javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
+      .select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // check that Scala objects are forbidden
+    expectExceptionThrown(
+      tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+    expectExceptionThrown(
+      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+    expectExceptionThrown(
+      in1.crossApply(ObjectTableFunction('a, 1)), "Scala object")
+
+  }
+
+  @Test
+  def testInvalidTableFunction(): Unit = {
+    // mock
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
+
+    //=================== check that Scala objects are forbidden ===============
+    // Scala table environment register
+    expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
+    // Java table environment register
+    expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
+    // direct Scala Table API call
+    expectExceptionThrown(t.crossApply(ObjectTableFunction('a, 1)), "Scala object")
+
+
+    //============ throw exception when the table function is not registered =====
+    // Java Table API call
+    expectExceptionThrown(t.crossApply("nonexist(a)"), "Undefined function: NONEXIST")
+    // SQL API call
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+      "No match found for function signature nonexist(<NUMERIC>)")
+
+
+    //========= throw exception when the called function is a scalar function ====
+    util.addFunction("func0", Func0)
+    // Java Table API call
+    expectExceptionThrown(
+      t.crossApply("func0(a)"),
+      "only accept expressions that define table functions",
+      classOf[TableException])
+    // SQL API call
+    // NOTE: this throws an AssertionError rather than an exception, possibly a Calcite bug
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+      null,
+      classOf[AssertionError])
+
+    //========== throw exception when the parameters are not correct ==============
+    // Java Table API call
+    util.addFunction("func2", new TableFunc2)
+    expectExceptionThrown(
+      t.crossApply("func2(c, c)"),
+      "Given parameters of function 'FUNC2' do not match any signature")
+    // SQL API call
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
+  }
+
+  @Test
+  def testCrossApply(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result1 = table.crossApply(function('c) as 's).select('c, 's)
+
+    val expected1 = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result1, expected1)
+
+    // test overloading
+
+    val result2 = table.crossApply(function('c, "$") as 's).select('c, 's)
+
+    val expected2 = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2, '$$')"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result2, expected2)
+  }
+
+  @Test
+  def testOuterApply(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result = table.outerApply(function('c) as 's).select('c, 's)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testCustomType(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func2", new TableFunc2)
+
+    val result = table.crossApply(function('c) as ('name, 'len)).select('c, 'name, 'len)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+           "VARCHAR(2147483647) name, INTEGER len)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "name", "len")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testHierarchyType(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("hierarchy", new HierarchyTableFunction)
+
+    val result = table.crossApply(function('c) as ('name, 'adult, 'len))
+
+    val expected = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$function($$2)"),
+      term("function", function),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+        " VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)"),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testPojoType(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("pojo", new PojoTableFunc)
+
+    val result = table.crossApply(function('c))
+
+    val expected = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$function($$2)"),
+      term("function", function),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+         "INTEGER age, VARCHAR(2147483647) name)"),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilter(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func2", new TableFunc2)
+
+    val result = table
+      .crossApply(function('c) as ('name, 'len))
+      .select('c, 'name, 'len)
+      .filter('len > 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+          "VARCHAR(2147483647) name, INTEGER len)"),
+        term("joinType", "INNER"),
+        term("condition", ">($1, 2)")
+      ),
+      term("select", "c", "name", "len")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testScalarFunction(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result = table.crossApply(function('c.substring(2)) as 's)
+
+    val expected = unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation",  s"$function(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  private def expectExceptionThrown(
+      function: => Unit,
+      keywords: String,
+      clazz: Class[_ <: Throwable] = classOf[ValidationException])
+    : Unit = {
+    try {
+      function
+      fail(s"Expected a $clazz, but no exception is thrown.")
+    } catch {
+      case e if e.getClass == clazz =>
+        if (keywords != null) {
+          assertTrue(
+            s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
+            e.getMessage.contains(keywords))
+        }
+      case e: Throwable => fail(s"Expected ${clazz.getSimpleName} to be thrown, but got $e.")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedTableFunctions.scala
deleted file mode 100644
index 1e6bdb8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedTableFunctions.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.utils
-
-import java.lang.Boolean
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.Tuple3
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-
-
-case class SimpleUser(name: String, age: Int)
-
-class TableFunc0 extends TableFunction[SimpleUser] {
-  // make sure input element's format is "<string>#<int>"
-  def eval(user: String): Unit = {
-    if (user.contains("#")) {
-      val splits = user.split("#")
-      collect(SimpleUser(splits(0), splits(1).toInt))
-    }
-  }
-}
-
-class TableFunc1 extends TableFunction[String] {
-  def eval(str: String): Unit = {
-    if (str.contains("#")){
-      str.split("#").foreach(collect)
-    }
-  }
-
-  def eval(str: String, prefix: String): Unit = {
-    if (str.contains("#")) {
-      str.split("#").foreach(s => collect(prefix + s))
-    }
-  }
-}
-
-
-class TableFunc2 extends TableFunction[Row] {
-  def eval(str: String): Unit = {
-    if (str.contains("#")) {
-      str.split("#").foreach({ s =>
-        val row = new Row(2)
-        row.setField(0, s)
-        row.setField(1, s.length)
-        collect(row)
-      })
-    }
-  }
-
-  override def getResultType: TypeInformation[Row] = {
-    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
-                        BasicTypeInfo.INT_TYPE_INFO))
-  }
-}
-
-class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
-  def eval(user: String) {
-    if (user.contains("#")) {
-      val splits = user.split("#")
-      val age = splits(1).toInt
-      collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
-    }
-  }
-}
-
-abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String, A, B]] {}
-
-class PojoTableFunc extends TableFunction[PojoUser] {
-  def eval(user: String) {
-    if (user.contains("#")) {
-      val splits = user.split("#")
-      collect(new PojoUser(splits(0), splits(1).toInt))
-    }
-  }
-}
-
-class PojoUser() {
-  var name: String = _
-  var age: Int = 0
-
-  def this(name: String, age: Int) {
-    this()
-    this.name = name
-    this.age = age
-  }
-}
-
-// ----------------------------------------------------------------------------------------------
-// Invalid Table Functions
-// ----------------------------------------------------------------------------------------------
-
-
-// this is used to check that registering a Scala object as a table function is forbidden
-object ObjectTableFunction extends TableFunction[Integer] {
-  def eval(a: Int, b: Int): Unit = {
-    collect(a)
-    collect(b)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
new file mode 100644
index 0000000..cc551f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.dataset
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.expressions.utils._
+import org.apache.flink.api.table.utils._
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class DataSetCorrelateITCase(
+  mode: TestExecutionMode,
+  configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testCrossApply(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+
+    val func1 = new TableFunc1
+    val result = in.crossApply(func1('c) as 's).select('c, 's).toDataSet[Row]
+    val results = result.collect()
+    val expected = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
+      "Anna#44,Anna\n" + "Anna#44,44\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+
+    // with overloading
+    val result2 = in.crossApply(func1('c, "$") as 's).select('c, 's).toDataSet[Row]
+    val results2 = result2.collect()
+    val expected2 = "Jack#22,$Jack\n" + "Jack#22,$22\n" + "John#19,$John\n" +
+      "John#19,$19\n" + "Anna#44,$Anna\n" + "Anna#44,$44\n"
+    TestBaseUtils.compareResultAsText(results2.asJava, expected2)
+  }
+
+  @Test
+  def testOuterApply(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+
+    val func2 = new TableFunc2
+    val result = in.outerApply(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
+    val results = result.collect()
+    val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
+      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testWithFilter(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = in
+      .crossApply(func0('c) as ('name, 'age))
+      .select('c, 'name, 'age)
+      .filter('age > 20)
+      .toDataSet[Row]
+
+    val results = result.collect()
+    val expected = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testCustomReturnType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+    val func2 = new TableFunc2
+
+    val result = in
+      .crossApply(func2('c) as ('name, 'len))
+      .select('c, 'name, 'len)
+      .toDataSet[Row]
+
+    val results = result.collect()
+    val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
+      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testHierarchyType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+
+    val hierarchy = new HierarchyTableFunction
+    val result = in
+      .crossApply(hierarchy('c) as ('name, 'adult, 'len))
+      .select('c, 'name, 'adult, 'len)
+      .toDataSet[Row]
+
+    val results = result.collect()
+    val expected = "Jack#22,Jack,true,22\n" + "John#19,John,false,19\n" +
+      "Anna#44,Anna,true,44\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testPojoType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+
+    val pojo = new PojoTableFunc()
+    val result = in
+      .crossApply(pojo('c))
+      .select('c, 'name, 'age)
+      .toDataSet[Row]
+
+    val results = result.collect()
+    val expected = "Jack#22,Jack,22\n" + "John#19,John,19\n" + "Anna#44,Anna,44\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testUDTFWithScalarFunction(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+    val func1 = new TableFunc1
+
+    val result = in
+      .crossApply(func1('c.substring(2)) as 's)
+      .select('c, 's)
+      .toDataSet[Row]
+
+    val results = result.collect()
+    val expected = "Jack#22,ack\n" + "Jack#22,22\n" + "John#19,ohn\n" + "John#19,19\n" +
+      "Anna#44,nna\n" + "Anna#44,44\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  private def testData(
+      env: ExecutionEnvironment)
+    : DataSet[(Int, Long, String)] = {
+
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Jack#22"))
+    data.+=((2, 2L, "John#19"))
+    data.+=((3, 2L, "Anna#44"))
+    data.+=((4, 3L, "nosharp"))
+    env.fromCollection(data)
+  }
+}

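The tests above pin down the two correlate semantics: crossApply emits one result row per
value the table function collects and drops input rows that produce nothing, while
outerApply keeps such rows and pads the function's columns with nulls (hence the
"nosharp,null,null" expectation). A minimal, Flink-free sketch of those semantics on plain
Scala collections; all names here are illustrative, not part of the patch:

    object ApplySemanticsSketch {
      // stand-in for TableFunc1: splits "<name>#<age>" at '#', emits nothing otherwise
      def split(s: String): Seq[String] =
        if (s.contains("#")) s.split("#").toSeq else Seq.empty

      def main(args: Array[String]): Unit = {
        val rows = Seq("Jack#22", "John#19", "nosharp")

        // cross apply ~ flatMap: "nosharp" yields no output and disappears
        val cross = rows.flatMap(c => split(c).map(s => (c, s)))

        // outer apply keeps barren rows, padded with null
        val outer = rows.flatMap { c =>
          val out = split(c)
          if (out.isEmpty) Seq((c, null: String)) else out.map(s => (c, s))
        }

        println(cross.mkString("\n")) // (Jack#22,Jack), (Jack#22,22), ..., (John#19,19)
        println(outer.mkString("\n")) // same rows plus (nosharp,null)
      }
    }
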
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
new file mode 100644
index 0000000..c2c523a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.datastream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableFunc0
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testCrossApply(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .crossApply(func0('c) as ('d, 'e))
+      .select('c, 'd, 'e)
+      .toDataStream[Row]
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testOuterApply(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .outerApply(func0('c) as ('d, 'e))
+      .select('c, 'd, 'e)
+      .toDataStream[Row]
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "nosharp,null,null", "Jack#22,Jack,22",
+      "John#19,John,19", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  private def testData(
+    env: StreamExecutionEnvironment)
+  : DataStream[(Int, Long, String)] = {
+
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Jack#22"))
+    data.+=((2, 2L, "John#19"))
+    data.+=((3, 2L, "Anna#44"))
+    data.+=((4, 3L, "nosharp"))
+    env.fromCollection(data)
+  }
+
+}

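Unlike the batch tests, the streaming tests cannot compare results positionally: with
parallel execution the sink receives rows in nondeterministic order, which is why both
sides are sorted before the assertEquals. A tiny illustration in plain Scala, with data
values taken from the expectations above:

    object SortedComparisonSketch {
      def main(args: Array[String]): Unit = {
        val expected = List("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
        // arrival order at the sink varies from run to run
        val received = List("Anna#44,Anna,44", "Jack#22,Jack,22", "John#19,John,19")
        assert(expected.sorted == received.sorted) // order-insensitive check
      }
    }
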
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index 73f50f5..4eaba90 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -45,9 +45,10 @@ class TableTestBase {
   }
 
   def verifyTableEquals(expected: Table, actual: Table): Unit = {
-    assertEquals("Logical Plan do not match",
-                 RelOptUtil.toString(expected.getRelNode),
-                 RelOptUtil.toString(actual.getRelNode))
+    assertEquals(
+      "Logical plans do not match",
+      RelOptUtil.toString(expected.getRelNode),
+      RelOptUtil.toString(actual.getRelNode))
   }
 
 }
@@ -61,7 +62,7 @@ abstract class TableTestUtil {
   }
 
   def addTable[T: TypeInformation](name: String, fields: Expression*): Table
-  def addFunction[T: TypeInformation](name: String, function: TableFunction[T]): Unit
+  def addFunction[T: TypeInformation](name: String, function: TableFunction[T]): TableFunction[T]
   def addFunction(name: String, function: ScalarFunction): Unit
 
   def verifySql(query: String, expected: String): Unit
@@ -132,8 +133,9 @@ case class BatchTableTestUtil() extends TableTestUtil {
   def addFunction[T: TypeInformation](
       name: String,
       function: TableFunction[T])
-    : Unit = {
+    : TableFunction[T] = {
     tEnv.registerFunction(name, function)
+    function
   }
 
   def addFunction(name: String, function: ScalarFunction): Unit = {
@@ -188,8 +190,9 @@ case class StreamTableTestUtil() extends TableTestUtil {
   def addFunction[T: TypeInformation](
       name: String,
       function: TableFunction[T])
-    : Unit = {
+    : TableFunction[T] = {
     tEnv.registerFunction(name, function)
+    function
   }
 
   def addFunction(name: String, function: ScalarFunction): Unit = {

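The signature change above (addFunction now returns the registered TableFunction instead
of Unit) lets a test register a function and keep a handle to it in one step, so the same
instance can drive both the Scala expression style and the string-based style that resolves
the registered name. A usage sketch against the utilities in this diff, assuming the
imports used by the tests above; the table name and expressions are illustrative:

    val util = BatchTableTestUtil()
    // register under a name and keep the instance for inline use
    val func1 = util.addFunction("func1", new TableFunc1)
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    // the same instance now serves both call styles:
    //   Scala expressions:  table.crossApply(func1('c) as 's)
    //   string expressions: table.crossApply("func1(c).as(s)")  // resolved via the registered name
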
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
new file mode 100644
index 0000000..3da3857
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.utils
+
+import java.lang.Boolean
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple3
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+
+
+case class SimpleUser(name: String, age: Int)
+
+class TableFunc0 extends TableFunction[SimpleUser] {
+  // make sure the input element's format is "<string>#<int>"
+  def eval(user: String): Unit = {
+    if (user.contains("#")) {
+      val splits = user.split("#")
+      collect(SimpleUser(splits(0), splits(1).toInt))
+    }
+  }
+}
+
+class TableFunc1 extends TableFunction[String] {
+  def eval(str: String): Unit = {
+    if (str.contains("#")) {
+      str.split("#").foreach(collect)
+    }
+  }
+
+  def eval(str: String, prefix: String): Unit = {
+    if (str.contains("#")) {
+      str.split("#").foreach(s => collect(prefix + s))
+    }
+  }
+}
+
+
+class TableFunc2 extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    if (str.contains("#")) {
+      str.split("#").foreach({ s =>
+        val row = new Row(2)
+        row.setField(0, s)
+        row.setField(1, s.length)
+        collect(row)
+      })
+    }
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
+                        BasicTypeInfo.INT_TYPE_INFO))
+  }
+}
+
+class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
+  def eval(user: String): Unit = {
+    if (user.contains("#")) {
+      val splits = user.split("#")
+      val age = splits(1).toInt
+      collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
+    }
+  }
+}
+
+abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String, A, B]] {}
+
+class PojoTableFunc extends TableFunction[PojoUser] {
+  def eval(user: String): Unit = {
+    if (user.contains("#")) {
+      val splits = user.split("#")
+      collect(new PojoUser(splits(0), splits(1).toInt))
+    }
+  }
+}
+
+class PojoUser() {
+  var name: String = _
+  var age: Int = 0
+
+  def this(name: String, age: Int) {
+    this()
+    this.name = name
+    this.age = age
+  }
+}
+
+// ----------------------------------------------------------------------------------------------
+// Invalid Table Functions
+// ----------------------------------------------------------------------------------------------
+
+
+// this is used to check that registering a Scala object as a table function is forbidden
+object ObjectTableFunction extends TableFunction[Integer] {
+  def eval(a: Int, b: Int): Unit = {
+    collect(a)
+    collect(b)
+  }
+}
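
All of the helpers above share the same "<string>#<int>" input contract: values containing
'#' are split into one or more output rows, anything else produces no output (dropped by a
cross apply, null-padded by an outer apply), and ObjectTableFunction exists only so the
validation tests can assert that registering a Scala object is rejected. A Flink-free
sketch of the shared parsing guard; parse is an illustrative stand-in, not patch code:

    object ContractSketch extends App {
      // mirrors the '#' guard used by the functions above; the name/age split matches TableFunc0
      def parse(user: String): Option[(String, Int)] =
        if (user.contains("#")) {
          val parts = user.split("#")
          Some((parts(0), parts(1).toInt))
        } else None

      assert(parse("Jack#22") == Some(("Jack", 22)))
      assert(parse("nosharp").isEmpty) // no '#': no output row
    }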