Posted to dev@spark.apache.org by Nilesh <ni...@nileshc.com> on 2014/05/24 17:32:57 UTC

Kryo serialization for closures: a workaround

Suppose my mappers are functions (defs) that internally call other classes,
create objects, and do different things inside. (Or they can even be classes
that extend (Foo) => Bar and do the processing in their apply method - but
let's ignore that case for now.)

Spark supports only Java serialization for closures, forces all the classes
used inside them to implement Serializable, and coughs up errors when forced
to use Kryo for closures. But one can't expect every 3rd-party library to
make all of its classes Serializable!

Here's a workaround that I thought I'd share in case anyone comes across
this problem:

You simply need to serialize the objects before passing them into the
closure, and deserialize them afterwards. This approach just works, even if
your classes aren't Serializable, because it uses Kryo behind the scenes.
All you need is some curry. ;) Here's an example of how I did it:

    def genMapper(kryoWrapper: KryoSerializationWrapper[Foo => Bar])(foo: Foo): Bar =
      kryoWrapper.value.apply(foo)

    val mapper: Foo => Bar = genMapper(KryoSerializationWrapper(new Blah(abc)))
    rdd.flatMap(mapper).collectAsMap()

    // A class, not an object, since it takes constructor arguments.
    class Blah(abc: ABC) extends (Foo => Bar) {
      def apply(foo: Foo): Bar = ??? // the real function goes here
    }
Feel free to make Blah as complicated as you want: a class, a companion
object, nested classes, references to multiple 3rd-party libs.
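
For instance, Blah can hold onto classes that were never meant to be
Java-serialized (NonSerializableParser is a hypothetical stand-in for any
such 3rd-party class):

    // Hypothetical: NonSerializableParser comes from a 3rd-party lib
    // and does NOT implement Serializable -- Kryo doesn't care.
    class Blah(abc: ABC) extends (Foo => Bar) {
      private val parser = new NonSerializableParser(abc)
      def apply(foo: Foo): Bar = parser.parse(foo)
    }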

KryoSerializationWrapper refers to this wrapper from amplab/shark:
https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
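
The gist of the wrapper: keep the real object in a @transient field, and
implement the writeObject/readObject hooks so that Java serialization
carries the object as Kryo-encoded bytes. A minimal sketch of the idea (not
the exact shark code; it assumes the Twitter chill library for building the
Kryo instance):

    import java.io.{ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import com.twitter.chill.ScalaKryoInstantiator

    class KryoSerializationWrapper[T](@transient private var obj: T) extends Serializable {
      def value: T = obj

      // Java serialization hook: write the payload as Kryo bytes.
      private def writeObject(out: ObjectOutputStream): Unit = {
        val buffer = new ByteArrayOutputStream()
        val output = new Output(buffer)
        new ScalaKryoInstantiator().newKryo().writeClassAndObject(output, obj)
        output.close()
        val bytes = buffer.toByteArray
        out.writeInt(bytes.length)
        out.write(bytes)
      }

      // Java deserialization hook: read the Kryo bytes back into the field.
      private def readObject(in: ObjectInputStream): Unit = {
        val bytes = new Array[Byte](in.readInt())
        in.readFully(bytes)
        obj = new ScalaKryoInstantiator().newKryo()
          .readClassAndObject(new Input(bytes)).asInstanceOf[T]
      }
    }

    object KryoSerializationWrapper {
      def apply[T](obj: T): KryoSerializationWrapper[T] =
        new KryoSerializationWrapper(obj)
    }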

Don't you think it's a good idea to have something like this inside the
framework itself? :)



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Kryo-serialization-for-closures-a-workaround-tp6787.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re: Kryo serialization for closures: a workaround

Posted by Reynold Xin <rx...@databricks.com>.
Thanks for sending this in.

The ASF list doesn't support HTML, so the formatting of the code got a
little messed up. For those who want to see the code in clearly formatted
text, go to
http://apache-spark-developers-list.1001551.n3.nabble.com/Kryo-serialization-for-closures-a-workaround-tp6787.html




Re: Kryo serialization for closures: a workaround

Posted by Will Benton <wi...@redhat.com>.
This is an interesting approach, Nilesh!

Someone will correct me if I'm wrong, but I don't think this could go into ClosureCleaner as a default behavior (since Kryo apparently breaks on some classes that depend on custom Java serializers, as has come up on the list recently).  But it does seem like having a function in Spark that did this for closures more transparently (to be called explicitly by clients in problem cases) could be pretty useful.
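
For example, such a helper might look something like this (a hypothetical
sketch built on Nilesh's wrapper, not an existing Spark API):

    // Hypothetical: wrap a function so it crosses the closure boundary
    // as Kryo bytes instead of going through Java serialization.
    object KryoClosure {
      def apply[A, B](f: A => B): A => B = {
        val wrapped = KryoSerializationWrapper(f)
        // The returned lambda captures only `wrapped`, which is Serializable.
        (a: A) => wrapped.value.apply(a)
      }
    }

    // e.g. rdd.map(KryoClosure(new Blah(abc)))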


best,
wb

