Posted to commits@flink.apache.org by tw...@apache.org on 2017/09/26 08:50:39 UTC

[2/2] flink git commit: [FLINK-7439] [table] Support variable arguments for UDTF in SQL

[FLINK-7439] [table] Support variable arguments for UDTF in SQL

This closes #4536.
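
As a minimal sketch of what this change enables (mirroring the tests added below; the class name is illustrative, and a Scala varargs `eval` needs the @varargs annotation so it compiles to a Java array-based varargs method):

  import scala.annotation.varargs
  import org.apache.flink.table.functions.TableFunction

  // Illustrative UDTF whose eval method accepts a variable number of strings.
  class MySplitFunc extends TableFunction[String] {
    @varargs
    def eval(strs: String*): Unit = strs.foreach(collect)
  }

  // Registered as "func1", it can now be called from SQL with any number of
  // operands, e.g.:
  //   SELECT c, s FROM MyTable, LATERAL TABLE(func1('hello', 'world', c)) AS T(s)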


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79c17afa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79c17afa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79c17afa

Branch: refs/heads/master
Commit: 79c17afa13cd3f6cbaa91d56e036530f53e7b54b
Parents: 142bde0
Author: Jark Wu <ja...@apache.org>
Authored: Mon Aug 14 14:18:52 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Sep 26 10:45:07 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/udfs.md                          |   6 +-
 .../flink/table/api/TableEnvironment.scala      |   4 +-
 .../codegen/calls/ScalarFunctionCallGen.scala   |  25 +--
 .../codegen/calls/TableFunctionCallGen.scala    |  25 +--
 .../table/functions/UserDefinedFunction.scala   |   2 +-
 .../functions/utils/TableSqlFunction.scala      | 153 ++++++++++++-------
 .../utils/UserDefinedFunctionUtils.scala        |  14 +-
 .../flink/table/plan/logical/operators.scala    |   4 +-
 .../plan/rules/logical/LogicalUnnestRule.scala  |   4 +-
 .../plan/schema/FlinkTableFunctionImpl.scala    |  15 +-
 .../flink/table/validate/FunctionCatalog.scala  |  16 --
 .../utils/JavaUserDefinedTableFunctions.java    |  17 +++
 .../table/api/batch/sql/CorrelateTest.scala     |  49 ++++++
 .../table/api/stream/sql/CorrelateTest.scala    |  49 ++++++
 .../validation/CorrelateValidationTest.scala    |   2 +-
 15 files changed, 273 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index eef7db6..5b34fe6 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -44,7 +44,7 @@ Scalar Functions
 
 If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.
 
-In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`.
+In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`. Evaluation methods can also support variable arguments, such as `eval(String... strs)`.
 
 The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
 
@@ -139,7 +139,7 @@ Table Functions
 
 Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However, in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns.
 
-In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
+In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. Evaluation methods can also support variable arguments, such as `eval(String... strs)`. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
 
 In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoin` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table. In SQL use `LATERAL TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).
 
@@ -297,7 +297,7 @@ optionally implemented. While some of these methods allow the system more effici
 - `merge()` is required for many batch aggregations and session window aggregations.
 - `resetAccumulator()` is required for many batch aggregations.
 
-All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define a table function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. 
+All methods of `AggregateFunction` must be declared as `public`, not `static`, and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contract methods. In order to define an aggregate function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. The method `accumulate` can be overloaded with different custom types and arguments, and it also supports variable arguments.
 
 Detailed documentation for all methods of `AggregateFunction` is given below. 
 

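The paragraphs changed above document variable arguments for scalar, table, and aggregate functions. A hedged Scala illustration of the scalar case (class and method body are made up; in Scala, @varargs is again required so eval is compiled as a Java varargs method):

  import scala.annotation.varargs
  import org.apache.flink.table.functions.ScalarFunction

  // Illustrative scalar function: concatenates any number of strings.
  class ConcatAll extends ScalarFunction {
    @varargs
    def eval(strs: String*): String = strs.mkString
  }

  // The same pattern applies to an AggregateFunction's accumulate method,
  // e.g. def accumulate(acc: MyAcc, values: String*).
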
http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 0424cf8..dc82a87 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -355,8 +355,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     functionCatalog.registerFunction(name, function.getClass)
 
     // register in SQL API
-    val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
-    functionCatalog.registerSqlFunctions(sqlFunctions)
+    val sqlFunction = createTableSqlFunction(name, function, typeInfo, typeFactory)
+    functionCatalog.registerSqlFunction(sqlFunction)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index 07a8708..6fad573 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.typeutils.TypeCheckUtils
 
+import scala.collection.mutable
+
 /**
   * Generates a call to user-defined [[ScalarFunction]].
   *
@@ -44,21 +46,26 @@ class ScalarFunctionCallGen(
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     // determine function method and result class
-    val matchingMethod = getUserDefinedMethod(scalarFunction, "eval", typeInfoToClass(signature))
+    val matchingSignature = getEvalMethodSignature(scalarFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
-    val matchingSignature = matchingMethod.getParameterTypes
     val resultClass = getResultTypeClassOfScalarFunction(scalarFunction, matchingSignature)
 
-    // zip for variable signatures
-    var paramToOperands = matchingSignature.zip(operands)
-    if (operands.length > matchingSignature.length) {
-      operands.drop(matchingSignature.length).foreach(op =>
-        paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op)
-      )
+    // get the expanded parameter types
+    var paramClasses = new mutable.ArrayBuffer[Class[_]]
+    for (i <- operands.indices) {
+      if (i < matchingSignature.length - 1) {
+        paramClasses += matchingSignature(i)
+      } else if (matchingSignature.last.isArray) {
+        // last argument is an array type
+        paramClasses += matchingSignature.last.getComponentType
+      } else {
+        // last argument is not an array type
+        paramClasses += matchingSignature.last
+      }
     }
 
     // convert parameters for function (output boxing)
-    val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
+    val parameters = paramClasses.zip(operands).map { case (paramClass, operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
           } else if (ClassUtils.isPrimitiveWrapper(paramClass)

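The rewritten generator above no longer zips operands directly against the raw method signature; it expands the signature so each operand is paired with a concrete parameter class, repeating the varargs component type for trailing operands. A standalone sketch of that expansion (hypothetical helper, same logic as the loop above):

  // Expand a (possibly varargs) signature to one class per operand.
  def expandSignature(signature: Seq[Class[_]], operandCount: Int): Seq[Class[_]] =
    (0 until operandCount).map { i =>
      if (i < signature.length - 1) {
        signature(i)
      } else if (signature.last.isArray) {
        signature.last.getComponentType  // varargs: repeat the component type
      } else {
        signature.last
      }
    }

  // expandSignature(Seq(classOf[Int], classOf[Array[String]]), 4)
  // => Seq(int, String, String, String)
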
http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index a3609c1..e1ad18f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -27,6 +27,8 @@ import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.typeutils.TypeCheckUtils
 
+import scala.collection.mutable
+
 /**
   * Generates a call to user-defined [[TableFunction]].
   *
@@ -45,20 +47,25 @@ class TableFunctionCallGen(
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     // determine function method
-    val matchingMethod = getUserDefinedMethod(tableFunction, "eval", typeInfoToClass(signature))
+    val matchingSignature = getEvalMethodSignature(tableFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
-    val matchingSignature = matchingMethod.getParameterTypes
 
-    // zip for variable signatures
-    var paramToOperands = matchingSignature.zip(operands)
-    if (operands.length > matchingSignature.length) {
-      operands.drop(matchingSignature.length).foreach(op =>
-        paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op)
-      )
+    // get the expanded parameter types
+    var paramClasses = new mutable.ArrayBuffer[Class[_]]
+    for (i <- operands.indices) {
+      if (i < matchingSignature.length - 1) {
+        paramClasses += matchingSignature(i)
+      } else if (matchingSignature.last.isArray) {
+        // last argument is an array type
+        paramClasses += matchingSignature.last.getComponentType
+      } else {
+        // last argument is not an array type
+        paramClasses += matchingSignature.last
+      }
     }
 
     // convert parameters for function (output boxing)
-    val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
+    val parameters = paramClasses.zip(operands).map { case (paramClass, operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
           } else if (ClassUtils.isPrimitiveWrapper(paramClass)

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index 7c57ea0..b841b31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -49,7 +49,7 @@ abstract class UserDefinedFunction extends Serializable {
   def isDeterministic: Boolean = true
 
   final def functionIdentifier: String = {
-    val md5  =  DigestUtils.md5Hex(serialize(this))
+    val md5 = DigestUtils.md5Hex(serialize(this))
     getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
index b37d75b..0bab992 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
@@ -18,20 +18,20 @@
 
 package org.apache.flink.table.functions.utils
 
-import com.google.common.base.Predicate
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
-import org.apache.calcite.util.Util
+import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
-
-import scala.collection.JavaConverters._
-import java.util
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.functions.utils.TableSqlFunction._
 
 /**
   * Calcite wrapper for user-defined table functions.
@@ -40,17 +40,14 @@ class TableSqlFunction(
     name: String,
     udtf: TableFunction[_],
     rowTypeInfo: TypeInformation[_],
-    returnTypeInference: SqlReturnTypeInference,
-    operandTypeInference: SqlOperandTypeInference,
-    operandTypeChecker: SqlOperandTypeChecker,
-    paramTypes: util.List[RelDataType],
+    typeFactory: FlinkTypeFactory,
     functionImpl: FlinkTableFunctionImpl[_])
   extends SqlUserDefinedTableFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
-    returnTypeInference,
-    operandTypeInference,
-    operandTypeChecker,
-    paramTypes,
+    ReturnTypes.CURSOR,
+    createOperandTypeInference(name, udtf, typeFactory),
+    createOperandTypeChecker(name, udtf),
+    null,
     functionImpl) {
 
   /**
@@ -74,48 +71,102 @@ class TableSqlFunction(
 
 object TableSqlFunction {
 
-  /**
-    * Util function to create a [[TableSqlFunction]].
-    *
-    * @param name function name (used by SQL parser)
-    * @param udtf user-defined table function to be called
-    * @param rowTypeInfo the row type information generated by the table function
-    * @param typeFactory type factory for converting Flink's between Calcite's types
-    * @param functionImpl Calcite table function schema
-    * @return [[TableSqlFunction]]
-    */
-  def apply(
+  private[flink] def createOperandTypeInference(
     name: String,
     udtf: TableFunction[_],
-    rowTypeInfo: TypeInformation[_],
-    typeFactory: FlinkTypeFactory,
-    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
-
-    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
-    val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
-    // derives operands' data types and type families
-    functionImpl.getParameters.asScala.foreach{ o =>
-      val relType: RelDataType = o.getType(typeFactory)
-      argTypes.add(relType)
-      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
+    typeFactory: FlinkTypeFactory)
+  : SqlOperandTypeInference = {
+    /**
+      * Operand type inference based on the given [[TableFunction]]'s information.
+      */
+    new SqlOperandTypeInference {
+      override def inferOperandTypes(
+          callBinding: SqlCallBinding,
+          returnType: RelDataType,
+          operandTypes: Array[RelDataType]): Unit = {
+
+        val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+        val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
+          .getOrElse(throw new ValidationException(
+            s"Given parameters of function '$name' do not match any signature. \n" +
+              s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+              s"Expected: ${signaturesToString(udtf, "eval")}"))
+
+        val inferredTypes = foundSignature
+          .map(TypeExtractor.getForClass(_))
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
+
+        for (i <- operandTypes.indices) {
+          if (i < inferredTypes.length - 1) {
+            operandTypes(i) = inferredTypes(i)
+          } else if (null != inferredTypes.last.getComponentType) {
+            // last argument is a collection, the array type
+            operandTypes(i) = inferredTypes.last.getComponentType
+          } else {
+            operandTypes(i) = inferredTypes.last
+          }
+        }
+      }
     }
-    // derives whether the 'input'th parameter of a method is optional.
-    val optional: Predicate[Integer] = new Predicate[Integer]() {
-      def apply(input: Integer): Boolean = {
-        functionImpl.getParameters.get(input).isOptional
+  }
+
+  private[flink] def createOperandTypeChecker(
+    name: String,
+    udtf: TableFunction[_])
+  : SqlOperandTypeChecker = {
+
+    val signatures = getMethodSignatures(udtf, "eval")
+
+    /**
+      * Operand type checker based on the given [[TableFunction]]'s information.
+      */
+    new SqlOperandTypeChecker {
+      override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
+        s"$opName[${signaturesToString(udtf, "eval")}]"
+      }
+
+      override def getOperandCountRange: SqlOperandCountRange = {
+        var min = 255
+        var max = -1
+        signatures.foreach( sig => {
+          var len = sig.length
+          if (len > 0 && sig(sig.length - 1).isArray) {
+            max = 254  // according to JVM spec 4.3.3
+            len = sig.length - 1
+          }
+          max = Math.max(len, max)
+          min = Math.min(len, min)
+        })
+        SqlOperandCountRanges.between(min, max)
+      }
+
+      override def checkOperandTypes(
+        callBinding: SqlCallBinding,
+        throwOnFailure: Boolean)
+      : Boolean = {
+        val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+        val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
+
+        if (foundSignature.isEmpty) {
+          if (throwOnFailure) {
+            throw new ValidationException(
+              s"Given parameters of function '$name' do not match any signature. \n" +
+                s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+                s"Expected: ${signaturesToString(udtf, "eval")}")
+          } else {
+            false
+          }
+        } else {
+          true
+        }
       }
+
+      override def isOptional(i: Int): Boolean = false
+
+      override def getConsistency: Consistency = Consistency.NONE
+
     }
-    // create type check for the operands
-    val typeChecker: FamilyOperandTypeChecker = OperandTypes.family(typeFamilies, optional)
-
-    new TableSqlFunction(
-      name,
-      udtf,
-      rowTypeInfo,
-      ReturnTypes.CURSOR,
-      InferTypes.explicit(argTypes),
-      typeChecker,
-      argTypes,
-      functionImpl)
   }
 }

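A note on the new operand count logic: a trailing array parameter is treated as varargs, so such a signature accepts anything from its fixed-parameter count up to 254 operands (the JVM caps method parameters at 255 including the receiver, per JVM spec 4.3.3). A standalone sketch of the min/max derivation (hypothetical helper, same logic as getOperandCountRange above):

  // Derive the allowed operand count range over all eval signatures.
  def operandCountRange(signatures: Seq[Array[Class[_]]]): (Int, Int) = {
    var min = 255
    var max = -1
    signatures.foreach { sig =>
      var len = sig.length
      if (len > 0 && sig.last.isArray) {
        max = 254            // varargs: JVM limit (JVM spec 4.3.3)
        len = sig.length - 1 // the varargs parameter may bind zero operands
      }
      max = Math.max(len, max)
      min = Math.min(len, min)
    }
    (min, max)
  }

  // eval(String...) alone yields (0, 254); adding eval(int, String) keeps (0, 254).
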
http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index f53bcde..6a90569 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -258,7 +258,7 @@ object UserDefinedFunctionUtils {
   }
 
   /**
-    * Create [[SqlFunction]]s for a [[TableFunction]]'s every eval method
+    * Create a [[SqlFunction]] for a [[TableFunction]]
     *
     * @param name function name
     * @param tableFunction table function
@@ -266,19 +266,15 @@ object UserDefinedFunctionUtils {
     * @param typeFactory type factory
     * @return the TableSqlFunction
     */
-  def createTableSqlFunctions(
+  def createTableSqlFunction(
       name: String,
       tableFunction: TableFunction[_],
       resultType: TypeInformation[_],
       typeFactory: FlinkTypeFactory)
-    : Seq[SqlFunction] = {
+    : SqlFunction = {
     val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
-    val evalMethods = checkAndExtractMethods(tableFunction, "eval")
-
-    evalMethods.map { method =>
-      val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, method)
-      TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
-    }
+    val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames)
+    new TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
   }
 
   /**

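Note the structural change here: previously one TableSqlFunction was created per eval method (createTableSqlFunctions returned a Seq), which is why FunctionCatalog.registerSqlFunctions existed at all. With operand type inference and checking moved into TableSqlFunction itself, a single SqlFunction now covers all eval overloads, including varargs.
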
http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 795a506..559d20d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -723,10 +723,10 @@ case class LogicalTableFunctionCall(
     val function = new FlinkTableFunctionImpl(
       resultType,
       fieldIndexes,
-      if (fieldNames.isEmpty) generatedNames else fieldNames, evalMethod
+      if (fieldNames.isEmpty) generatedNames else fieldNames
     )
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val sqlFunction = TableSqlFunction(
+    val sqlFunction = new TableSqlFunction(
       tableFunction.functionIdentifier,
       tableFunction,
       resultType,

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
index f2d9f2a..802fd85 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -82,11 +82,11 @@ class LogicalUnnestRule(
           val componentType = arrayType.getComponentType
 
           // create table function
-          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions(
+          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunction(
             "explode",
             ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
             FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
-            cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]).head
+            cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
 
           // create table function call
           val rexCall = cluster.getRexBuilder.makeCall(

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index 27fc2ea..cab8ea9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -17,12 +17,12 @@
  */
 package org.apache.flink.table.plan.schema
 
-import java.lang.reflect.{Method, Type}
+import java.lang.reflect.Type
 import java.util
+import java.util.Collections
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.TableFunction
-import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.calcite.schema.{FunctionParameter, TableFunction}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
@@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 class FlinkTableFunctionImpl[T](
     val typeInfo: TypeInformation[T],
     val fieldIndexes: Array[Int],
-    val fieldNames: Array[String],
-    val evalMethod: Method)
-  extends ReflectiveFunctionBase(evalMethod)
-  with TableFunction {
+    val fieldNames: Array[String])
+  extends TableFunction {
 
   if (fieldIndexes.length != fieldNames.length) {
     throw new TableException(
@@ -71,6 +69,9 @@ class FlinkTableFunctionImpl[T](
 
   override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
 
+  // we never use the FunctionParameters, so return an empty list
+  override def getParameters: util.List[FunctionParameter] = Collections.emptyList()
+
   override def getRowType(typeFactory: RelDataTypeFactory,
                           arguments: util.List[AnyRef]): RelDataType = {
     val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 55dbe4c..5254ceb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -48,22 +48,6 @@ class FunctionCatalog {
     sqlFunctions += sqlFunction
   }
 
-  /**
-    * Register multiple SQL functions at the same time. The functions have the same name.
-    */
-  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
-    if (functions.nonEmpty) {
-      val name = functions.head.getName
-      // check that all functions have the same name
-      if (functions.forall(_.getName == name)) {
-        sqlFunctions --= sqlFunctions.filter(_.getName == name)
-        sqlFunctions ++= functions
-      } else {
-        throw ValidationException("The SQL functions to be registered have different names.")
-      }
-    }
-  }
-
   def getSqlOperatorTable: SqlOperatorTable =
     ChainedSqlOperatorTable.of(
       new BasicOperatorTable(),

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
index cd92f49..2a27add 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -35,4 +35,21 @@ public class JavaUserDefinedTableFunctions {
 			collect(c);
 		}
 	}
+
+	/**
+	 * Emit every input string.
+	 */
+	public static class JavaVarsArgTableFunc0 extends TableFunction<String> {
+		public void eval(String... strs) {
+			for (String s : strs) {
+				collect(s);
+			}
+		}
+
+		public void eval(int val, String str) {
+			for (int i = 0; i < val; i++) {
+				collect(str);
+			}
+		}
+	}
 }

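A hedged reading of how these two overloads interact with the new operand-type checker: func1('hello', 'world', c) binds to eval(String...), while a call such as func1(2, c) would bind to eval(int, String). The CorrelateTest additions below exercise the varargs path for both this Java function and the Scala VarArgsFunc0.
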
http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
index a9938cb..719141f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaVarsArgTableFunc0
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2, _}
 import org.junit.Test
@@ -234,4 +235,52 @@ class CorrelateTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new JavaVarsArgTableFunc0
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    var sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1('hello', 'world', c)) AS T(s)"
+
+    var expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func1('hello', 'world', $cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+
+    // test Scala varargs function
+    val func2 = new VarArgsFunc0
+    util.addFunction("func2", func2)
+
+    sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func2('hello', 'world', c)) AS T(s)"
+
+    expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func2('hello', 'world', $cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index 9bb7bcf..955ed4b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaVarsArgTableFunc0
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2, _}
 import org.junit.Test
@@ -233,4 +234,52 @@ class CorrelateTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new JavaVarsArgTableFunc0
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    var sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1('hello', 'world', c)) AS T(s)"
+
+    var expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1('hello', 'world', $cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+
+    // test Scala varargs function
+    val func2 = new VarArgsFunc0
+    util.addFunction("func2", func2)
+
+    sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func2('hello', 'world', c)) AS T(s)"
+
+    expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func2('hello', 'world', $cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index f58feed..66593e9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -173,7 +173,7 @@ class CorrelateValidationTest extends TableTestBase {
     // SQL API call
     expectExceptionThrown(
       util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
-      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
+      "Given parameters of function 'func2' do not match any signature.")
   }
 
   // ----------------------------------------------------------------------------------------------