Posted to user@spark.apache.org by Andrianasolo Fanilo <fa...@worldline.com> on 2014/09/04 15:29:00 UTC

Object serialisation inside closures

Hello Spark fellows :)

I'm a new user of Spark and Scala and have been using both for 6 months without too many problems.
Here I'm looking for best practices for using non-serializable classes inside closures. I'm using Spark 0.9.0-incubating here with Hadoop 2.2.

Suppose I am using the OpenCSV parser to parse an input file. So inside my main:

import org.apache.spark.SparkContext
import au.com.bytecode.opencsv.CSVParser

val sc = new SparkContext("local[2]", "App")
val heyRDD = sc.textFile("...")

// The parser is created on the driver, then referenced inside the closure below
val csvparser = new CSVParser(';')
val heyMap = heyRDD.map { line =>
  val temp = csvparser.parseLine(line)
  (temp(1), temp(4))
}


This gives me a java.io.NotSerializableException: au.com.bytecode.opencsv.CSVParser, which seems reasonable.

From here I can see 3 solutions:
1/ Extending CSVParser so that it implements Serializable, which adds a lot of boilerplate code if you ask me
2/ Using Kryo serialization (still need to define and register a serializer; see the sketch after the code below)
3/ Creating an object with an instance of the class I want to use, typically:

// Singleton object holding the non-serializable parser
object CSVParserPlus {

  val csvParser = new CSVParser(';')

  def parse(line: String) = {
    csvParser.parseLine(line)
  }
}

val heyMap = heyRDD.map { line =>
  val temp = CSVParserPlus.parse(line)
  (temp(1), temp(4))
}
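
For reference, here is roughly what the wiring for option 2 looks like (a sketch only: MyKryoRegistrator is an illustrative name, it assumes Spark's standard KryoRegistrator API, and it shows the registration mechanics rather than a guaranteed fix for serializing closures):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator
import au.com.bytecode.opencsv.CSVParser

// Illustrative registrator that tells Kryo about CSVParser
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[CSVParser])
  }
}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("App")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyKryoRegistrator") // fully qualified name if in a package
val sc = new SparkContext(conf)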

The third solution works and I don't get how, so I was wondering how Spark's closure system manages to serialize an object that holds a non-serializable instance. How does that work? Does it hinder performance? Is it a good solution? How do you manage this problem?

Any input would be greatly appreciated

Best regards,
Fanilo


Re: Object serialisation inside closures

Posted by Mohit Jaggi <mo...@gmail.com>.
I faced the same problem and ended up using the same approach that Sean suggested:
https://github.com/AyasdiOpenSource/df/blob/master/src/main/scala/com/ayasdi/df/DF.scala#L313

Option 3 also seems reasonable. It should create a CSVParser per executor.



RE: Object serialisation inside closures

Posted by Andrianasolo Fanilo <fa...@worldline.com>.
Thank you for the quick answer, looks good to me

Though that brings me to another question. Suppose we want to open a connection to a database, an Elasticsearch cluster, etc.

I now see two approaches:
1/ use .mapPartitions and set up the connection at the start of each partition, so I get a connection per partition
2/ use a singleton object, which loads one connection per executor if my understanding is correct

I would have gone with the second option, so I don't create a new connection each time a partition fails to compute for whatever reason and gets retried. It also keeps the number of parallel connections down, since there is only one connection per worker; with the first option, 200 partitions running in parallel would mean 200 connections.
But with the second option a task could kill the connection on the worker during computation, and because that connection is shared by all tasks of the executor, all partitions would fail. Also, a single connection object would have to handle 200 partitions trying to write to Elasticsearch/a database/etc., which may be bad performance-wise.

For now I can't see a case where the second option is clearly preferable. It also doesn't seem like I could use that singleton object to share data within an executor, sadly...
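
For concreteness, a minimal sketch of the two approaches (the names Connection, openConnection and write are hypothetical placeholders, not a real client API):

import org.apache.spark.rdd.RDD

// Hypothetical connection API, standing in for a JDBC/Elasticsearch/etc. client
trait Connection {
  def write(record: (String, String)): Unit
  def close(): Unit
}
def openConnection(): Connection = ??? // placeholder factory

// 1/ one connection per partition, created inside foreachPartition
def writePerPartition(rdd: RDD[(String, String)]): Unit =
  rdd.foreachPartition { records =>
    val conn = openConnection() // created once per partition (per task attempt)
    try records.foreach(conn.write)
    finally conn.close()
  }

// 2/ one connection per executor JVM, held lazily in a singleton object
object ExecutorConnection {
  lazy val conn: Connection = openConnection() // initialized once per executor
}

def writePerExecutor(rdd: RDD[(String, String)]): Unit =
  rdd.foreachPartition { records =>
    // shared by every task that runs on this executor
    records.foreach(ExecutorConnection.conn.write)
  }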

Thanks for the input
Fanilo



Re: Object serialisation inside closures

Posted by Sean Owen <so...@cloudera.com>.
In your original version, the object is referenced by the function but
it's on the driver, and so has to be serialized. This leads to an
error since it's not serializable. Instead, you want to recreate the
object locally on each of the remote machines.

In your third version you are holding the parser in a static member of
a class, in your Scala object. When you call the parse method, you're
calling it on the instance of the CSVParserPlus class that was loaded
on the remote worker. It loads and creates its own copy of the parser.

A perhaps more compact solution is to use mapPartitions and create the
parser once at the start. This avoids needing the static / singleton
pattern, but also means the parser is created only once per partition.
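
A minimal sketch of that mapPartitions variant, using the same OpenCSV parser as in the original snippet:

import au.com.bytecode.opencsv.CSVParser

val heyMap = heyRDD.mapPartitions { lines =>
  // One parser per partition, created on the executor, so nothing
  // non-serializable has to be shipped from the driver.
  val csvparser = new CSVParser(';')
  lines.map { line =>
    val temp = csvparser.parseLine(line)
    (temp(1), temp(4))
  }
}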




Re: Object serialisation inside closures

Posted by Yana Kadiyska <ya...@gmail.com>.
In the third case the object does not get shipped around. Each executor
will create its own instance. I got bitten by this here:

http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-object-access-from-mapper-simple-question-tt8125.html
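
A tiny illustration of that behaviour (a sketch; the println is only there to make the per-JVM initialization visible in the executor logs):

import au.com.bytecode.opencsv.CSVParser

object CSVParserPlus {
  // The object body runs once per JVM that touches it, i.e. once per
  // executor, not once per task and not via serialization from the driver.
  println("Initializing CSVParserPlus in this JVM")
  val csvParser = new CSVParser(';')
  def parse(line: String) = csvParser.parseLine(line)
}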

