Posted to issues@flink.apache.org by wuchong <gi...@git.apache.org> on 2017/08/14 06:58:43 UTC

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

GitHub user wuchong opened a pull request:

    https://github.com/apache/flink/pull/4536

    [FLINK-7439] [table] Support variable arguments for UDTF in SQL

    ## What is the purpose of the change
    
    Support variable arguments for UDTF in SQL
    
    ## Brief change log
    
    - `TableSqlFunction`'s operand type inference and checker
    - Fix `ScalarFunctionCallGen` and `TableFunctionCallGen`, which handled var args incorrectly.
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - *Added `CorrelateTest.scala` for batch and stream queries to verify the logical plan*
    
    Integration testing is covered by the existing `CorrelateITCase`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? na
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink udtf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4536
    
----
commit b18ce206ad69e0abf72f5388538bb65e4911c65a
Author: Jark Wu <ja...@apache.org>
Date:   2017-08-14T06:18:52Z

    [FLINK-7439] [table] Support variable arguments for UDTF in SQL

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r140276174
  
    --- Diff: docs/dev/table/udfs.md ---
    @@ -297,7 +297,7 @@ optionally implemented. While some of these methods allow the system more effici
     - `merge()` is required for many batch aggreagtions and session window aggregations.
     - `resetAccumulator()` is required for many batch aggregations.
     
    -All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define a table function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. 
    --- End diff --
    
    "The method `accumulate` can be overloaded with different custom types and arguments and also support variable arguments."
    
    ->
    
    "The method `accumulate` can be overloaded with different parameter types and supports variable arguments."

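As a minimal sketch of what "overloaded with different parameter types and supports variable arguments" means for `accumulate`, the plain-Java class below mimics the method names mentioned in the docs. It does not extend Flink's `AggregateFunction`; `OverloadedSum` and `SumAcc` are hypothetical names used only for illustration.

```java
// Hypothetical accumulator type holding a running sum.
class SumAcc {
    long sum = 0;
}

// Hypothetical stand-in for an aggregate function; not a Flink class.
class OverloadedSum {

    public SumAcc createAccumulator() {
        return new SumAcc();
    }

    // Overload 1: a single numeric value.
    public void accumulate(SumAcc acc, long value) {
        acc.sum += value;
    }

    // Overload 2: a different parameter type; adds the string length.
    public void accumulate(SumAcc acc, String text) {
        acc.sum += text.length();
    }

    // Overload 3: variable arguments; null entries are skipped.
    public void accumulate(SumAcc acc, Integer... values) {
        for (Integer v : values) {
            if (v != null) {
                acc.sum += v;
            }
        }
    }

    public long getValue(SumAcc acc) {
        return acc.sum;
    }

    public static void main(String[] args) {
        OverloadedSum f = new OverloadedSum();
        SumAcc acc = f.createAccumulator();
        f.accumulate(acc, 5L);      // long overload
        f.accumulate(acc, "abc");   // String overload
        f.accumulate(acc, 1, 2, 3); // var-arg overload
        System.out.println(f.getValue(acc));
    }
}
```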

---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r137221865
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
    @@ -40,17 +40,14 @@ class TableSqlFunction(
         name: String,
         udtf: TableFunction[_],
         rowTypeInfo: TypeInformation[_],
    -    returnTypeInference: SqlReturnTypeInference,
    -    operandTypeInference: SqlOperandTypeInference,
    -    operandTypeChecker: SqlOperandTypeChecker,
    -    paramTypes: util.List[RelDataType],
    +    typeFactory: FlinkTypeFactory,
         functionImpl: FlinkTableFunctionImpl[_])
       extends SqlUserDefinedTableFunction(
         new SqlIdentifier(name, SqlParserPos.ZERO),
    -    returnTypeInference,
    -    operandTypeInference,
    -    operandTypeChecker,
    -    paramTypes,
    +    ReturnTypes.CURSOR,
    +    createOperandTypeInference(name, udtf, typeFactory),
    +    createOperandTypeChecker(name, udtf),
    +    null,
    --- End diff --
    
    Is it safe to pass `null` here? Couldn't we create this parameter from `rowTypeInfo`?


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135425261
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
    @@ -18,39 +18,40 @@
     
     package org.apache.flink.table.functions.utils
     
    -import com.google.common.base.Predicate
     import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.sql._
     import org.apache.calcite.sql.`type`._
     import org.apache.calcite.sql.parser.SqlParserPos
     import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
    -import org.apache.calcite.util.Util
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.table.calcite.FlinkTypeFactory
    -import org.apache.flink.table.functions.TableFunction
    +import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
    --- End diff --
    
    Remove the unused import.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r136217663
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
    @@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
     class FlinkTableFunctionImpl[T](
         val typeInfo: TypeInformation[T],
         val fieldIndexes: Array[Int],
    -    val fieldNames: Array[String],
    -    val evalMethod: Method)
    -  extends ReflectiveFunctionBase(evalMethod)
    --- End diff --
    
    @wuchong I get your meaning, and I'm fine with your approach. To find a better way, @fhueske and @twalthr, I would appreciate it if you could give us some advice.


---

[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4536
  
    Thanks @wuchong. I will go over the changes one last time and merge this...


---

[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/4536
  
    Thanks for the update, looks good to me.
    +1 to merge.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r140163581
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
    @@ -40,17 +40,14 @@ class TableSqlFunction(
         name: String,
         udtf: TableFunction[_],
         rowTypeInfo: TypeInformation[_],
    -    returnTypeInference: SqlReturnTypeInference,
    -    operandTypeInference: SqlOperandTypeInference,
    -    operandTypeChecker: SqlOperandTypeChecker,
    -    paramTypes: util.List[RelDataType],
    +    typeFactory: FlinkTypeFactory,
         functionImpl: FlinkTableFunctionImpl[_])
       extends SqlUserDefinedTableFunction(
         new SqlIdentifier(name, SqlParserPos.ZERO),
    -    returnTypeInference,
    -    operandTypeInference,
    -    operandTypeChecker,
    -    paramTypes,
    +    ReturnTypes.CURSOR,
    +    createOperandTypeInference(name, udtf, typeFactory),
    +    createOperandTypeChecker(name, udtf),
    +    null,
    --- End diff --
    
    I think it is fine. `ScalarSqlFunction` and `AggregateSqlFunction` also pass `null` and work fine.


---

[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4536
  
    @sunjincheng121 @fhueske It would be great if you could have a look ;-)


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135430307
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
    @@ -71,6 +69,9 @@ class FlinkTableFunctionImpl[T](
     
       override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
     
    +  // we do never use the FunctionParameters, so return an empty list
    +  override def getParameters: util.List[FunctionParameter] = Collections.emptyList()
    --- End diff --
    
    In the previous implementation, `ReflectiveFunctionBase#getParameters` was used to create the `SqlOperandTypeChecker`. The new implementation no longer needs it for that, so I think we can just return an empty list.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4536


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135425326
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
    @@ -18,39 +18,40 @@
     
     package org.apache.flink.table.functions.utils
     
    -import com.google.common.base.Predicate
     import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.sql._
     import org.apache.calcite.sql.`type`._
     import org.apache.calcite.sql.parser.SqlParserPos
     import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
    -import org.apache.calcite.util.Util
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.table.calcite.FlinkTypeFactory
    -import org.apache.flink.table.functions.TableFunction
    +import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
     import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
     
     import scala.collection.JavaConverters._
     import java.util
     
    +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
    +import org.apache.flink.api.java.typeutils.TypeExtractor
    --- End diff --
    
    I suggest ordering the imports as follows:
    **calcite**
    **calcite**
    **flink.api**
    **flink.api**
    **flink.table**
    **flink.table**
    What do you think?


---

[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4536
  
    Hi @sunjincheng121  I have updated the PR. 


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r136287683
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
    @@ -74,48 +75,102 @@ class TableSqlFunction(
     
     object TableSqlFunction {
     
    -  /**
    -    * Util function to create a [[TableSqlFunction]].
    -    *
    -    * @param name function name (used by SQL parser)
    -    * @param udtf user-defined table function to be called
    -    * @param rowTypeInfo the row type information generated by the table function
    -    * @param typeFactory type factory for converting Flink's between Calcite's types
    -    * @param functionImpl Calcite table function schema
    -    * @return [[TableSqlFunction]]
    -    */
    -  def apply(
    +  private[flink] def createOperandTypeInference(
         name: String,
         udtf: TableFunction[_],
    -    rowTypeInfo: TypeInformation[_],
    -    typeFactory: FlinkTypeFactory,
    -    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
    -
    -    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
    -    val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
    -    // derives operands' data types and type families
    -    functionImpl.getParameters.asScala.foreach{ o =>
    -      val relType: RelDataType = o.getType(typeFactory)
    -      argTypes.add(relType)
    -      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
    +    typeFactory: FlinkTypeFactory)
    +  : SqlOperandTypeInference = {
    +    /**
    +      * Operand type inference based on [[TableFunction]] given information.
    +      */
    +    new SqlOperandTypeInference {
    +      override def inferOperandTypes(
    +          callBinding: SqlCallBinding,
    +          returnType: RelDataType,
    +          operandTypes: Array[RelDataType]): Unit = {
    +
    +        val operandTypeInfo = getOperandTypeInfo(callBinding)
    +
    +        val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
    +          .getOrElse(throw new ValidationException(
    +            s"Given parameters of function '$name' do not match any signature. \n" +
    +              s"Actual: ${signatureToString(operandTypeInfo)} \n" +
    +              s"Expected: ${signaturesToString(udtf, "eval")}"))
    +
    +        val inferredTypes = foundSignature
    +          .map(TypeExtractor.getForClass(_))
    +          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
    +
    +        for (i <- operandTypes.indices) {
    +          if (i < inferredTypes.length - 1) {
    +            operandTypes(i) = inferredTypes(i)
    +          } else if (null != inferredTypes.last.getComponentType) {
    +            // last argument is a collection, the array type
    +            operandTypes(i) = inferredTypes.last.getComponentType
    +          } else {
    +            operandTypes(i) = inferredTypes.last
    +          }
    +        }
    +      }
         }
    -    // derives whether the 'input'th parameter of a method is optional.
    -    val optional: Predicate[Integer] = new Predicate[Integer]() {
    -      def apply(input: Integer): Boolean = {
    -        functionImpl.getParameters.get(input).isOptional
    +  }
    +
    +  private[flink] def createOperandTypeChecker(
    --- End diff --
    
    Yes, I agree with you. I tried to merge this code, but some test cases failed and I could not figure out why. I think we can do it in a separate issue.
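
The loop in the quoted diff assigns one type per actual operand: positions before the last declared parameter take that parameter's type, and every remaining position takes the component type of the last parameter if it is an array, otherwise the last parameter's type itself. A minimal sketch of that indexing in plain Java follows, using `Class` objects in place of Calcite's `RelDataType`. The class `OperandTypes` and the method `expand` are hypothetical names, not part of Flink or Calcite, and the sketch assumes the matched signature has at least one parameter.

```java
import java.util.Arrays;

// Hypothetical helper mirroring the index logic of the quoted loop.
class OperandTypes {

    // signature:    declared parameter types of the matched eval method
    // operandCount: number of operands in the actual call
    static Class<?>[] expand(Class<?>[] signature, int operandCount) {
        if (signature.length == 0) {
            throw new IllegalArgumentException("sketch assumes at least one parameter");
        }
        Class<?>[] result = new Class<?>[operandCount];
        Class<?> last = signature[signature.length - 1];
        for (int i = 0; i < operandCount; i++) {
            if (i < signature.length - 1) {
                // fixed leading parameter
                result[i] = signature[i];
            } else if (last.getComponentType() != null) {
                // last parameter is an array: use its element type
                result[i] = last.getComponentType();
            } else {
                result[i] = last;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Class<?>[] sig = {Integer.class, String[].class};
        // One Integer followed by three var-arg Strings.
        System.out.println(Arrays.toString(expand(sig, 4)));
    }
}
```

Like the quoted loop, this sketch does not distinguish a var-arg parameter from a plain trailing array parameter; both are expanded to their element type.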


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r140986388
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
    @@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
     class FlinkTableFunctionImpl[T](
         val typeInfo: TypeInformation[T],
         val fieldIndexes: Array[Int],
    -    val fieldNames: Array[String],
    -    val evalMethod: Method)
    -  extends ReflectiveFunctionBase(evalMethod)
    --- End diff --
    
    I think we don't need `ReflectiveFunctionBase` anymore. The logic is implemented by us now.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135425816
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
    @@ -74,48 +75,102 @@ class TableSqlFunction(
     
     object TableSqlFunction {
     
    -  /**
    -    * Util function to create a [[TableSqlFunction]].
    -    *
    -    * @param name function name (used by SQL parser)
    -    * @param udtf user-defined table function to be called
    -    * @param rowTypeInfo the row type information generated by the table function
    -    * @param typeFactory type factory for converting Flink's between Calcite's types
    -    * @param functionImpl Calcite table function schema
    -    * @return [[TableSqlFunction]]
    -    */
    -  def apply(
    +  private[flink] def createOperandTypeInference(
         name: String,
         udtf: TableFunction[_],
    -    rowTypeInfo: TypeInformation[_],
    -    typeFactory: FlinkTypeFactory,
    -    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
    -
    -    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
    -    val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
    -    // derives operands' data types and type families
    -    functionImpl.getParameters.asScala.foreach{ o =>
    -      val relType: RelDataType = o.getType(typeFactory)
    -      argTypes.add(relType)
    -      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
    +    typeFactory: FlinkTypeFactory)
    +  : SqlOperandTypeInference = {
    +    /**
    +      * Operand type inference based on [[TableFunction]] given information.
    +      */
    +    new SqlOperandTypeInference {
    +      override def inferOperandTypes(
    +          callBinding: SqlCallBinding,
    +          returnType: RelDataType,
    +          operandTypes: Array[RelDataType]): Unit = {
    +
    +        val operandTypeInfo = getOperandTypeInfo(callBinding)
    +
    +        val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
    +          .getOrElse(throw new ValidationException(
    +            s"Given parameters of function '$name' do not match any signature. \n" +
    +              s"Actual: ${signatureToString(operandTypeInfo)} \n" +
    +              s"Expected: ${signaturesToString(udtf, "eval")}"))
    +
    +        val inferredTypes = foundSignature
    +          .map(TypeExtractor.getForClass(_))
    +          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
    +
    +        for (i <- operandTypes.indices) {
    +          if (i < inferredTypes.length - 1) {
    +            operandTypes(i) = inferredTypes(i)
    +          } else if (null != inferredTypes.last.getComponentType) {
    +            // last argument is a collection, the array type
    +            operandTypes(i) = inferredTypes.last.getComponentType
    +          } else {
    +            operandTypes(i) = inferredTypes.last
    +          }
    +        }
    +      }
         }
    -    // derives whether the 'input'th parameter of a method is optional.
    -    val optional: Predicate[Integer] = new Predicate[Integer]() {
    -      def apply(input: Integer): Boolean = {
    -        functionImpl.getParameters.get(input).isOptional
    +  }
    +
    +  private[flink] def createOperandTypeChecker(
    --- End diff --
    
    Can we share `SqlOperandTypeInference` and `createOperandTypeChecker` among `ScalarSqlFunction`, `TableSqlFunction`, and `AggSqlFunction`? All of the UDX need them, and the logic of these methods is the same or similar. Maybe we can move them into `UserDefinedFunctionUtils`. What do you think?


---

[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4536
  
    Hi @fhueske, I have fixed the conflicts and updated the docs to describe the var-arg support.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135426183
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
    @@ -71,6 +69,9 @@ class FlinkTableFunctionImpl[T](
     
       override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
     
    +  // we do never use the FunctionParameters, so return an empty list
    +  override def getParameters: util.List[FunctionParameter] = Collections.emptyList()
    --- End diff --
    
    I think we can keep using `ReflectiveFunctionBase#getParameters`.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135425060
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -350,8 +350,8 @@ abstract class TableEnvironment(val config: TableConfig) {
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
    -    functionCatalog.registerSqlFunctions(sqlFunctions)
    +    val sqlFunctions = createTableSqlFunction(name, function, typeInfo, typeFactory)
    --- End diff --
    
    sqlFunctions -> sqlFunction


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135426134
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
    @@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
     class FlinkTableFunctionImpl[T](
         val typeInfo: TypeInformation[T],
         val fieldIndexes: Array[Int],
    -    val fieldNames: Array[String],
    -    val evalMethod: Method)
    -  extends ReflectiveFunctionBase(evalMethod)
    --- End diff --
    
    I suggest continuing to extend `ReflectiveFunctionBase` to stay consistent with Calcite.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135430161
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
    @@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
     class FlinkTableFunctionImpl[T](
         val typeInfo: TypeInformation[T],
         val fieldIndexes: Array[Int],
    -    val fieldNames: Array[String],
    -    val evalMethod: Method)
    -  extends ReflectiveFunctionBase(evalMethod)
    --- End diff --
    
    In the first implementation of UDTF, we used `ReflectiveFunctionBase` to infer the operand types. With the new approach in this PR, we define a custom `SqlOperandTypeChecker` to check the operands, so we don't need `ReflectiveFunctionBase` anymore. If we extend `ReflectiveFunctionBase`, we have to register every `eval` method as a separate UDTF; with the new approach, we only need to register once.
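
To illustrate the "register once" idea, here is a minimal sketch in plain Java that resolves a call against all `eval` overloads of a single function class when the call is checked. It is not Flink's `getEvalMethodSignature`: the names `EvalResolver`, `findEval`, and `SplitLike` are hypothetical, and the matching rule (assignable types, with a trailing var-arg parameter absorbing the remaining arguments) is a simplifying assumption.

```java
import java.lang.reflect.Method;
import java.util.Optional;

// Hypothetical stand-in for a table function with several eval overloads.
class SplitLike {
    public void eval(String s) {}
    public void eval(Integer n, String s) {}
    public void eval(Long first, String... rest) {}
}

// Hypothetical helper: finds an eval method that accepts the given argument types.
class EvalResolver {

    static Optional<Method> findEval(Class<?> function, Class<?>... argTypes) {
        for (Method m : function.getMethods()) {
            if (m.getName().equals("eval") && accepts(m, argTypes)) {
                return Optional.of(m);
            }
        }
        return Optional.empty();
    }

    private static boolean accepts(Method m, Class<?>[] argTypes) {
        Class<?>[] params = m.getParameterTypes();
        if (!m.isVarArgs()) {
            // Fixed arity: counts must agree and each type must be assignable.
            if (params.length != argTypes.length) {
                return false;
            }
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(argTypes[i])) {
                    return false;
                }
            }
            return true;
        }
        // Var-args: the fixed prefix must match, the rest must match the element type.
        int fixed = params.length - 1;
        if (argTypes.length < fixed) {
            return false;
        }
        Class<?> component = params[fixed].getComponentType();
        for (int i = 0; i < argTypes.length; i++) {
            Class<?> expected = i < fixed ? params[i] : component;
            if (!expected.isAssignableFrom(argTypes[i])) {
                return false;
            }
        }
        return true;
    }
}
```

With this shape, the function class is handed over once and each call site is matched against whatever overloads the class declares, instead of one registration per `eval` method.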


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135425277
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
    @@ -18,39 +18,40 @@
     
     package org.apache.flink.table.functions.utils
     
    -import com.google.common.base.Predicate
     import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.sql._
     import org.apache.calcite.sql.`type`._
     import org.apache.calcite.sql.parser.SqlParserPos
     import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
    -import org.apache.calcite.util.Util
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.table.calcite.FlinkTypeFactory
    -import org.apache.flink.table.functions.TableFunction
    +import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
     import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
     
     import scala.collection.JavaConverters._
     import java.util
    --- End diff --
    
    Remove the unused import.


---

[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r135426250
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java ---
    @@ -35,4 +35,15 @@ public void eval(Integer a, Long b, Long c) {
     			collect(c);
     		}
     	}
    +
    +	/**
    +	 * Emit every input string.
    +	 */
    +	public static class JavaVarsArgTableFunc0 extends TableFunction<String> {
    +		public void eval(String... strs) {
    +			for (String s : strs) {
    +				collect(s);
    +			}
    +		}
    --- End diff --
    
    Add another `eval` to check method matching (only a suggestion).
    {code}
    public void eval(int ival, String sVal) {
    	while (ival-- > 0) {
    		collect(sVal);
    	}
    }
    {code}
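
A self-contained sketch of why the second `eval` makes a useful matching test: the two overloads accept different argument lists, so the correct one has to be selected by argument types. This is not Flink code. `VarArgsDemo` and its `collected` list are hypothetical stand-ins for `TableFunction` and the rows emitted through `collect`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a table function; collect() records emitted rows.
class VarArgsDemo {
    final List<String> collected = new ArrayList<>();

    void collect(String s) {
        collected.add(s);
    }

    // Emits every input string.
    public void eval(String... strs) {
        for (String s : strs) {
            collect(s);
        }
    }

    // Emits sVal ival times.
    public void eval(int ival, String sVal) {
        while (ival-- > 0) {
            collect(sVal);
        }
    }

    public static void main(String[] args) {
        VarArgsDemo f = new VarArgsDemo();
        f.eval("a", "b", "c"); // var-arg overload
        f.eval(2, "x");        // (int, String) overload
        System.out.println(f.collected); // prints [a, b, c, x, x]
    }
}
```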


---