Posted to commits@flink.apache.org by tw...@apache.org on 2016/12/07 15:57:24 UTC

[5/5] flink git commit: [FLINK-4469] [table] Minor improvements

[FLINK-4469] [table] Minor improvements

- Fixed typos
- Removed implicit conversion via TableFunctionCallBuilder
- Fixed bugs in the expression parser's alias handling and with static eval methods
- Refactored tests

This closes #2653.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/684defbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/684defbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/684defbf

Branch: refs/heads/master
Commit: 684defbf33168e34657bc1a25607adb53be248c5
Parents: e139f59
Author: twalthr <tw...@apache.org>
Authored: Tue Dec 6 17:46:54 2016 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Dec 7 16:55:37 2016 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  22 +
 .../api/java/table/BatchTableEnvironment.scala  |   3 +-
 .../api/java/table/StreamTableEnvironment.scala |   3 +-
 .../api/scala/table/BatchTableEnvironment.scala |   6 +-
 .../scala/table/TableFunctionCallBuilder.scala  |  39 --
 .../flink/api/scala/table/expressionDsl.scala   |  10 +-
 .../flink/api/table/TableEnvironment.scala      |  10 +-
 .../flink/api/table/codegen/CodeGenerator.scala |  13 +-
 .../codegen/calls/TableFunctionCallGen.scala    |   3 +-
 .../table/expressions/ExpressionParser.scala    |  12 +-
 .../flink/api/table/expressions/call.scala      |   9 +-
 .../api/table/functions/ScalarFunction.scala    |   7 +-
 .../api/table/functions/TableFunction.scala     |  31 +-
 .../functions/utils/ScalarSqlFunction.scala     |   2 +-
 .../functions/utils/TableSqlFunction.scala      |  15 +-
 .../utils/UserDefinedFunctionUtils.scala        |  18 +-
 .../api/table/plan/ProjectionTranslator.scala   |   4 +-
 .../api/table/plan/logical/operators.scala      |  23 +-
 .../api/table/plan/nodes/FlinkCorrelate.scala   |   7 +-
 .../plan/nodes/dataset/DataSetCorrelate.scala   |   2 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |   3 +-
 .../rules/dataSet/DataSetCorrelateRule.scala    |   7 +-
 .../datastream/DataStreamCorrelateRule.scala    |  12 +-
 .../org/apache/flink/api/table/table.scala      |  56 ++-
 .../api/table/validate/FunctionCatalog.scala    |  10 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../batch/UserDefinedTableFunctionITCase.scala  | 212 ----------
 .../batch/UserDefinedTableFunctionTest.scala    | 320 ---------------
 .../sql/UserDefinedTableFunctionTest.scala      | 238 +++++++++++
 .../table/UserDefinedTableFunctionTest.scala    | 179 +++++++++
 .../stream/UserDefinedTableFunctionITCase.scala | 181 ---------
 .../stream/UserDefinedTableFunctionTest.scala   | 402 -------------------
 .../sql/UserDefinedTableFunctionTest.scala      | 237 +++++++++++
 .../table/UserDefinedTableFunctionTest.scala    | 385 ++++++++++++++++++
 .../utils/UserDefinedTableFunctions.scala       | 116 ------
 .../dataset/DataSetCorrelateITCase.scala        | 177 ++++++++
 .../datastream/DataStreamCorrelateITCase.scala  |  90 +++++
 .../flink/api/table/utils/TableTestBase.scala   |  15 +-
 .../table/utils/UserDefinedTableFunctions.scala | 117 ++++++
 39 files changed, 1611 insertions(+), 1387 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 848f9e4..6cf0dee 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1494,6 +1494,17 @@ Both the Table API and SQL come with a set of built-in functions for data transf
     <tr>
       <td>
         {% highlight java %}
+ANY.as(name [, name ]* )
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Specifies a name for an expression, i.e. a field. Additional names can be specified if the expression expands to multiple fields.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
 ANY.isNull
 {% endhighlight %}
       </td>
@@ -2045,6 +2056,17 @@ COMPOSITE.get(INT)
     <tr>
       <td>
         {% highlight scala %}
+ANY.as(name [, name ]* )
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Specifies a name for an expression, i.e. a field. Additional names can be specified if the expression expands to multiple fields.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
 ANY.isNull
 {% endhighlight %}
       </td>
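
As a rough illustration of the documented call, a multi-name alias in the Scala expression DSL might look as follows (the 'split' function and field names are illustrative, assuming a TableFunction that emits two fields):

    // alias the two fields produced by the table function call
    table.crossApply(split('c) as ('word, 'length)).select('word, 'length)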

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index b353377..3517338 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -168,7 +168,8 @@ class BatchTableEnvironment(
     * Registered functions can be referenced in Table API and SQL queries.
     *
     * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
     */
   def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
     implicit val typeInfo: TypeInformation[T] = TypeExtractor

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 367cb82..83293e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -170,7 +170,8 @@ class StreamTableEnvironment(
     * Registered functions can be referenced in Table API and SQL queries.
     *
     * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
     */
   def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
     implicit val typeInfo: TypeInformation[T] = TypeExtractor

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
index 36885d2..f4bfe31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -142,13 +142,13 @@ class BatchTableEnvironment(
 
   /**
     * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
-    * Registered functions can be referenced in SQL queries.
+    * Registered functions can be referenced in Table API and SQL queries.
     *
     * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
     */
   def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
     registerTableFunctionInternal(name, tf)
   }
-
 }
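
A quick sketch of the registration path documented above, using the TableFunc1 test function that appears later in this commit (the usual org.apache.flink.api.scala._ implicits are assumed to be in scope):

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // the registered function can now be referenced from both Table API and SQL queries
    tEnv.registerFunction("split", new TableFunc1)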

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala
deleted file mode 100644
index 2261b70..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
-import org.apache.flink.api.table.functions.TableFunction
-
-case class TableFunctionCallBuilder[T: TypeInformation](udtf: TableFunction[T]) {
-  /**
-    * Creates a call to a [[TableFunction]] in Scala Table API.
-    *
-    * @param params actual parameters of function
-    * @return [[TableFunctionCall]]
-    */
-  def apply(params: Expression*): Expression = {
-    val resultType = if (udtf.getResultType == null) {
-      implicitly[TypeInformation[T]]
-    } else {
-      udtf.getResultType
-    }
-    TableFunctionCall(udtf.getClass.getSimpleName, udtf, params, resultType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index cc4c68d..175ce2e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.functions.TableFunction
 
 import scala.language.implicitConversions
 
@@ -98,6 +97,13 @@ trait ImplicitExpressionOperations {
 
   def cast(toType: TypeInformation[_]) = Cast(expr, toType)
 
+  /**
+    * Specifies a name for an expression, i.e. a field.
+    *
+    * @param name name for one field
+    * @param extraNames additional names if the expression expands to multiple fields
+    * @return field with an alias
+    */
   def as(name: Symbol, extraNames: Symbol*) = Alias(expr, name.name, extraNames.map(_.name))
 
   def asc = Asc(expr)
@@ -540,8 +546,6 @@ trait ImplicitExpressionConversions {
   implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
   implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
   implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression = Literal(sqlTimestamp)
-  implicit def UDTF2TableFunctionCall[T: TypeInformation](udtf: TableFunction[T]):
-    TableFunctionCallBuilder[T] = TableFunctionCallBuilder(udtf)
 }
 
 // ------------------------------------------------------------------------------------------------
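
With the implicit conversion and the TableFunctionCallBuilder gone, a table function is now called through the apply method added to TableFunction itself (see TableFunction.scala below); a hedged before/after sketch with illustrative names:

    val split = new MySplitUDTF               // extends TableFunction[String]
    // before: the implicit UDTF2TableFunctionCall wrapped 'split' in a builder
    // after:  split('c) goes through TableFunction.apply directly
    table.crossApply(split('c) as 's).select('c, 's)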

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 8cabadb..b6d0e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -24,8 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.config.Lex
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.RexExecutorImpl
-import org.apache.calcite.schema.{SchemaPlus, Schemas}
+import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
@@ -158,7 +157,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     * user-defined functions under this name.
     */
   def registerFunction(name: String, function: ScalarFunction): Unit = {
-    // check could be instantiated
+    // check that the class can be instantiated
     checkForInstantiation(function.getClass)
 
     // register in Table API
@@ -174,9 +173,9 @@ abstract class TableEnvironment(val config: TableConfig) {
     */
   private[flink] def registerTableFunctionInternal[T: TypeInformation](
     name: String, function: TableFunction[T]): Unit = {
-    // check not Scala object
+    // check that the class is not a Scala object
     checkNotSingleton(function.getClass)
-    // check could be instantiated
+    // check that the class can be instantiated
     checkForInstantiation(function.getClass)
 
     val typeInfo: TypeInformation[_] = if (function.getResultType != null) {
@@ -187,6 +186,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 
     // register in Table API
     functionCatalog.registerFunction(name, function.getClass)
+
     // register in SQL API
     val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
     functionCatalog.registerSqlFunctions(sqlFunctions)

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 9e4f569..f7d6863 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.api.table.codegen.Indenter.toISC
 import org.apache.flink.api.table.codegen.calls.FunctionGenerator
 import org.apache.flink.api.table.codegen.calls.ScalarOperators._
@@ -358,10 +359,11 @@ class CodeGenerator(
 
     val input2AccessExprs = input2 match {
       case Some(ti) => for (i <- 0 until ti.getArity)
-      // use generateFieldAccess instead of generateInputAccess to avoid the generated table
-      // function's field access code is put on the top of function body rather than the while loop
+        // use generateFieldAccess instead of generateInputAccess so that the generated table
+        // function's field access code is placed inside the while loop rather than at the
+        // top of the function body
         yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping)
-      case None => throw new CodeGenException("type information of input2 must not be null")
+      case None => throw new CodeGenException("Type information of input2 must not be null.")
     }
     (input1AccessExprs, input2AccessExprs)
   }
@@ -781,7 +783,7 @@ class CodeGenerator(
   }
 
   override def visitCorrelVariable(correlVariable: RexCorrelVariable): GeneratedExpression = {
-    GeneratedExpression(input1Term, GeneratedExpression.NEVER_NULL, "", input1)
+    GeneratedExpression(input1Term, NEVER_NULL, NO_CODE, input1)
   }
 
   override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression =
@@ -1019,8 +1021,7 @@ class CodeGenerator(
       case None =>
         val expr = if (nullableInput) {
           generateNullableInputFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
-        }
-        else {
+        } else {
           generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
         }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
index 27cb43f..37e70e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.table.codegen.calls
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.GeneratedExpression.NEVER_NULL
 import org.apache.flink.api.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.api.table.functions.TableFunction
 import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
@@ -75,7 +76,7 @@ class TableFunctionCallGen(
     // has no result
     GeneratedExpression(
       functionReference,
-      GeneratedExpression.NEVER_NULL,
+      NEVER_NULL,
       functionCallCode,
       returnType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 6cd63ff..a926717 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -204,8 +204,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   lazy val suffixAs: PackratParser[Expression] =
-    composite ~ "." ~ AS ~ "(" ~ fieldReference ~ ")" ^^ {
-    case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.name)
+    composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
   }
 
   lazy val suffixTrim = composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
@@ -325,8 +325,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   lazy val prefixAs: PackratParser[Expression] =
-    AS ~ "(" ~ expression ~ "," ~ fieldReference ~ ")" ^^ {
-    case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.name)
+    AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
   }
 
   lazy val prefixIf: PackratParser[Expression] =
@@ -447,8 +447,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
       case e ~ _ ~ name => Alias(e, name.name)
-    } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
-    case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.drop(1).map(_.name))
+  } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
   } | logic
 
   lazy val expression: PackratParser[Expression] = alias |
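
The parser now accepts several names in both alias forms; a sketch of string expressions the updated grammar should parse (field and function names are illustrative):

    ExpressionParser.parseExpression("f.as(a, b)")     // suffix form with multiple names
    ExpressionParser.parseExpression("as(f, a, b)")    // prefix form with multiple names
    table.crossApply("split(c) as (word, length)")     // multi-name alias in an Apply expression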

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
index 3e8d8b1..3bb9dac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -89,10 +89,8 @@ case class ScalarFunctionCall(
       ValidationSuccess
     }
   }
-
 }
 
-
 /**
   *
   * Expression for calling a user-defined table function with actual parameters.
@@ -114,10 +112,10 @@ case class TableFunctionCall(
   override private[flink] def children: Seq[Expression] = parameters
 
   /**
-    * Assigns an alias for this table function returned fields that the following `select()` clause
+    * Assigns an alias for this table function's returned fields that the following operator
     * can refer to.
     *
-    * @param aliasList alias for this table function returned fields
+    * @param aliasList alias for this table function's returned fields
     * @return this table function call
     */
   private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
@@ -155,4 +153,7 @@ case class TableFunctionCall(
       fieldNames,
       child)
   }
+
+  override def toString =
+    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
index 86d9d66..2e16096 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
@@ -18,15 +18,11 @@
 
 package org.apache.flink.api.table.functions
 
-import java.lang.reflect.{Method, Modifier}
-
-import org.apache.calcite.sql.SqlFunction
 import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.ValidationException
 import org.apache.flink.api.table.expressions.{Expression, ScalarFunctionCall}
-import org.apache.flink.api.table.functions.utils.ScalarSqlFunction
-import org.apache.flink.api.table.{FlinkTypeFactory, ValidationException}
 
 /**
  * Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
@@ -60,6 +56,7 @@ abstract class ScalarFunction extends UserDefinedFunction {
     ScalarFunctionCall(this, params)
   }
 
+  override def toString: String = getClass.getCanonicalName
 
   // ----------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
index 98a2921..3a56efb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
@@ -20,18 +20,16 @@ package org.apache.flink.api.table.functions
 
 import java.util
 
-import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.ValidationException
+import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
 
 /**
  * Base class for a user-defined table function (UDTF). A user-defined table function works on
   * zero, one, or multiple scalar values as input and returns multiple rows as output.
   *
   * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
-  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
-  * can also be overloaded by implementing multiple methods named "eval".
+  * method. An evaluation method must be declared publicly, must not be static, and must be
+  * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
   *
   * User-defined functions must have a default constructor and must be instantiable during runtime.
   *
@@ -51,14 +49,14 @@ import org.apache.flink.api.table.ValidationException
   *
   *   public class Split extends TableFunction<String> {
   *
-  *     // implement an "eval" method with several parameters you want
+  *     // implement an "eval" method with as many parameters as you want
   *     public void eval(String str) {
   *       for (String s : str.split(" ")) {
   *         collect(s);   // use collect(...) to emit an output row
   *       }
   *     }
   *
-  *     // can overloading eval method here ...
+  *     // you can overload the eval method here ...
   *   }
   *
   *   val tEnv: TableEnvironment = ...
@@ -82,6 +80,25 @@ import org.apache.flink.api.table.ValidationException
   */
 abstract class TableFunction[T] extends UserDefinedFunction {
 
+  /**
+    * Creates a call to a [[TableFunction]] in Scala Table API.
+    *
+    * @param params actual parameters of the function
+    * @return [[Expression]] in the form of a [[TableFunctionCall]]
+    */
+  final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
+    val resultType = if (getResultType == null) {
+      typeInfo
+    } else {
+      getResultType
+    }
+    TableFunctionCall(getClass.getSimpleName, this, params, resultType)
+  }
+
+  override def toString: String = getClass.getCanonicalName
+
+  // ----------------------------------------------------------------------------------------------
+
   private val rows: util.ArrayList[T] = new util.ArrayList[T]()
 
   /**
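
A minimal Scala sketch of a table function written against this class, following the documented contract (names are illustrative; collect(...) emits one output row):

    class MySplit extends TableFunction[String] {
      def eval(str: String): Unit = str.split("#").foreach(collect)
      // overloaded eval with an explicit separator
      def eval(str: String, sep: String): Unit = str.split(sep).foreach(collect)
    }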

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
index 0a987aa..7953b25 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
@@ -125,6 +125,7 @@ object ScalarSqlFunction {
     : SqlOperandTypeChecker = {
 
     val signatures = getSignatures(scalarFunction)
+
     /**
       * Operand type checker based on [[ScalarFunction]] given information.
       */
@@ -178,5 +179,4 @@ object ScalarSqlFunction {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
index 6eadfbc..738238d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
@@ -33,7 +33,6 @@ import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
 import scala.collection.JavaConverters._
 import java.util
 
-
 /**
   * Calcite wrapper for user-defined table functions.
   */
@@ -55,31 +54,33 @@ class TableSqlFunction(
     functionImpl) {
 
   /**
-    * Get the user-defined table function
+    * Get the user-defined table function.
     */
   def getTableFunction = udtf
 
   /**
-    * Get the returned table type information of the table function
+    * Get the type information of the table returned by the table function.
     */
   def getRowTypeInfo = rowTypeInfo
 
   /**
     * Get additional mapping information if the returned table type is a POJO
-    * (POJO types have no deterministic field order)
+    * (POJO types have no deterministic field order).
     */
   def getPojoFieldMapping = functionImpl.fieldIndexes
 
 }
 
 object TableSqlFunction {
+
   /**
-    * Util function to create a [[TableSqlFunction]]
+    * Util function to create a [[TableSqlFunction]].
+    *
     * @param name function name (used by SQL parser)
-    * @param udtf user defined table function to be called
+    * @param udtf user-defined table function to be called
     * @param rowTypeInfo the row type information generated by the table function
    * @param typeFactory type factory for converting between Flink's and Calcite's types
-    * @param functionImpl calcite table function schema
+    * @param functionImpl Calcite table function schema
     * @return [[TableSqlFunction]]
     */
   def apply(

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
index 932baeb..4899691 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -141,13 +141,17 @@ object UserDefinedFunctionUtils {
       .getDeclaredMethods
       .filter { m =>
         val modifiers = m.getModifiers
-        m.getName == "eval" && Modifier.isPublic(modifiers) && !Modifier.isAbstract(modifiers)
+        m.getName == "eval" &&
+          Modifier.isPublic(modifiers) &&
+          !Modifier.isAbstract(modifiers) &&
+          !(function.isInstanceOf[TableFunction[_]] && Modifier.isStatic(modifiers))
       }
 
     if (methods.isEmpty) {
       throw new ValidationException(
         s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
-          s"one method named 'eval' which is public and not abstract.")
+          s"one method named 'eval' which is public, not abstract and " +
+          s"(in case of table functions) not static.")
     } else {
       methods
     }
@@ -158,7 +162,7 @@ object UserDefinedFunctionUtils {
   }
 
   // ----------------------------------------------------------------------------------------------
-  // Utilities for sql functions
+  // Utilities for SQL functions
   // ----------------------------------------------------------------------------------------------
 
   /**
@@ -255,7 +259,7 @@ object UserDefinedFunctionUtils {
     * Field names are automatically extracted for
     * [[org.apache.flink.api.common.typeutils.CompositeType]].
     *
-    * @param inputType The TypeInformation extract the field names and positions from.
+    * @param inputType The TypeInformation to extract the field names and positions from.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   def getFieldInfo(inputType: TypeInformation[_])
@@ -265,8 +269,8 @@ object UserDefinedFunctionUtils {
       case t: CompositeType[_] => t.getFieldNames
       case a: AtomicType[_] => Array("f0")
       case tpe =>
-        throw new TableException(s"Currently only support CompositeType and AtomicType. " +
-                                   s"Type $tpe lacks explicit field naming")
+        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
+          s"Type $tpe lacks explicit field naming")
     }
     val fieldIndexes = fieldNames.indices.toArray
     val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { i =>
@@ -274,7 +278,7 @@ object UserDefinedFunctionUtils {
         case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]]
         case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]]
         case tpe =>
-          throw new TableException(s"Currently only support CompositeType and AtomicType.")
+          throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
       }
     }
     (fieldNames, fieldIndexes, fieldTypes)
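
One way a static eval method can arise from Scala, which the stricter filter above now rejects for table functions, is a companion object (this scenario is an assumption; the static forwarder is standard scalac output):

    class BadSplit extends TableFunction[String]
    object BadSplit {
      // scalac emits a static forwarder 'public static void eval(String)' on class BadSplit;
      // the eval-method extraction now skips it, so validation fails
      def eval(s: String): Unit = ()
    }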

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
index f6ddeef..c093f1a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
@@ -49,9 +49,7 @@ object ProjectionTranslator {
 
     val replaced = exprs
       .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
-      .map {
-        case e: Expression => UnresolvedAlias(e)
-      }
+      .map(UnresolvedAlias)
     val aggs = aggNames.map( a => Alias(a._1, a._2)).toSeq
     val props = propNames.map( p => Alias(p._1, p._2)).toSeq
 

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 4dc2ab7..438698a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -428,7 +428,6 @@ case class Join(
     right.construct(relBuilder)
 
     val corSet = mutable.Set[CorrelationId]()
-
     if (correlated) {
       corSet += relBuilder.peek().getCluster.createCorrel()
     }
@@ -624,9 +623,9 @@ case class WindowAggregate(
   }
 }
 
-
 /**
   * LogicalNode for calling a user-defined table functions.
+  *
   * @param functionName function name
   * @param tableFunction table function to be called (might be overloaded)
   * @param parameters actual parameters
@@ -634,16 +633,16 @@ case class WindowAggregate(
   * @param child child logical node
   */
 case class LogicalTableFunctionCall(
-  functionName: String,
-  tableFunction: TableFunction[_],
-  parameters: Seq[Expression],
-  resultType: TypeInformation[_],
-  fieldNames: Array[String],
-  child: LogicalNode)
+    functionName: String,
+    tableFunction: TableFunction[_],
+    parameters: Seq[Expression],
+    resultType: TypeInformation[_],
+    fieldNames: Array[String],
+    child: LogicalNode)
   extends UnaryNode {
 
-  val (_, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
-  var evalMethod: Method = _
+  private val (_, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
+  private var evalMethod: Method = _
 
   override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
     case (n, t) => ResolvedFieldReference(n, t)
@@ -651,9 +650,9 @@ case class LogicalTableFunctionCall(
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     val node = super.validate(tableEnv).asInstanceOf[LogicalTableFunctionCall]
-    // check not Scala object
+    // check that it is not a Scala object
     checkNotSingleton(tableFunction.getClass)
-    // check could be instantiated
+    // check that the class can be instantiated
     checkForInstantiation(tableFunction.getClass)
     // look for a signature that matches the input types
     val signature = node.parameters.map(_.resultType)

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
index 9745be1..93a8f53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
 import org.apache.flink.api.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.api.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
 import org.apache.flink.api.table.functions.utils.TableSqlFunction
 import org.apache.flink.api.table.runtime.FlatMapRunner
 import org.apache.flink.api.table.typeutils.TypeConverter._
@@ -73,12 +74,12 @@ trait FlinkCorrelate {
       // outer apply
 
       // in case of outer apply and the returned row of table function is empty,
-      // fill null to all fields of the row
+      // fill all fields of the row with null
       val input2NullExprs = input2AccessExprs.map { x =>
         GeneratedExpression(
           primitiveDefaultValue(x.resultType),
-          GeneratedExpression.ALWAYS_NULL,
-          "",
+          ALWAYS_NULL,
+          NO_CODE,
           x.resultType)
       }
       val outerResultExpr = generator.generateResultExpression(

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
index 4aa7fea..3cddf8b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -99,7 +99,7 @@ class DataSetCorrelate(
       config.getNullCheck,
       config.getEfficientTypeUsage)
 
-    // do not need to specify input type
+    // we do not need to specify input type
     val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
index b0bc48a..028cb10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -48,6 +48,7 @@ class DataStreamCorrelate(
   extends SingleRel(cluster, traitSet, inputNode)
   with FlinkCorrelate
   with DataStreamRel {
+
   override def deriveRowType() = relRowType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
@@ -92,7 +93,7 @@ class DataStreamCorrelate(
       config.getNullCheck,
       config.getEfficientTypeUsage)
 
-    // do not need to specify input type
+    // we do not need to specify input type
     val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
     val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
index e6cf0cf..39756be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
@@ -33,8 +33,7 @@ class DataSetCorrelateRule
       classOf[LogicalCorrelate],
       Convention.NONE,
       DataSetConvention.INSTANCE,
-      "DataSetCorrelateRule")
-  {
+      "DataSetCorrelateRule") {
 
     override def matches(call: RelOptRuleCall): Boolean = {
       val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
@@ -46,7 +45,9 @@ class DataSetCorrelateRule
         case scan: LogicalTableFunctionScan => true
         // a filter is pushed above the table function
         case filter: LogicalFilter =>
-          filter.getInput.asInstanceOf[RelSubset].getOriginal
+          filter
+            .getInput.asInstanceOf[RelSubset]
+            .getOriginal
             .isInstanceOf[LogicalTableFunctionScan]
         case _ => false
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
index bb52fd7..554c6c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -33,8 +33,7 @@ class DataStreamCorrelateRule
     classOf[LogicalCorrelate],
     Convention.NONE,
     DataStreamConvention.INSTANCE,
-    "DataStreamCorrelateRule")
-{
+    "DataStreamCorrelateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
@@ -45,7 +44,9 @@ class DataStreamCorrelateRule
       case scan: LogicalTableFunctionScan => true
       // a filter is pushed above the table function
       case filter: LogicalFilter =>
-        filter.getInput.asInstanceOf[RelSubset].getOriginal
+        filter
+          .getInput.asInstanceOf[RelSubset]
+          .getOriginal
           .isInstanceOf[LogicalTableFunctionScan]
       case _ => false
     }
@@ -63,8 +64,9 @@ class DataStreamCorrelateRule
           convertToCorrelate(rel.getRelList.get(0), condition)
 
         case filter: LogicalFilter =>
-          convertToCorrelate(filter.getInput.asInstanceOf[RelSubset].getOriginal,
-                             Some(filter.getCondition))
+          convertToCorrelate(
+            filter.getInput.asInstanceOf[RelSubset].getOriginal,
+            Some(filter.getCondition))
 
         case scan: LogicalTableFunctionScan =>
           new DataStreamCorrelate(

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index a75f2fc..b421c8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -611,9 +611,9 @@ class Table(
   }
 
   /**
-    * The Cross Apply returns rows from the outer table (table on the left of the Apply operator)
-    * that produces matching values from the table-valued function (which is on the right side of
-    * the operator).
+    * The Cross Apply operator returns rows from the outer table (table on the left of the
+    * operator) that produce matching values from the table-valued function (which is defined in
+    * the expression on the right side of the operator).
     *
     * The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
     *
@@ -635,23 +635,25 @@ class Table(
   }
 
   /**
-    * The Cross Apply returns rows from the outer table (table on the left of the Apply operator)
-    * that produces matching values from the table-valued function (which is on the right side of
-    * the operator).
+    * The Cross Apply operator returns rows from the outer table (table on the left of the
+    * operator) that produce matching values from the table-valued function (which is defined in
+    * the expression on the right side of the operator).
     *
     * The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
     *
     * Example:
     *
     * {{{
-    *   class MySplitUDTF extends TableFunction[String] {
-    *     def eval(str: String): Unit = {
-    *       str.split("#").foreach(collect)
+    *   class MySplitUDTF extends TableFunction<String> {
+    *     public void eval(String str) {
+    *       Arrays.stream(str.split("#")).forEach(this::collect);
     *     }
     *   }
     *
-    *   val split = new MySplitUDTF()
-    *   table.crossApply("split(c) as (s)").select("a, b, c, s")
+    *   TableFunction<String> split = new MySplitUDTF();
+    *   tableEnv.registerFunction("split", split);
+    *
+    *   table.crossApply("split(c) as (s)").select("a, b, c, s");
     * }}}
     */
   def crossApply(udtf: String): Table = {
@@ -659,9 +661,10 @@ class Table(
   }
 
   /**
-    * The Outer Apply returns all the rows from the outer table (table on the left of the Apply
-    * operator), and rows that do not matches the condition from the table-valued function (which
-    * is on the right side of the operator), NULL values are displayed.
+    * The Outer Apply operator returns all the rows from the outer table (table on the left of the
+    * Apply operator), together with the matching values produced by the table-valued function
+    * (which is defined in the expression on the right side of the operator).
+    * Rows for which the table-valued function produces no values are padded with null values.
     *
     * The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
     *
@@ -683,17 +686,26 @@ class Table(
   }
 
   /**
-    * The Outer Apply returns all the rows from the outer table (table on the left of the Apply
-    * operator), and rows that do not matches the condition from the table-valued function (which
-    * is on the right side of the operator), NULL values are displayed.
+    * The Outer Apply operator returns all the rows from the outer table (table on the left of the
+    * Apply operator), together with the matching values produced by the table-valued function
+    * (which is defined in the expression on the right side of the operator).
+    * Rows for which the table-valued function produces no values are padded with null values.
     *
     * The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
     *
     * Example:
     *
     * {{{
-    *   val split = new MySplitUDTF()
-    *   table.outerApply("split(c) as (s)").select("a, b, c, s")
+    *   class MySplitUDTF extends TableFunction<String> {
+    *     public void eval(String str) {
+    *       Arrays.stream(str.split("#")).forEach(this::collect);
+    *     }
+    *   }
+    *
+    *   TableFunction<String> split = new MySplitUDTF();
+    *   tableEnv.registerFunction("split", split);
+    *
+    *   table.outerApply("split(c) as (s)").select("a, b, c, s");
     * }}}
     */
   def outerApply(udtf: String): Table = {
@@ -708,7 +720,7 @@ class Table(
   private def applyInternal(udtf: Expression, joinType: JoinType): Table = {
     var alias: Option[Seq[String]] = None
 
-    // unwrap an Expression until get a TableFunctionCall
+    // unwrap an Expression until we get a TableFunctionCall
     def unwrap(expr: Expression): TableFunctionCall = expr match {
       case Alias(child, name, extraNames) =>
         alias = Some(Seq(name) ++ extraNames)
@@ -717,7 +729,9 @@ class Table(
         val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
         unwrap(function)
       case c: TableFunctionCall => c
-      case _ => throw new TableException("Cross/Outer Apply only accept TableFunction")
+      case _ =>
+        throw new TableException(
+          "Cross/Outer Apply operators only accept expressions that define table functions.")
     }
 
     val call = unwrap(udtf)
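
A compact sketch contrasting the two Apply operators documented above (Scala API, names illustrative):

    table.crossApply(split('c) as 's).select('a, 's)   // inner-join-like: rows without function output are dropped
    table.outerApply(split('c) as 's).select('a, 's)   // left-outer-like: missing function fields become null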

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 4029a7d..dc68b89 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -47,16 +47,18 @@ class FunctionCatalog {
     sqlFunctions += sqlFunction
   }
 
-  /** Register multiple sql functions at one time. The functions has the same name. **/
+  /**
+    * Register multiple SQL functions at the same time. The functions have the same name.
+    */
   def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
     if (functions.nonEmpty) {
       val name = functions.head.getName
-      // check all name is the same in the functions
+      // check that all functions have the same name
       if (functions.forall(_.getName == name)) {
         sqlFunctions --= sqlFunctions.filter(_.getName == name)
         sqlFunctions ++= functions
       } else {
-        throw ValidationException("The sql functions request to register have different name.")
+        throw ValidationException("The SQL functions to be registered have different names.")
       }
     }
   }
@@ -88,7 +90,7 @@ class FunctionCatalog {
       case tf if classOf[TableFunction[_]].isAssignableFrom(tf) =>
         val tableSqlFunction = sqlFunctions
           .find(f => f.getName.equalsIgnoreCase(name) && f.isInstanceOf[TableSqlFunction])
-          .getOrElse(throw ValidationException(s"Unregistered table sql function: $name"))
+          .getOrElse(throw ValidationException(s"Undefined table function: $name"))
           .asInstanceOf[TableSqlFunction]
         val typeInfo = tableSqlFunction.getRowTypeInfo
         val function = tableSqlFunction.getTableFunction

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/log4j-test.properties b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
index 4c74d85..f713aa8 100644
--- a/flink-libraries/flink-table/src/test/resources/log4j-test.properties
+++ b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala
deleted file mode 100644
index 7e0d0ff..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.batch
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.table.{Row, Table, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class UserDefinedTableFunctionITCase(
-  mode: TestExecutionMode,
-  configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSQLCrossApply(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-    tableEnv.registerTable("MyTable", in)
-    tableEnv.registerFunction("split", new TableFunc1)
-
-    val sqlQuery = "SELECT MyTable.c, t.s FROM MyTable, LATERAL TABLE(split(c)) AS t(s)"
-
-    val result = tableEnv.sql(sqlQuery).toDataSet[Row]
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
-      "Anna#44,Anna\n" + "Anna#44,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSQLOuterApply(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-    tableEnv.registerTable("MyTable", in)
-    tableEnv.registerFunction("split", new TableFunc2)
-
-    val sqlQuery = "SELECT MyTable.c, t.a, t.b  FROM MyTable LEFT JOIN LATERAL TABLE(split(c)) " +
-      "AS t(a,b) ON TRUE"
-
-    val result = tableEnv.sql(sqlQuery).toDataSet[Row]
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
-      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableAPICrossApply(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val func1 = new TableFunc1
-    val result = in.crossApply(func1('c) as ('s)).select('c, 's).toDataSet[Row]
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
-      "Anna#44,Anna\n" + "Anna#44,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-
-    // with overloading
-    val result2 = in.crossApply(func1('c, "$") as ('s)).select('c, 's).toDataSet[Row]
-    val results2 = result2.collect()
-    val expected2: String = "Jack#22,$Jack\n" + "Jack#22,$22\n" + "John#19,$John\n" +
-      "John#19,$19\n" + "Anna#44,$Anna\n" + "Anna#44,$44\n"
-    TestBaseUtils.compareResultAsText(results2.asJava, expected2)
-  }
-
-
-  @Test
-  def testTableAPIOuterApply(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func2 = new TableFunc2
-    val result = in.outerApply(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
-      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Test
-  def testCustomReturnType(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func2 = new TableFunc2
-
-    val result = in
-      .crossApply(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
-      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testHierarchyType(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val hierarchy = new HierarchyTableFunction
-    val result = in
-      .crossApply(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'adult, 'len)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack,true,22\n" + "John#19,John,false,19\n" +
-      "Anna#44,Anna,true,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testPojoType(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val pojo = new PojoTableFunc()
-    val result = in
-      .crossApply(pojo('c))
-      .select('c, 'name, 'age)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack,22\n" + "John#19,John,19\n" + "Anna#44,Anna,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Test
-  def testTableAPIWithFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = in
-      .crossApply(func0('c) as ('name, 'age))
-      .select('c, 'name, 'age)
-      .filter('age > 20)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected: String = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Test
-  def testUDTFWithScalarFunction(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
-    val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func1 = new TableFunc1
-
-    val result = in
-      .crossApply(func1('c.substring(2)) as 's)
-      .select('c, 's)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected: String = "Jack#22,ack\n" + "Jack#22,22\n" + "John#19,ohn\n" + "John#19,19\n" +
-      "Anna#44,nna\n" + "Anna#44,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  private def getSmall3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Jack#22"))
-    data.+=((2, 2L, "John#19"))
-    data.+=((3, 2L, "Anna#44"))
-    data.+=((4, 3L, "nosharp"))
-    env.fromCollection(data)
-  }
-}

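The tests above rely on helpers such as TableFunc1 and TableFunc2 from the shared test utilities. A plausible shape for such a helper, shown here as a hypothetical reconstruction rather than the actual UserDefinedTableFunctions.scala contents: a TableFunction subclass whose eval() splits the input and emits one row per token via collect().

  import org.apache.flink.api.table.functions.TableFunction

  // Hypothetical reconstruction: splits strings like "Jack#22" and emits
  // each part as a separate row, matching the expected results above.
  class SplitFunc extends TableFunction[String] {
    def eval(str: String): Unit = {
      str.split("#").foreach(collect)
    }
  }
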
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 7e236d1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.batch
-
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.api.table.expressions.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc1, TableFunc2}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.apache.flink.api.table.{Row, TableEnvironment, Types}
-import org.junit.Test
-import org.mockito.Mockito._
-
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testTableAPI(): Unit = {
-    // mock
-    val ds = mock(classOf[DataSet[Row]])
-    val jDs = mock(classOf[JDataSet[Row]])
-    val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
-    when(ds.javaSet).thenReturn(jDs)
-    when(jDs.getType).thenReturn(typeInfo)
-
-    // Scala environment
-    val env = mock(classOf[ScalaExecutionEnv])
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
-    // Java environment
-    val javaEnv = mock(classOf[JavaExecutionEnv])
-    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-    val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
-    javaTableEnv.registerTable("MyTable", in2)
-
-    // test cross apply
-    val func1 = new TableFunc1
-    javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.crossApply(func1('c) as ('s)).select('c, 's)
-    var javaTable = in2.crossApply("func1(c) as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test outer apply
-    scalaTable = in1.outerApply(func1('c) as ('s)).select('c, 's)
-    javaTable = in2.outerApply("func1(c) as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = in1.crossApply(func1('c, "$") as ('s)).select('c, 's)
-    javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.crossApply("func2(c) as (name, len)").select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'len, 'adult)
-    javaTable = in2.crossApply("hierarchy(c) as (name, adult, len)")
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.crossApply(pojo('c))
-      .select('c, 'name, 'age)
-    javaTable = in2.crossApply("pojo(c)")
-      .select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.crossApply("func2(c) as (name, len)")
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = in1.crossApply(func1('c.substring(2)) as ('s))
-      .select('a, 'c, 's)
-    javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
-      .select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-  }
-
-  @Test
-  def testSQLWithCrossApply(): Unit = {
-    val util = batchTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-
-    // test overloading
-
-    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
-    val expected2 = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1($cor0.c, '$')"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery2, expected2)
-  }
-
-  @Test
-  def testSQLWithOuterApply(): Unit = {
-    val util = batchTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithCustomType(): Unit = {
-    val util = batchTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithHierarchyType(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new HierarchyTableFunction
-    util.addFunction("hierarchy", function)
-
-    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "hierarchy($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithPojoType(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new PojoTableFunc
-    util.addFunction("pojo", function)
-
-    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "pojo($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " INTEGER age, VARCHAR(2147483647) name)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "name", "age")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSQLWithFilter(): Unit = {
-    val util = batchTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
-      "WHERE len > 2"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER"),
-        term("condition", ">($1, 2)")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-
-  @Test
-  def testSQLWithScalarFunction(): Unit = {
-    val util = batchTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..1c505ba
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils._
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  @Test
+  def testCrossApply(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func1($cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+
+    // test overloading
+
+    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
+
+    val expected2 = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func1($cor0.c, '$')"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery2, expected2)
+  }
+
+  @Test
+  def testOuterApply(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func1($cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testCustomType(): Unit = {
+    val util = batchTestUtil()
+    val func2 = new TableFunc2
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func2", func2)
+
+    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func2($cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+               "VARCHAR(2147483647) f0, INTEGER f1)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testHierarchyType(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new HierarchyTableFunction
+    util.addFunction("hierarchy", function)
+
+    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "hierarchy($cor0.c)"),
+        term("function", function.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPojoType(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new PojoTableFunc
+    util.addFunction("pojo", function)
+
+    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "pojo($cor0.c)"),
+        term("function", function.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+               " INTEGER age, VARCHAR(2147483647) name)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "name", "age")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testFilter(): Unit = {
+    val util = batchTestUtil()
+    val func2 = new TableFunc2
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func2", func2)
+
+    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
+      "WHERE len > 2"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func2($cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+               "VARCHAR(2147483647) f0, INTEGER f1)"),
+        term("joinType", "INNER"),
+        term("condition", ">($1, 2)")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testScalarFunction(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+}
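
For reference, the runtime counterpart of these plan tests follows the pattern of the removed ITCase: register the function, run the LATERAL TABLE query, and collect the result. A sketch assembled from that pattern, assuming the same TableFunc1 helper in its new org.apache.flink.api.table.utils location:

  import org.apache.flink.api.scala._
  import org.apache.flink.api.scala.table._
  import org.apache.flink.api.table.{Row, TableEnvironment}
  import org.apache.flink.api.table.utils.TableFunc1

  object CorrelateExample {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val tableEnv = TableEnvironment.getTableEnvironment(env)

      // three sample rows in the "name#age" format used by the tests
      val in = env
        .fromElements((1, 1L, "Jack#22"), (2, 2L, "John#19"), (3, 2L, "Anna#44"))
        .toTable(tableEnv).as('a, 'b, 'c)
      tableEnv.registerTable("MyTable", in)
      tableEnv.registerFunction("split", new TableFunc1)

      val result = tableEnv
        .sql("SELECT MyTable.c, t.s FROM MyTable, LATERAL TABLE(split(c)) AS t(s)")
        .toDataSet[Row]
      result.collect().foreach(println)
    }
  }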