Posted to issues@flink.apache.org by "Jark Wu (JIRA)" <ji...@apache.org> on 2017/06/11 11:54:18 UTC

[jira] [Updated] (FLINK-6888) Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala case/tuple class

     [ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-6888:
---------------------------
    Description: 
Currently the {{ACC}} TypeInformation of {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or tuple class, the TypeInformation falls back to {{GenericType}}, which results in poor performance during state serialization and deserialization.
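To illustrate the fallback, here is a sketch that compares {{TypeInformation.of(Class)}} with the Scala API's {{createTypeInformation}} macro on a small case class. It assumes {{flink-scala}} is on the classpath (for {{org.apache.flink.api.scala._}}). {{SumCount}} and {{AccTypeExtraction}} are hypothetical names used only for this illustration.

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._ // provides the createTypeInformation macro
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

// Hypothetical accumulator type, used only for this illustration.
case class SumCount(sum: Long, count: Long)

object AccTypeExtraction {
  def main(args: Array[String]): Unit = {
    // Class-based extraction: the Java TypeExtractor only sees the erased class,
    // does not recognize it as a POJO, and falls back to a generic (Kryo) type.
    val fromClass: TypeInformation[SumCount] = TypeInformation.of(classOf[SumCount])

    // Macro-based extraction: the Scala API analyzes the static type at compile
    // time and builds a case-class type with one serializer per field.
    val fromMacro: TypeInformation[SumCount] = createTypeInformation[SumCount]

    println(s"TypeInformation.of:    $fromClass (generic: ${fromClass.isInstanceOf[GenericTypeInfo[_]]})")
    println(s"createTypeInformation: $fromMacro (case class: ${fromMacro.isInstanceOf[CaseClassTypeInfo[_ <: Product]]})")
  }
}
{code}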

I suggest extracting the ACC TypeInformation when {{TableEnvironment.registerFunction()}} is called.

Here is an example:

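{code}
import org.apache.flink.table.functions.AggregateFunction

case class Accumulator(var sum: Long, var count: Long)

class MyAgg extends AggregateFunction[Long, Accumulator] {

  // accumulate() is not an override: the Table API finds it by reflection, so it may be overloaded
  def accumulate(acc: Accumulator, value: Long): Unit = {
    acc.sum += value
    acc.count += 1
  }

  override def createAccumulator(): Accumulator = Accumulator(0L, 0L)

  // Average of all accumulated values (0 if nothing has been accumulated yet)
  override def getValue(accumulator: Accumulator): Long =
    if (accumulator.count == 0L) 0L else accumulator.sum / accumulator.count
}
{code}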

The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
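A minimal sketch of the suggestion, assuming a Scala-facing registration method with a context bound on {{ACC}}: the TypeInformation is then produced by the {{createTypeInformation}} macro at the call site, where the static accumulator type is still known. {{HypotheticalRegistry}} and {{RegisterDemo}} are hypothetical and are not part of Flink's {{TableEnvironment}} API; the accumulator is given {{var}} fields so that {{accumulate}} can update it in place.

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // implicit createTypeInformation macro
import org.apache.flink.table.functions.AggregateFunction

import scala.collection.mutable

// Self-contained copy of the example types (mutable fields so accumulate can update them).
case class Accumulator(var sum: Long, var count: Long)

class MyAgg extends AggregateFunction[Long, Accumulator] {
  def accumulate(acc: Accumulator, value: Long): Unit = {
    acc.sum += value
    acc.count += 1
  }
  override def createAccumulator(): Accumulator = Accumulator(0L, 0L)
  override def getValue(acc: Accumulator): Long =
    if (acc.count == 0L) 0L else acc.sum / acc.count
}

// HYPOTHETICAL registry, not Flink's TableEnvironment: it only shows how a
// Scala-facing registerFunction could capture the ACC type at the call site.
object HypotheticalRegistry {
  val accTypes: mutable.Map[String, TypeInformation[_]] = mutable.Map.empty

  // The context bound `ACC: TypeInformation` is resolved by the caller, where ACC is
  // statically known, so the macro builds a case-class TypeInformation instead of
  // reflecting on an erased Class.
  def registerFunction[T, ACC: TypeInformation](name: String, f: AggregateFunction[T, ACC]): Unit =
    accTypes(name) = implicitly[TypeInformation[ACC]]
}

object RegisterDemo {
  def main(args: Array[String]): Unit = {
    HypotheticalRegistry.registerFunction("myAgg", new MyAgg)
    // A case-class TypeInformation rather than GenericType<Accumulator>
    println(HypotheticalRegistry.accTypes("myAgg"))
  }
}
{code}

The same reasoning covers tuple accumulators: with the context bound, an {{ACC}} of {{(Long, Long)}} is also analyzed by the macro instead of falling back to {{GenericType}}.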

  was:
Currently the {{ACC}} TypeInformation of {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or tuple class, the TypeInformation falls back to {{GenericType}}, which results in poor performance during state serialization and deserialization.

I suggest extracting the ACC TypeInformation when {{TableEnvironment.registerFunction()}} is called.


> Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)