Posted to issues@flink.apache.org by wuchong <gi...@git.apache.org> on 2017/06/12 04:31:40 UTC

[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

GitHub user wuchong opened a pull request:

    https://github.com/apache/flink/pull/4105

    [FLINK-6888] [table] Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala class

    
    Currently, the `ACC` TypeInformation of `org.apache.flink.table.functions.AggregateFunction[T, ACC]` is extracted using `TypeInformation.of(Class)`. When `ACC` is a Scala case class or tuple class, the TypeInformation falls back to `GenericType`, which results in poor performance during state de/serialization.
    
    This PR fixes this issue by extracting the `ACC` TypeInformation when `TableEnvironment.registerFunction()` is called.
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink acctype

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4105
    
----
commit 43fe33c8ed25699e720d5477a25cad523529bc1e
Author: Jark Wu <wu...@alibaba-inc.com>
Date:   2017-06-12T04:28:46Z

    [FLINK-6888] [table] Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala case/tuple class

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122448432
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
         */
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         checkNotSingleton(function.getClass)
         // check if class could be instantiated
         checkForInstantiation(function.getClass)
     
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
     
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    
    To be honest, I don't think it is a nice approach to add the acc type to the internal representation of UDAGGs. It is not related to the logic of the function or required for the optimization but rather an internal runtime aspect.
    
    An alternative could be to wrap the functions registered from Scala in a class that extends the `AggregateFunction` interface and holds the original agg function and the acc type information (exposed via the `getAccumulatorType()` method). We would need to unwrap it before we translate it. It's not a super nice solution either, but would probably require fewer changes.
    
    What do you think?



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122964903
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
         */
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         checkNotSingleton(function.getClass)
         // check if class could be instantiated
         checkForInstantiation(function.getClass)
     
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
     
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    
    @fhueske thanks for your explanation. I tried to wrap the UDAGG in an `AggregateFunctionWrapper`, but found that it is not easy, because I can't override the user-defined contract methods such as `accumulate`, `retract`, and `merge`. Also, in code generation we generate the acc type class based on the return type of the `createAccumulator` method, but the return type of `createAccumulator` in `AggregateFunctionWrapper` can only be `Any`, which results in an error.
    
    In addition, I plan to support composite result types for UDAGGs. This also needs the return type to be known not only to Calcite for semantic validation but also for code generation.
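
The return-type problem described above can be reproduced with plain JDK reflection. The following is a minimal sketch, not Flink code: `Agg`, `SumCount`, `MyAgg`, `AggWrapper`, and `ReturnTypes` are hypothetical stand-ins invented for illustration. It assumes that code generation reads the declared return type of `createAccumulator`, as the comment states.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for AggregateFunction[T, ACC]; not the real Flink class.
abstract class Agg<T, ACC> {
    public abstract ACC createAccumulator();
}

// Hypothetical accumulator type.
class SumCount {
    long sum;
    long count;
}

// A concrete function: the declared return type is SumCount.
class MyAgg extends Agg<Long, SumCount> {
    @Override
    public SumCount createAccumulator() {
        return new SumCount();
    }
}

// A generic wrapper: ACC is erased, so the declared return type is Object.
class AggWrapper<T, ACC> extends Agg<T, ACC> {
    private final Agg<T, ACC> inner;

    AggWrapper(Agg<T, ACC> inner) {
        this.inner = inner;
    }

    @Override
    public ACC createAccumulator() {
        return inner.createAccumulator();
    }
}

class ReturnTypes {
    // Returns the declared return type of the public createAccumulator() method.
    static Class<?> declaredAccumulatorClass(Agg<?, ?> function) {
        try {
            Method m = function.getClass().getMethod("createAccumulator");
            return m.getReturnType();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch, `ReturnTypes.declaredAccumulatorClass(new MyAgg())` yields `SumCount.class`, while the same call on `new AggWrapper<Long, SumCount>(new MyAgg())` yields `Object.class`, the Java counterpart of the `Any` mentioned above.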



[GitHub] flink issue #4105: [FLINK-6888] [table] Can not determine TypeInformation of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4105
  
    I have rebased and updated the code. Could you have another look? @fhueske @twalthr @sunjincheng121



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121325300
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala ---
    @@ -39,4 +44,34 @@ class AggregationsTest extends TableTestBase {
     
         streamUtil.tEnv.sql(sqlQuery)
       }
    +
    +  @Test
    +  def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
    +    streamUtil.addFunction("udag", new MyAgg)
    +    val call = streamUtil
    +      .tEnv
    +      .functionCatalog
    +      .lookupFunction("udag", Seq())
    +      .asInstanceOf[AggFunctionCall]
    +
    +    val typeInfo = call.accTypeInfo
    +    assertTrue(typeInfo.isInstanceOf[CaseClassTypeInfo[_]])
    +    assertEquals(2, typeInfo.getTotalFields)
    +    val caseTypeInfo = typeInfo.asInstanceOf[CaseClassTypeInfo[_]]
    +    assertEquals(Types.LONG, caseTypeInfo.getTypeAt(0))
    +    assertEquals(Types.LONG, caseTypeInfo.getTypeAt(1))
    +  }
    +}
    +
    +case class Accumulator(sum: Long, count: Long)
    --- End diff --
    
    The members of `Accumulator` must be modified. 



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121335993
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -329,6 +337,41 @@ object UserDefinedFunctionUtils {
         }
       }
     
    +
    +  /**
    +    * Internal method of AggregateFunction#getAccumulatorType() that does some pre-checking
    +    * and uses [[TypeExtractor]] as default return type inference.
    +    */
    +  def getAccumulatorTypeOfAggregateFunction(
    +    aggregateFunction: AggregateFunction[_, _],
    --- End diff --
    
    Good idea. I like it.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r126706300
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java ---
    @@ -59,7 +59,7 @@ public boolean requiresOver() {
     	/**
     	 * Accumulator for WeightedAvg.
     	 */
    -	public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
    --- End diff --
    
    Why do you change the type of this accumulator?



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r126883954
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java ---
    @@ -59,7 +59,7 @@ public boolean requiresOver() {
     	/**
     	 * Accumulator for WeightedAvg.
     	 */
    -	public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
    --- End diff --
    
    Yes, you are right. Either we change the type or we touch the code and use `acc.f0` instead of `acc.sum` and `acc.f1` instead of `acc.count`.
    
    Let's go with your approach.



[GitHub] flink issue #4105: [FLINK-6888] [table] Can not determine TypeInformation of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4105
  
    Thanks @StephanEwen. This PR aims to fix the issue you mentioned: the code used `TypeExtractor` for Scala code.
    
    @sunjincheng121 I addressed all your comments. Could you please have another look?



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4105



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by hustfxj <gi...@git.apache.org>.
Github user hustfxj commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121624773
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -329,6 +337,41 @@ object UserDefinedFunctionUtils {
         }
       }
     
    +
    +  /**
    +    * Internal method of AggregateFunction#getAccumulatorType() that does some pre-checking
    +    * and uses [[TypeExtractor]] as default return type inference.
    +    */
    +  def getAccumulatorTypeOfAggregateFunction(
    +    aggregateFunction: AggregateFunction[_, _],
    +    extractedType: TypeInformation[_] = null)
    +  : TypeInformation[_] = {
    +
    +    val accType = try {
    +      val method: Method = aggregateFunction.getClass.getMethod("getAccumulatorType")
    +      method.invoke(aggregateFunction).asInstanceOf[TypeInformation[_]]
    +    } catch {
    +      case _: NoSuchMethodException => null
    +      case ite: Throwable => throw new TableException("Unexpected exception:", ite)
    +    }
    +    if (accType != null) {
    +      accType
    +    } else if (extractedType != null) {
    +      extractedType
    +    } else {
    +      val accumulator = aggregateFunction.createAccumulator()
    +      try {
    +        TypeInformation.of(accumulator.getClass)
    --- End diff --
    
    Maybe we could use the following code to replace `TypeInformation.of`?
     ```
    val accumulator = TypeExtractor
            .createTypeInfo(aggregateFunction, classOf[AggregateFunction[_, _]],
              aggregateFunction.getClass, 1).asInstanceOf[TypeInformation[_]]
    ```
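
The idea behind the suggestion above, reading the accumulator type from the function's generic declaration by position (index 1 for `ACC`), can be illustrated with JDK reflection alone. This is a simplified sketch and not how `TypeExtractor` is implemented: `AggFn`, `AvgAccum`, `WeightedAvg`, and `GenericArgs` are hypothetical names, and the helper only handles a class that directly extends the generic base.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for AggregateFunction<T, ACC>.
abstract class AggFn<T, ACC> { }

// Hypothetical accumulator type.
class AvgAccum {
    long sum;
    int count;
}

// The function binds T = Long (index 0) and ACC = AvgAccum (index 1).
class WeightedAvg extends AggFn<Long, AvgAccum> { }

class GenericArgs {
    // Returns the type argument at `index` that `subclass` binds on its
    // direct generic superclass `base`.
    static Type typeArgument(Class<?> subclass, Class<?> base, int index) {
        Type sup = subclass.getGenericSuperclass();
        if (sup instanceof ParameterizedType
                && ((ParameterizedType) sup).getRawType() == base) {
            return ((ParameterizedType) sup).getActualTypeArguments()[index];
        }
        throw new IllegalArgumentException(
            subclass + " does not directly extend " + base);
    }
}
```

Here `GenericArgs.typeArgument(WeightedAvg.class, AggFn.class, 1)` resolves to `AvgAccum.class` without ever creating an accumulator instance.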



[GitHub] flink issue #4105: [FLINK-6888] [table] Can not determine TypeInformation of...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4105
  
    Is the code ever called from Java code, or is that purely within the Scala code base?
    
    Just FYI, the design principles in the DataStream and DataSet API are:
      - Never run `TypeExtractor` for Scala code. Scala code always uses implicits and context bounds for the Type Information
      - Java entry-point methods use the Type Extractor
      - Code that is shared between Java / Scala always gets the Type Information passed in
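
One possible reading of these three principles, sketched in Java with invented names (`Fn`, `CountAcc`, `CountFn`, `Registry`, `JavaEntryPoint` are all hypothetical and `java.lang.reflect.Type` stands in for Flink's type information): the shared code only accepts a type that is handed to it, and reflection-based extraction happens solely in the Java-facing entry point. A Scala entry point would instead pass in the type obtained from an implicit.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for an aggregate function and its accumulator.
abstract class Fn<T, ACC> { }
class CountAcc { long count; }
class CountFn extends Fn<Long, CountAcc> { }

// Shared code: never extracts types itself, always gets them passed in.
class Registry {
    final Map<String, Type> accTypes = new HashMap<>();

    void register(String name, Fn<?, ?> fn, Type accType) {
        if (accType == null) {
            throw new IllegalArgumentException("accType must be provided");
        }
        accTypes.put(name, accType);
    }
}

// Java entry point: the only place where reflection-based extraction runs.
class JavaEntryPoint {
    static void register(Registry registry, String name, Fn<?, ?> fn) {
        ParameterizedType sup =
            (ParameterizedType) fn.getClass().getGenericSuperclass();
        registry.register(name, fn, sup.getActualTypeArguments()[1]);
    }
}
```

With this split, `JavaEntryPoint.register(registry, "cnt", new CountFn())` stores `CountAcc.class` under `"cnt"`, and the shared `Registry.register` rejects a call that does not supply a type.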



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121335927
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1395,50 +1399,25 @@ object AggregateUtil {
     
             case udagg: AggSqlFunction =>
               aggregates(index) = udagg.getFunction
    +          accTypes(index) = udagg.accType
     
             case unSupported: SqlAggFunction =>
               throw new TableException(s"unsupported Function: '${unSupported.getName}'")
           }
         }
     
    -    (aggFieldIndexes, aggregates)
    -  }
    -
    -  private def createAccumulatorType(
    -      aggregates: Array[TableAggregateFunction[_, _]]): Seq[TypeInformation[_]] = {
    -
    -    val aggTypes: Seq[TypeInformation[_]] =
    -      aggregates.map {
    -        agg =>
    -          val accType = try {
    -            val method: Method = agg.getClass.getMethod("getAccumulatorType")
    -            method.invoke(agg).asInstanceOf[TypeInformation[_]]
    -          } catch {
    -            case _: NoSuchMethodException => null
    -            case ite: Throwable => throw new TableException("Unexpected exception:", ite)
    -          }
    -          if (accType != null) {
    -            accType
    -          } else {
    -            val accumulator = agg.createAccumulator()
    -            try {
    -              TypeInformation.of(accumulator.getClass)
    -            } catch {
    -              case ite: InvalidTypesException =>
    -                throw new TableException(
    -                  "Cannot infer type of accumulator. " +
    -                    "You can override AggregateFunction.getAccumulatorType() to specify the type.",
    -                  ite)
    -            }
    -          }
    -      }
    +    // create accumulator type information for every aggregate function
    +    aggregates.zipWithIndex.foreach { case (agg, index) =>
    +      accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, accTypes(index))
    --- End diff --
    
    I don't think so. The `getAccumulatorTypeOfAggregateFunction` can handle the nullable `accTypes(index)`.



[GitHub] flink issue #4105: [FLINK-6888] [table] Can not determine TypeInformation of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4105
  
    merging... 



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121324222
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---
    @@ -37,24 +36,26 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
       * @param name function name (used by SQL parser)
       * @param aggregateFunction aggregate function to be called
       * @param returnType the type information of returned value
    +  * @param accType the type information of the accumulator
       * @param typeFactory type factory for converting Flink's between Calcite's types
       */
     class AggSqlFunction(
         name: String,
         aggregateFunction: AggregateFunction[_, _],
    -    returnType: TypeInformation[_],
    +    val returnType: TypeInformation[_],
    +    val accType: TypeInformation[_],
         typeFactory: FlinkTypeFactory,
         requiresOver: Boolean)
    -  extends SqlUserDefinedAggFunction(
    +  extends SqlAggFunction(
    --- End diff --
    
    Why do we need to change `SqlUserDefinedAggFunction` to `SqlAggFunction`? Is there a reason? If so, please explain it in more detail.
    If not, I suggest we keep using `SqlUserDefinedAggFunction`. It is provided by Calcite for UDAGGs, so keeping it makes it easier to adapt to Calcite changes (although Calcite's `SqlUserDefinedAggFunction` is currently covered by Flink).



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r124575144
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -286,13 +286,21 @@ object UserDefinedFunctionUtils {
       def createAggregateSqlFunction(
           name: String,
           aggFunction: AggregateFunction[_, _],
    -      typeInfo: TypeInformation[_],
    +      extractedResultTypeInfo: TypeInformation[_],
    +      accTypeInfo: TypeInformation[_],
           typeFactory: FlinkTypeFactory)
       : SqlFunction = {
         //check if a qualified accumulate method exists before create Sql function
         checkAndExtractMethods(aggFunction, "accumulate")
    -    val resultType: TypeInformation[_] = getResultTypeOfAggregateFunction(aggFunction, typeInfo)
    -    AggSqlFunction(name, aggFunction, resultType, typeFactory, aggFunction.requiresOver)
    +    val resultType = getResultTypeOfAggregateFunction(aggFunction, extractedResultTypeInfo)
    --- End diff --
    
    `getResultTypeOfAggregateFunction` was called before in `TableEnvironment`. Can we remove it here? If not, we also might want to double check the acc type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r124578206
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -320,12 +348,20 @@ object UserDefinedFunctionUtils {
         } else if(extractedType != null) {
           extractedType
         } else {
    -      TypeExtractor
    +      try {
    +        TypeExtractor
    --- End diff --
    
    The fallback type extraction of the accumulator type is different now. Before, we created an accumulator instance and called `TypeInformation.of(Any)`. Now, we try to extract it from the generics of the UDAGG.
    
    @twalthr, does this method work as reliably as `TypeInformation.of(Any)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121399188
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -329,6 +337,41 @@ object UserDefinedFunctionUtils {
         }
       }
     
    +
    +  /**
    +    * Internal method of AggregateFunction#getAccumulatorType() that does some pre-checking
    +    * and uses [[TypeExtractor]] as default return type inference.
    +    */
    +  def getAccumulatorTypeOfAggregateFunction(
    +    aggregateFunction: AggregateFunction[_, _],
    --- End diff --
    
    I find that only the following code block can be extracted. I think maybe we don't need to extract it. What do you think? @sunjincheng121 
    
    ```
      val resultType = try {
          val method: Method = aggregateFunction.getClass.getMethod(methodName)
          method.invoke(aggregateFunction).asInstanceOf[TypeInformation[_]]
        } catch {
          case _: NoSuchMethodException => null
          case ite: Throwable => throw new TableException("Unexpected exception:", ite)
        }
    ```
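
For readers less familiar with the pattern in the snippet above, here is a rough Java sketch of an optional, reflectively invoked method combined with the fallback order used in the diff (declared type, then the type passed in, then a last resort). The class names are hypothetical, and plain `Object` values stand in for `TypeInformation`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class AccTypeLookup {
    // Calls a public no-arg method by name if the class defines it;
    // returns null when the optional method is absent.
    static Object callIfPresent(Object target, String methodName) {
        try {
            Method m = target.getClass().getMethod(methodName);
            return m.invoke(target);
        } catch (NoSuchMethodException e) {
            return null;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

    // Precedence: type declared by the function, then the type passed in,
    // then a last-resort value supplied by the caller.
    static Object resolve(Object fn, Object extractedType, Object lastResort) {
        Object declared = callIfPresent(fn, "getAccumulatorType");
        if (declared != null) {
            return declared;
        }
        if (extractedType != null) {
            return extractedType;
        }
        return lastResort;
    }
}

// Hypothetical function that declares its accumulator type explicitly.
class DeclaresType {
    public String getAccumulatorType() {
        return "declared";
    }
}

// Hypothetical function without the optional method.
class DeclaresNothing { }
```

Because a missing method and a `null` extracted type are both handled, callers can pass a nullable value for the extracted type and still get a result from the last fallback.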



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122711347
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
         */
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         checkNotSingleton(function.getClass)
         // check if class could be instantiated
         checkForInstantiation(function.getClass)
     
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
     
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    
    You are right. We use the same approach for the return type `T` of all UDFs. However, the return type is part of the function signature and needs to be known to Calcite for semantic validation. The `ACC` type is only needed for compilation and is an engine-specific property.
    
    TBH, I'm not sure which approach is better.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122447088
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -307,9 +315,29 @@ object UserDefinedFunctionUtils {
           aggregateFunction: AggregateFunction[_, _],
           extractedType: TypeInformation[_] = null)
         : TypeInformation[_] = {
    +    getGenericTypeOfAggregateFunction(aggregateFunction, "getResultType", 0, extractedType)
    +  }
    +
    +  /**
    +    * Internal method of AggregateFunction#getAccumulatorType() that does some pre-checking
    +    * and uses [[TypeExtractor]] as default return type inference.
    +    */
    +  def getAccumulatorTypeOfAggregateFunction(
    +    aggregateFunction: AggregateFunction[_, _],
    +    extractedType: TypeInformation[_] = null)
    +  : TypeInformation[_] = {
    +    getGenericTypeOfAggregateFunction(aggregateFunction, "getAccumulatorType", 1, extractedType)
    +  }
    +
    +  private def getGenericTypeOfAggregateFunction(
    --- End diff --
    
    I would not call the method `getGenericTypeOfAggregateFunction`, because this implies that it returns a `GenericType`, which we want to avoid.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r124570771
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
     import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
     import org.apache.flink.table.expressions.ExpressionParser
     import org.apache.flink.table.api._
    +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
    --- End diff --
    
    import is unused



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r124704209
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -320,12 +348,20 @@ object UserDefinedFunctionUtils {
         } else if(extractedType != null) {
           extractedType
         } else {
    -      TypeExtractor
    +      try {
    +        TypeExtractor
    --- End diff --
    
    I think it is safer than `TypeInformation.of(Any)`, because `TypeInformation.of(Any)` only works for non-generic types.
    
    For example:
    
    ```
    public class UDAF extends AggregateFunction<Tuple2<String, Long>, Long> {
        public Long createAccumulator() {...}
        public Tuple2<String, Long> getValue(Long acc) {...}
    }
    ```
    For the given UDAF, the return type is the generic type `Tuple2<String, Long>`. We can't extract the return type from an instance of `Tuple2` because of Java type erasure, but we can extract it from the type hierarchy of the UDAF class.
    
    Am I right, @twalthr ?
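
The erasure argument above can be sketched without Flink. The following is a minimal illustration, not Flink's `TypeExtractor`: `Agg`, `CountAgg`, and `ErasureDemo` are hypothetical stand-ins, and `Map.Entry<String, Long>` plays the role of `Tuple2<String, Long>`. It assumes the function class directly extends the generic base class; a real extractor would have to walk the whole hierarchy.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical stand-in for AggregateFunction<T, ACC>; not the real Flink class.
abstract class Agg<T, ACC> {
    abstract ACC createAccumulator();
    abstract T getValue(ACC acc);
}

// The result type is generic: Map.Entry<String, Long> (stand-in for Tuple2<String, Long>).
class CountAgg extends Agg<Map.Entry<String, Long>, Long> {
    @Override Long createAccumulator() { return 0L; }
    @Override Map.Entry<String, Long> getValue(Long acc) { return new SimpleEntry<>("count", acc); }
}

class ErasureDemo {
    // Reads the index-th type argument declared in the `extends Agg<...>` clause.
    // Assumption: aggClass is a direct subclass of Agg.
    static Type typeArgument(Class<?> aggClass, int index) {
        ParameterizedType parent = (ParameterizedType) aggClass.getGenericSuperclass();
        return parent.getActualTypeArguments()[index];
    }

    public static void main(String[] args) {
        CountAgg agg = new CountAgg();
        Object value = agg.getValue(agg.createAccumulator());
        // From an instance, only the raw class survives erasure.
        System.out.println(value.getClass().getName());
        // From the class hierarchy, the full parameterized type is still available.
        System.out.println(typeArgument(CountAgg.class, 0).getTypeName());
        System.out.println(typeArgument(CountAgg.class, 1).getTypeName());
    }
}
```

Here `ErasureDemo.typeArgument(CountAgg.class, 0)` returns a `ParameterizedType` whose arguments are `String` and `Long`, whereas `value.getClass()` exposes only the raw class of the returned entry.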
    
    
    




[GitHub] flink issue #4105: [FLINK-6888] [table] Can not determine TypeInformation of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4105
  
    Comments addressed



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122613823
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
         */
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         checkNotSingleton(function.getClass)
         // check if class could be instantiated
         checkForInstantiation(function.getClass)
     
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
     
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    
    Hi @fhueske, I think the `ACC` type extraction is similar to that of the `T` type of `AggregateFunction[T, ACC]` and the `T` type of `TableFunction[T]`. Currently, both are extracted and added to the internal representation of UDAGGs and UDTFs. To stay consistent, I added the accumulator type in the same way. If we decide to adopt the approach of wrapping the registered functions, it would be better to make the other type extractions follow that approach as well.
    
    What do you think?



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122614563
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -192,10 +193,14 @@ class BatchTableEnvironment(
           name: String,
           f: AggregateFunction[T, ACC])
       : Unit = {
    -    implicit val typeInfo: TypeInformation[T] = TypeExtractor
    -      .createTypeInfo(f, classOf[AggregateFunction[T, ACC]], f.getClass, 0)
    +    implicit val typeInfo: TypeInformation[T] = UserDefinedFunctionUtils
    +      .getResultTypeOfAggregateFunction(f)
           .asInstanceOf[TypeInformation[T]]
     
    +    implicit val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionUtils
    --- End diff --
    
    I agree with you. But this PR only fixes the problem that Scala classes don't work in the Scala TableEnvironments. In general, users who create Scala classes will naturally use them in Scala TableEnvironments. Registering a Scala class in a Java TableEnvironment is not recommended (and should be forbidden). This matches how Scala `T` classes of `AggregateFunction[T, _]` and `TableFunction[T]` work in Scala TableEnvironments.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r126865636
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java ---
    @@ -59,7 +59,7 @@ public boolean requiresOver() {
     	/**
     	 * Accumulator for WeightedAvg.
     	 */
    -	public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
    --- End diff --
    
    TypeExtractor recognizes `WeightedAvgAccum` as a `TupleType`, which ignores the `sum` and `count` fields during de/serialization. So I changed it to a POJO class. This change does not really belong in this PR, but I fixed it when I found it.
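
For reference, a POJO-style accumulator along the lines described above might look as follows. This is a sketch only: `WeightedAvgSketch` and its nested `Accum` class are hypothetical names, the weighted-average arithmetic is assumed rather than taken from the PR, and it relies on the usual Flink POJO conventions (public class, public no-argument constructor, public fields or getters and setters).

```java
// Hypothetical container class; the names are illustrative, not the PR's test code.
class WeightedAvgSketch {

    // POJO-style accumulator: state lives in plain public fields instead of
    // extra fields bolted onto a Tuple2 subclass.
    public static class Accum {
        public long sum = 0;
        public int count = 0;

        // Public no-argument constructor, as POJO handling expects.
        public Accum() {}
    }

    // Adds one weighted value to the accumulator (assumed semantics).
    static void accumulate(Accum acc, long value, int weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    // Returns the weighted average, or 0 when nothing has been accumulated.
    static long getValue(Accum acc) {
        return acc.count == 0 ? 0L : acc.sum / acc.count;
    }
}
```

Because `sum` and `count` are ordinary fields of a POJO, a field-based serializer has no tuple arity to mislead it about which fields carry the state.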



[GitHub] flink issue #4105: [FLINK-6888] [table] Can not determine TypeInformation of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4105
  
    +1 to merge



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r124574280
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
    @@ -228,12 +228,16 @@ case class VarSamp(child: Expression) extends Aggregation {
     
     case class AggFunctionCall(
         aggregateFunction: AggregateFunction[_, _],
    +    resultTypeInfo: TypeInformation[_],
    +    accTypeInfo: TypeInformation[_],
         args: Seq[Expression])
       extends Aggregation {
     
       override private[flink] def children: Seq[Expression] = args
     
    -  override def resultType: TypeInformation[_] = getResultTypeOfAggregateFunction(aggregateFunction)
    +  override def resultType: TypeInformation[_] = getResultTypeOfAggregateFunction(
    --- End diff --
    
    Should we call `getResultTypeOfAggregateFunction` directly in `UDAGGExpression.apply()`?
    Otherwise, we spread the inference of the function types to several places.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121346261
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1395,50 +1399,25 @@ object AggregateUtil {
     
             case udagg: AggSqlFunction =>
               aggregates(index) = udagg.getFunction
    +          accTypes(index) = udagg.accType
     
             case unSupported: SqlAggFunction =>
               throw new TableException(s"unsupported Function: '${unSupported.getName}'")
           }
         }
     
    -    (aggFieldIndexes, aggregates)
    -  }
    -
    -  private def createAccumulatorType(
    -      aggregates: Array[TableAggregateFunction[_, _]]): Seq[TypeInformation[_]] = {
    -
    -    val aggTypes: Seq[TypeInformation[_]] =
    -      aggregates.map {
    -        agg =>
    -          val accType = try {
    -            val method: Method = agg.getClass.getMethod("getAccumulatorType")
    -            method.invoke(agg).asInstanceOf[TypeInformation[_]]
    -          } catch {
    -            case _: NoSuchMethodException => null
    -            case ite: Throwable => throw new TableException("Unexpected exception:", ite)
    -          }
    -          if (accType != null) {
    -            accType
    -          } else {
    -            val accumulator = agg.createAccumulator()
    -            try {
    -              TypeInformation.of(accumulator.getClass)
    -            } catch {
    -              case ite: InvalidTypesException =>
    -                throw new TableException(
    -                  "Cannot infer type of accumulator. " +
    -                    "You can override AggregateFunction.getAccumulatorType() to specify the type.",
    -                  ite)
    -            }
    -          }
    -      }
    +    // create accumulator type information for every aggregate function
    +    aggregates.zipWithIndex.foreach { case (agg, index) =>
    +      accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, accTypes(index))
    --- End diff --
    
    If I understand correctly, only built-in aggregates need to call `getAccumulatorTypeOfAggregateFunction`, and UDAGGs do not need to call this method. Is that correct? If so, I think the method only needs to be called when `null == accTypes(index)`. Please explain if I have misunderstood. Thanks!




[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121349658
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1395,50 +1399,25 @@ object AggregateUtil {
     
             case udagg: AggSqlFunction =>
               aggregates(index) = udagg.getFunction
    +          accTypes(index) = udagg.accType
     
             case unSupported: SqlAggFunction =>
               throw new TableException(s"unsupported Function: '${unSupported.getName}'")
           }
         }
     
    -    (aggFieldIndexes, aggregates)
    -  }
    -
    -  private def createAccumulatorType(
    -      aggregates: Array[TableAggregateFunction[_, _]]): Seq[TypeInformation[_]] = {
    -
    -    val aggTypes: Seq[TypeInformation[_]] =
    -      aggregates.map {
    -        agg =>
    -          val accType = try {
    -            val method: Method = agg.getClass.getMethod("getAccumulatorType")
    -            method.invoke(agg).asInstanceOf[TypeInformation[_]]
    -          } catch {
    -            case _: NoSuchMethodException => null
    -            case ite: Throwable => throw new TableException("Unexpected exception:", ite)
    -          }
    -          if (accType != null) {
    -            accType
    -          } else {
    -            val accumulator = agg.createAccumulator()
    -            try {
    -              TypeInformation.of(accumulator.getClass)
    -            } catch {
    -              case ite: InvalidTypesException =>
    -                throw new TableException(
    -                  "Cannot infer type of accumulator. " +
    -                    "You can override AggregateFunction.getAccumulatorType() to specify the type.",
    -                  ite)
    -            }
    -          }
    -      }
    +    // create accumulator type information for every aggregate function
    +    aggregates.zipWithIndex.foreach { case (agg, index) =>
    +      accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, accTypes(index))
    --- End diff --
    
    Yes, adding the `null == accTypes(index)` condition avoids an extra call to `getAccumulatorTypeOfAggregateFunction`. I will add it.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121336133
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---
    @@ -37,24 +36,26 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
       * @param name function name (used by SQL parser)
       * @param aggregateFunction aggregate function to be called
       * @param returnType the type information of returned value
    +  * @param accType the type information of the accumulator
       * @param typeFactory type factory for converting Flink's between Calcite's types
       */
     class AggSqlFunction(
         name: String,
         aggregateFunction: AggregateFunction[_, _],
    -    returnType: TypeInformation[_],
    +    val returnType: TypeInformation[_],
    +    val accType: TypeInformation[_],
         typeFactory: FlinkTypeFactory,
         requiresOver: Boolean)
    -  extends SqlUserDefinedAggFunction(
    +  extends SqlAggFunction(
    --- End diff --
    
    Will revert these changes.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121321436
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -329,6 +337,41 @@ object UserDefinedFunctionUtils {
         }
       }
     
    +
    +  /**
    +    * Internal method of AggregateFunction#getAccumulatorType() that does some pre-checking
    +    * and uses [[TypeExtractor]] as default return type inference.
    +    */
    +  def getAccumulatorTypeOfAggregateFunction(
    +    aggregateFunction: AggregateFunction[_, _],
    --- End diff --
    
    Can we extract a common method from `getAccumulatorTypeOfAggregateFunction` and `getResultTypeOfAggregateFunction`, like the following?
    
    ```
    def getTypeByMethodOrTypeExtractor(
          aggregateFunction: AggregateFunction[_, _],
          clazz: Class[_],
          methodName: String,
          candidateType: TypeInformation[_] = null): TypeInformation[_] = {
    
        val resultType = try {
          val method: Method = aggregateFunction.getClass.getMethod(methodName)
          method.invoke(aggregateFunction).asInstanceOf[TypeInformation[_]]
        } catch {
          case _: NoSuchMethodException => null
          case ite: Throwable => throw new TableException("Unexpected exception:", ite)
        }
    
        if (resultType != null) {
          resultType
        } else if(candidateType != null) {
          candidateType
        } else {
          try {
            TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[_]]
          } catch {
            case ite: InvalidTypesException =>
              throw new TableException(
                s"Cannot infer type of $clazz. " +
                s"You can override AggregateFunction.$methodName to specify the type.", ite)
          }
        }
      }
    ```
    I'm not sure whether this is the best way. Please let me know what you think.
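
The lookup order in the suggestion above (a type declared by the function itself, then a candidate type passed in by the caller, then automatic extraction) can be sketched in plain Java. Everything here is hypothetical: `TypeLookup`, `WithOverride`, and `WithoutOverride` are illustrative names, plain objects stand in for `TypeInformation`, and the `Supplier` stands in for the `TypeExtractor` call.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.function.Supplier;

// Hypothetical function that declares its accumulator type explicitly.
class WithOverride {
    public String getAccumulatorType() { return "declared"; }
}

// Hypothetical function that declares nothing.
class WithoutOverride {}

class TypeLookup {
    // Resolution order: declared by the function, then the caller's candidate,
    // then the fallback extractor.
    static Object resolve(Object function, String methodName,
                          Object candidate, Supplier<Object> extractor) {
        Object declared;
        try {
            // Look up an optional public no-argument method by name.
            Method method = function.getClass().getMethod(methodName);
            declared = method.invoke(function);
        } catch (NoSuchMethodException e) {
            // The function does not declare the type itself.
            declared = null;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Unexpected exception:", e);
        }
        if (declared != null) {
            return declared;
        }
        if (candidate != null) {
            return candidate;
        }
        return extractor.get();
    }
}
```

Passing the method name as a parameter is what lets one helper serve both the result type and the accumulator type.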



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121329856
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1395,50 +1399,25 @@ object AggregateUtil {
     
             case udagg: AggSqlFunction =>
               aggregates(index) = udagg.getFunction
    +          accTypes(index) = udagg.accType
     
             case unSupported: SqlAggFunction =>
               throw new TableException(s"unsupported Function: '${unSupported.getName}'")
           }
         }
     
    -    (aggFieldIndexes, aggregates)
    -  }
    -
    -  private def createAccumulatorType(
    -      aggregates: Array[TableAggregateFunction[_, _]]): Seq[TypeInformation[_]] = {
    -
    -    val aggTypes: Seq[TypeInformation[_]] =
    -      aggregates.map {
    -        agg =>
    -          val accType = try {
    -            val method: Method = agg.getClass.getMethod("getAccumulatorType")
    -            method.invoke(agg).asInstanceOf[TypeInformation[_]]
    -          } catch {
    -            case _: NoSuchMethodException => null
    -            case ite: Throwable => throw new TableException("Unexpected exception:", ite)
    -          }
    -          if (accType != null) {
    -            accType
    -          } else {
    -            val accumulator = agg.createAccumulator()
    -            try {
    -              TypeInformation.of(accumulator.getClass)
    -            } catch {
    -              case ite: InvalidTypesException =>
    -                throw new TableException(
    -                  "Cannot infer type of accumulator. " +
    -                    "You can override AggregateFunction.getAccumulatorType() to specify the type.",
    -                  ite)
    -            }
    -          }
    -      }
    +    // create accumulator type information for every aggregate function
    +    aggregates.zipWithIndex.foreach { case (agg, index) =>
    +      accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, accTypes(index))
    --- End diff --
    
    I think we need to add a check as follows:
    ```
    aggregates.zipWithIndex.foreach { case (agg, index) =>
      if (null == accTypes(index)) {
        accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, accTypes(index))
      }
    }
    ```
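
The effect of the proposed guard can be illustrated with a small self-contained sketch. It is not the PR's code: `AccTypeFill` is a hypothetical helper, strings stand in for `TypeInformation` objects, and the `infer` function stands in for `getAccumulatorTypeOfAggregateFunction`.

```java
import java.util.function.Function;

// Hypothetical helper; strings stand in for TypeInformation objects.
class AccTypeFill {
    // Infers a type only for slots that are still null and returns how many
    // times the inference function was called.
    static <A> int fillMissing(String[] accTypes, A[] aggregates, Function<A, String> infer) {
        int calls = 0;
        for (int i = 0; i < aggregates.length; i++) {
            if (accTypes[i] == null) {
                accTypes[i] = infer.apply(aggregates[i]);
                calls++;
            }
        }
        return calls;
    }
}
```

Slots that were already filled in, such as those taken from a registered user-defined aggregate, are left untouched, so inference runs only for the remaining entries.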



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121335610
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---
    @@ -37,24 +36,26 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
       * @param name function name (used by SQL parser)
       * @param aggregateFunction aggregate function to be called
       * @param returnType the type information of returned value
    +  * @param accType the type information of the accumulator
       * @param typeFactory type factory for converting Flink's between Calcite's types
       */
     class AggSqlFunction(
         name: String,
         aggregateFunction: AggregateFunction[_, _],
    -    returnType: TypeInformation[_],
    +    val returnType: TypeInformation[_],
    +    val accType: TypeInformation[_],
         typeFactory: FlinkTypeFactory,
         requiresOver: Boolean)
    -  extends SqlUserDefinedAggFunction(
    +  extends SqlAggFunction(
    --- End diff --
    
    Sorry, I didn't notice the copy of `SqlUserDefinedAggFunction` from Calcite. I made these changes because the constructor of Calcite's `SqlUserDefinedAggFunction` doesn't accept the additional `requiresOver` argument.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122447580
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -192,10 +193,14 @@ class BatchTableEnvironment(
           name: String,
           f: AggregateFunction[T, ACC])
       : Unit = {
    -    implicit val typeInfo: TypeInformation[T] = TypeExtractor
    -      .createTypeInfo(f, classOf[AggregateFunction[T, ACC]], f.getClass, 0)
    +    implicit val typeInfo: TypeInformation[T] = UserDefinedFunctionUtils
    +      .getResultTypeOfAggregateFunction(f)
           .asInstanceOf[TypeInformation[T]]
     
    +    implicit val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionUtils
    --- End diff --
    
    The correct extraction of TypeInformation for Scala classes only works in the Scala TableEnvironments but not for Java environments. Hence, UDAGGs that use Scala classes should not be used in Java queries because they will fall back to GenericTypes there.
    
    The safe and portable way is to use Java classes for accumulators.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122712259
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -192,10 +193,14 @@ class BatchTableEnvironment(
           name: String,
           f: AggregateFunction[T, ACC])
       : Unit = {
    -    implicit val typeInfo: TypeInformation[T] = TypeExtractor
    -      .createTypeInfo(f, classOf[AggregateFunction[T, ACC]], f.getClass, 0)
    +    implicit val typeInfo: TypeInformation[T] = UserDefinedFunctionUtils
    +      .getResultTypeOfAggregateFunction(f)
           .asInstanceOf[TypeInformation[T]]
     
    +    implicit val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionUtils
    --- End diff --
    
    UDAGGs are sometimes also shared in libraries. If a UDAGG class is included in a JAR file and loaded into the classpath, a user can hardly tell whether it is implemented in Java or Scala. So, I would not prohibit Scala UDFs in Java environments. However, functions that are intended to be shared should be implemented with Java types.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121625889
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -329,6 +337,41 @@ object UserDefinedFunctionUtils {
         }
       }
     
    +
    +  /**
    +    * Internal method of AggregateFunction#getAccumulatorType() that does some pre-checking
    +    * and uses [[TypeExtractor]] as default return type inference.
    +    */
    +  def getAccumulatorTypeOfAggregateFunction(
    +    aggregateFunction: AggregateFunction[_, _],
    +    extractedType: TypeInformation[_] = null)
    +  : TypeInformation[_] = {
    +
    +    val accType = try {
    +      val method: Method = aggregateFunction.getClass.getMethod("getAccumulatorType")
    +      method.invoke(aggregateFunction).asInstanceOf[TypeInformation[_]]
    +    } catch {
    +      case _: NoSuchMethodException => null
    +      case ite: Throwable => throw new TableException("Unexpected exception:", ite)
    +    }
    +    if (accType != null) {
    +      accType
    +    } else if (extractedType != null) {
    +      extractedType
    +    } else {
    +      val accumulator = aggregateFunction.createAccumulator()
    +      try {
    +        TypeInformation.of(accumulator.getClass)
    --- End diff --
    
    I think that's good advice.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r126702994
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -320,12 +347,20 @@ object UserDefinedFunctionUtils {
         } else if(extractedType != null) {
    --- End diff --
    
    +space `} else if (extractedType`



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r124171844
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
         */
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         checkNotSingleton(function.getClass)
         // check if class could be instantiated
         checkForInstantiation(function.getClass)
     
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
     
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    
    Ping @fhueske, what do you think about this?



[GitHub] flink issue #4105: [FLINK-6888] [table] Can not determine TypeInformation of...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/4105
  
    @wuchong thanks for the update. It looks pretty good to me now.
    +1 to merge.



[GitHub] flink pull request #4105: [FLINK-6888] [table] Can not determine TypeInforma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r124570840
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala ---
    @@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
     import _root_.java.lang.{Boolean => JBool}
     
    +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
    --- End diff --
    
    Unused import.

