Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/08/21 19:38:00 UTC

[jira] [Commented] (FLINK-10194) Serialization issue with Scala's AggregateDataSet[Row]

    [ https://issues.apache.org/jira/browse/FLINK-10194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587917#comment-16587917 ] 

Fabian Hueske commented on FLINK-10194:
---------------------------------------

The built-in aggregation functions of the DataSet API can only be applied to Tuple or CaseClass types. {{Row}} is not supported.
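
For comparison, here is a minimal sketch of the same aggregation on a supported type, assuming the Scala DataSet API from Flink 1.6 is on the classpath. The object name {{TupleAggregation}} is only illustrative, and the data mirrors the letters used in the issue description.

```scala
import org.apache.flink.api.scala._

object TupleAggregation {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Scala tuples are a supported input type for the built-in aggregations.
    // The wildcard import above supplies the implicit TypeInformation.
    val letters: DataSet[(String, Int)] =
      env.fromElements(("a", 1), ("a", 1), ("b", 1))

    // Group on field 0 and sum field 1; print() triggers execution.
    letters
      .groupBy(0)
      .sum(1)
      .print()
  }
}
```

This should produce one record per key, with a sum of 2 for "a" and 1 for "b". The order of the printed records is not guaranteed.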

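There are a few checks that try to validate the input type, but they do not reject {{Row}} because {{RowTypeInfo}} extends {{TupleTypeInfoBase}}.
We should improve these checks so that unsupported types are rejected with a clear error instead of failing later with a {{ClassCastException}}.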
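
To illustrate why a check against the base class is not enough, here is a small self-contained sketch. The classes are simplified stand-ins with hypothetical names that only model the inheritance relationship described above. They are not Flink's real classes, and {{strictCheck}} is just one possible way to tighten the validation, not the actual fix.

```scala
// Simplified stand-ins that only model the inheritance described above.
// They are NOT Flink's classes; all names here are hypothetical.
abstract class SketchTupleTypeInfoBase
class SketchCaseClassTypeInfo extends SketchTupleTypeInfoBase
class SketchRowTypeInfo extends SketchTupleTypeInfoBase

object TypeCheckSketch {
  // A check on the base class alone accepts every subclass, Row included.
  def looseCheck(t: AnyRef): Boolean =
    t.isInstanceOf[SketchTupleTypeInfoBase]

  // One possible tightening (an assumption): reject the Row type explicitly.
  def strictCheck(t: AnyRef): Boolean =
    t.isInstanceOf[SketchTupleTypeInfoBase] && !t.isInstanceOf[SketchRowTypeInfo]

  def main(args: Array[String]): Unit = {
    println(looseCheck(new SketchRowTypeInfo))        // true: Row slips through
    println(strictCheck(new SketchRowTypeInfo))       // false: Row is rejected
    println(strictCheck(new SketchCaseClassTypeInfo)) // true: case classes still pass
  }
}
```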

> Serialization issue with Scala's AggregateDataSet[Row]
> ------------------------------------------------------
>
>                 Key: FLINK-10194
>                 URL: https://issues.apache.org/jira/browse/FLINK-10194
>             Project: Flink
>          Issue Type: Bug
>         Environment: Flink v1.6.0
>            Reporter: Alexis Sarda-Espinosa
>            Priority: Minor
>              Labels: scala, serialization
>
>  
> Consider the following code, where I had to jump through some hoops to manually create a DataSet[Row] that allows using groupBy and sum as shown:
> {code:java}
> object Main {
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val letters = Seq("a", "a", "b").map(Row.of(_, 1.asInstanceOf[Object]))
>     val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>     import scala.collection.JavaConverters._
>     val inputFormat = new CollectionInputFormat(letters.asJavaCollection,
>       typeInfo.createSerializer(env.getConfig))
>     val source = new DataSource(env.getJavaEnv,
>       inputFormat,
>       typeInfo,
>       "hello.flink.Main$.main(Main.scala:20")
>     
>     val dataSet = new DataSet(source)
>     dataSet.print()
>     dataSet
>       .groupBy(0)
>       .sum(1)
>       .print()
>   }
> }{code}
> The call to dataSet.print() works as expected, but the final print() throws an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.typeutils.runtime.RowSerializer cannot be cast to org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
> at org.apache.flink.api.scala.operators.ScalaAggregateOperator$AggregatingUdf.open(ScalaAggregateOperator.java:260){noformat}
> Changing the final print() to collect() throws the same exception.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)