Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2019/03/03 18:13:00 UTC

[jira] [Resolved] (SPARK-26980) Kryo deserialization not working with KryoSerializable class

     [ https://issues.apache.org/jira/browse/SPARK-26980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-26980.
-------------------------------
    Resolution: Not A Problem

Yes, my guess is that it's because you're using Spark's Kryo instance and config, and that config has class registration enabled. Kryo then tracks the class of a serialized object with a number, IIRC. If you're using Kryo directly within Spark, you'd have to take Spark's Kryo environment into account.
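One way to take Spark's Kryo environment into account when decoding such a column by hand is to build the Kryo instance from the same {{SparkConf}}, because {{KryoSerializer.newKryo()}} applies the configured registrations in the same order. The Kotlin sketch below assumes the bytes were written by Spark's Kryo-based encoder using {{writeClassAndObject}}, which is what Spark's {{KryoSerializerInstance.serialize}} appears to do. The helper name {{decodeSparkKryo}} is hypothetical.

```kotlin
import com.esotericsoftware.kryo.io.Input
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Hypothetical helper: decodes bytes that Spark produced with its Kryo serializer.
// Assumes `conf` carries the same Kryo settings as the job that wrote them
// (spark.kryo.registrationRequired, registerKryoClasses, ...), so that
// KryoSerializer.newKryo() assigns the same registration IDs.
@Suppress("UNCHECKED_CAST")
fun <T> decodeSparkKryo(conf: SparkConf, bytes: ByteArray): T {
    val kryo = KryoSerializer(conf).newKryo()
    // readClassAndObject reads the leading class ID itself, so nothing has to
    // be skipped by hand, unlike readObject(input, SomeClass::class.java).
    return kryo.readClassAndObject(Input(bytes)) as T
}
```

Usage would then be something like {{decodeSparkKryo<StringSet>(conf, row.getAs<ByteArray>(2))}}, with {{conf}} being the {{SparkConf}} shown in the issue description.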

Can you just let Spark serialize these things with Kryo, or let Kryo deal with its own encoding? It sounds like you're using class registration but then also telling Kryo the class of the thing you are deserializing.

I'm going to close this. To reopen it, I'd say you at least need to demonstrate that your code works correctly outside Spark with Kryo class registration.
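A standalone check of that kind could look like the Kotlin sketch below (plain Kryo, no Spark). It registers a hypothetical {{Tags}} class, a stand-in for {{StringSet}}, with {{setRegistrationRequired(true)}} and encodes the same object once with {{writeClassAndObject}} and once with {{writeObject}}. The first variant prefixes the payload with the class's registration ID, which would be consistent with the "extra byte" described in the issue; {{readClassAndObject}} consumes that ID, whereas {{readObject(input, type)}} expects the payload only. The names {{Tags}}, {{registeredKryo}} and {{encodeBothWays}} are hypothetical.

```kotlin
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoSerializable
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output

// Hypothetical stand-in for the reporter's StringSet. Kryo calls the no-arg
// constructor and then read() to rebuild an instance.
class Tags() : KryoSerializable {
    val values = LinkedHashSet<String>()

    override fun write(kryo: Kryo, output: Output) {
        output.writeInt(values.size)
        for (v in values) output.writeString(v)
    }

    override fun read(kryo: Kryo, input: Input) {
        repeat(input.readInt()) { values.add(input.readString()) }
    }
}

// Writer and reader must configure Kryo identically: registration order
// determines the numeric ID written for each class.
fun registeredKryo(): Kryo = Kryo().apply {
    setRegistrationRequired(true)
    register(Tags::class.java)
}

// Encodes `value` twice: writeClassAndObject emits the class ID and then the
// payload, writeObject emits the payload only.
fun encodeBothWays(value: Any): Pair<ByteArray, ByteArray> {
    val withClass = Output(64, -1) // initial size 64 bytes, no maximum
    registeredKryo().writeClassAndObject(withClass, value)
    val payloadOnly = Output(64, -1)
    registeredKryo().writeObject(payloadOnly, value)
    return withClass.toBytes() to payloadOnly.toBytes()
}
```

With only one user class registered, its ID fits in a single varint byte, so the first array should be exactly one byte longer than the second. Reading the first array back with {{registeredKryo().readClassAndObject(Input(first))}} returns the object without skipping anything by hand.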

> Kryo deserialization not working with KryoSerializable class
> ------------------------------------------------------------
>
>                 Key: SPARK-26980
>                 URL: https://issues.apache.org/jira/browse/SPARK-26980
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>         Environment: Local Spark v2.4.0
> Kotlin v1.3.21
>            Reporter: Alexis Sarda-Espinosa
>            Priority: Minor
>              Labels: kryo, serialization
>
> I'm trying to create an {{Aggregator}} that uses a custom container that should be serialized with {{Kryo}}:
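> {code:java}
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.KryoSerializable
> import com.esotericsoftware.kryo.io.Input
> import com.esotericsoftware.kryo.io.Output
> import java.util.Collections
>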
> class StringSet(other: Collection<String>) : HashSet<String>(other), KryoSerializable {
>     companion object {
>         @JvmStatic
>         private val serialVersionUID = 1L
>     }
>     constructor() : this(Collections.emptyList())
>     override fun write(kryo: Kryo, output: Output) {
>         output.writeInt(this.size)
>         for (string in this) {
>             output.writeString(string)
>         }
>     }
>     override fun read(kryo: Kryo, input: Input) {
>         val size = input.readInt()
>         repeat(size) { this.add(input.readString()) }
>     }
> }
> {code}
> However, if I look at the corresponding value in the {{Row}} after aggregation (for example by using {{collectAsList()}}), I see a {{byte[]}}. Interestingly, the first byte in that array seems to be some sort of noise, and I can deserialize by doing something like this: 
> {code:java}
> val b = row.getAs<ByteArray>(2)
> val input = Input(b.copyOfRange(1, b.size)) // extra byte?
> val set = Kryo().readObject(input, StringSet::class.java)
> {code}
> Configuration used:
> {code:java}
> SparkConf()
>     .setAppName("Hello Spark with Kotlin")
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     .set("spark.kryo.registrationRequired", "true")
>     .registerKryoClasses(arrayOf(StringSet::class.java))
> {code}
> [Sample repo with all the code|https://github.com/asardaes/hello-spark-kotlin/tree/8e8a54fd81f0412507318149841c69bb17d8572c].
>  


