Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2019/02/28 15:38:00 UTC
[jira] [Updated] (FLINK-10194) Serialization issue with Scala's AggregateDataSet[Row]
[ https://issues.apache.org/jira/browse/FLINK-10194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-10194:
-----------------------------------
Component/s: API / Type Serialization System
> Serialization issue with Scala's AggregateDataSet[Row]
> ------------------------------------------------------
>
> Key: FLINK-10194
> URL: https://issues.apache.org/jira/browse/FLINK-10194
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Environment: Flink v1.6.0
> Reporter: Alexis Sarda-Espinosa
> Priority: Minor
> Labels: scala, serialization
>
>
> Consider the following code, where I had to jump through some hoops to manually create a DataSet[Row] that allows using groupBy and sum as shown:
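> {code:scala}
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.api.java.io.CollectionInputFormat
> import org.apache.flink.api.java.operators.DataSource
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala._
> import org.apache.flink.types.Row
>
> import scala.collection.JavaConverters._
>
> object Main {
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     // Rows of (String, Int); the Int is boxed because Row.of takes Object varargs.
>     val letters = Seq("a", "a", "b").map(Row.of(_, 1.asInstanceOf[Object]))
>     val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>     // Build the Java DataSource by hand, then wrap it in a Scala DataSet.
>     val inputFormat = new CollectionInputFormat(letters.asJavaCollection,
>       typeInfo.createSerializer(env.getConfig))
>     val source = new DataSource(env.getJavaEnv,
>       inputFormat,
>       typeInfo,
>       "hello.flink.Main$.main(Main.scala:20")
>
>     val dataSet = new DataSet(source)
>     dataSet.print() // works as expected
>     dataSet
>       .groupBy(0)
>       .sum(1)
>       .print() // throws the ClassCastException shown below
>   }
> }
> {code}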
> The call to dataSet.print() works as expected, but the final print() throws an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.typeutils.runtime.RowSerializer cannot be cast to org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
>     at org.apache.flink.api.scala.operators.ScalaAggregateOperator$AggregatingUdf.open(ScalaAggregateOperator.java:260)
> {noformat}
> Changing the final print() to collect() throws the same exception.
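>
> For comparison, here is a minimal sketch of the same aggregation on Scala tuples instead of Row. It is not part of the original report and is only an assumption about a possible workaround, not a fix. The stack trace shows the aggregating UDF casting its serializer to TupleSerializerBase, which RowSerializer does not extend, whereas the serializers the Scala API derives for tuples and case classes should. The object name TupleAggregation is made up for illustration.
> {code:scala}
> import org.apache.flink.api.scala._
>
> // Hypothetical object name; requires flink-scala on the classpath.
> object TupleAggregation {
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     // Scala tuples instead of Row: the implicit TypeInformation is derived
>     // by the org.apache.flink.api.scala._ import.
>     val letters: DataSet[(String, Int)] =
>       env.fromElements(("a", 1), ("a", 1), ("b", 1))
>     letters
>       .groupBy(0) // group on the String field
>       .sum(1)     // sum the Int field within each group
>       .print()
>   }
> }
> {code}
> If this variant runs cleanly on the same Flink version, that would point at the Row serializer path in ScalaAggregateOperator rather than at groupBy/sum in general.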
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)