Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/08/21 19:38:00 UTC
[jira] [Commented] (FLINK-10194) Serialization issue with Scala's AggregateDataSet[Row]
[ https://issues.apache.org/jira/browse/FLINK-10194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587917#comment-16587917 ]
Fabian Hueske commented on FLINK-10194:
---------------------------------------
The built-in aggregation functions of the DataSet API can only be applied to Tuple or case class types. {{Row}} is not supported.
There are a few checks that try to validate the input type, but they do not reject {{Row}} because {{RowTypeInfo}} extends {{TupleTypeInfoBase}}.
We should improve these checks.
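A minimal sketch of what a stricter check could look like. It uses simplified stand-in classes, not Flink's real type-information classes: the class names mirror Flink's and reproduce only the relevant inheritance (RowTypeInfo extending TupleTypeInfoBase), while `naiveCheck`, `strictCheck` and `requireAggregatable` are hypothetical helpers for illustration.

```scala
// Hypothetical, simplified stand-ins for Flink's type-information classes.
// Only the inheritance that matters here is modeled: RowTypeInfo extends
// TupleTypeInfoBase, just like the tuple and case class type infos.
trait TypeInfo
class TupleTypeInfoBase extends TypeInfo
class TupleTypeInfo extends TupleTypeInfoBase
class CaseClassTypeInfo extends TupleTypeInfoBase
class RowTypeInfo extends TupleTypeInfoBase
class PojoTypeInfo extends TypeInfo

// A check keyed on the base class alone: Row slips through because
// RowTypeInfo is a subclass of TupleTypeInfoBase.
def naiveCheck(t: TypeInfo): Boolean = t.isInstanceOf[TupleTypeInfoBase]

// A stricter check: reject Row explicitly, then accept the rest of the
// TupleTypeInfoBase family. The Row case must come first, because a
// pattern match takes the first case that matches.
def strictCheck(t: TypeInfo): Boolean = t match {
  case _: RowTypeInfo       => false
  case _: TupleTypeInfoBase => true
  case _                    => false
}

// Fail while the program is being built, instead of with a
// ClassCastException when the aggregation function is opened at runtime.
def requireAggregatable(t: TypeInfo): Unit =
  require(strictCheck(t),
    "Field aggregations are only supported on Tuple and case class types")
```

With this ordering, `strictCheck(new RowTypeInfo)` is `false` while `naiveCheck(new RowTypeInfo)` is `true`, which is the gap between the two checks.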
> Serialization issue with Scala's AggregateDataSet[Row]
> ------------------------------------------------------
>
> Key: FLINK-10194
> URL: https://issues.apache.org/jira/browse/FLINK-10194
> Project: Flink
> Issue Type: Bug
> Environment: Flink v1.6.0
> Reporter: Alexis Sarda-Espinosa
> Priority: Minor
> Labels: scala, serialization
>
>
> Consider the following code, where I had to jump through some hoops to manually create a DataSet[Row] that allows using groupBy and sum as shown:
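> {code:scala}
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.api.java.io.CollectionInputFormat
> import org.apache.flink.api.java.operators.DataSource
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
> import org.apache.flink.types.Row
> import scala.collection.JavaConverters._
>
> object Main {
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     // Build the rows by hand: (letter, 1)
>     val letters = Seq("a", "a", "b").map(Row.of(_, 1.asInstanceOf[Object]))
>     val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>     val inputFormat = new CollectionInputFormat(letters.asJavaCollection,
>       typeInfo.createSerializer(env.getConfig))
>     // Wrap a Java DataSource so the Scala DataSet carries the RowTypeInfo
>     val source = new DataSource(env.getJavaEnv,
>       inputFormat,
>       typeInfo,
>       "hello.flink.Main$.main(Main.scala:20)")
>
>     val dataSet = new DataSet(source)
>     dataSet.print()
>     dataSet
>       .groupBy(0)
>       .sum(1)
>       .print()
>   }
> }{code}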
> The call to dataSet.print() works as expected, but the final print() throws an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.typeutils.runtime.RowSerializer cannot be cast to org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
> at org.apache.flink.api.scala.operators.ScalaAggregateOperator$AggregatingUdf.open(ScalaAggregateOperator.java:260){noformat}
> Changing the final print() to collect() throws the same exception.
>