You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Fengyun RAO <ra...@gmail.com> on 2014/07/31 15:47:51 UTC

How to share a NonSerializable variable among tasks in the same worker node?

As shown here:
2 - Why Is My Spark Job so Slow and Only Using a Single Thread?
<http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/>


123456789101112131415

object JSONParser {  def parse(raw: String): String = ...}object
MyFirstSparkJob {  def main(args: Array[String]) {    val sc = new
SparkContext()    val lines = sc.textFileStream("beacons.txt")
lines.map(line => JSONParser.parse(line))    lines.foreach(line =>
println(line))    ssc.start()  }}

It says " parser instance is now a singleton created in the scope of our
driver program" which I thought was in the scope of executor. Am I wrong,
or why?

What if the parser is not serializable, and I want to share it among tasks
in the same worker node?

Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by DB Tsai <db...@dbtsai.com>.
You can try to define a wrapper class for your parser, and create an
instance of your parser in companion object as a singleton object.
Thus, even you create an object of wrapper in mapPartition every time,
each JVM will have only a single instance of your parser object.

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Mon, Aug 4, 2014 at 2:01 AM, Fengyun RAO <ra...@gmail.com> wrote:
> Thanks, Sean!
>
> It works, but as the link in 2 - Why Is My Spark Job so Slow and Only Using
> a Single Thread? says " parser instance is now a singleton created in the
> scope of our driver program" which I thought was in the scope of executor.
> Am I wrong, or why?
>
> I didn't want the equivalent of "setup()" method, since I want to share the
> "parser" among tasks in the same worker node. It takes tens of seconds to
> initialize a "parser". What's more, I want to know if the "parser" could
> have a field such as ConcurrentHashMap which all tasks in the node may get()
> of put() items.
>
>
>
>
> 2014-08-04 16:35 GMT+08:00 Sean Owen <so...@cloudera.com>:
>
>> The parser does not need to be serializable. In the line:
>>
>> lines.map(line => JSONParser.parse(line))
>>
>> ... the parser is called but there is no parser object that with state
>> that can be serialized. Are you sure it does not work?
>>
>> The error message alluded to originally refers to an object not shown
>> in the code, so I'm not 100% sure this was the original issue.
>>
>> If you want, the equivalent of "setup()" is really "writing some code
>> at the start of a call to mapPartitions()"
>>
>> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <ra...@gmail.com> wrote:
>> > Thanks, Ron.
>> >
>> > The problem is that the "parser" is written in another package which is
>> > not
>> > serializable.
>> >
>> > In mapreduce, I could create the "parser" in the map setup() method.
>> >
>> > Now in spark, I want to create it for each worker, and share it among
>> > all
>> > the tasks on the same work node.
>> >
>> > I know different workers run on different machine, but it doesn't have
>> > to
>> > communicate between workers.
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by Fengyun RAO <ra...@gmail.com>.
Thanks, Sean!

It works, but as the link in 2 - Why Is My Spark Job so Slow and Only Using
a Single Thread?
<http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/>
 says " parser instance is now a singleton created in the scope of our
driver program" which I thought was in the scope of executor. Am I wrong,
or why?

I didn't want the equivalent of "setup()" method, since I want to share the
"parser" among tasks in the same worker node. It takes tens of seconds to
initialize a "parser". What's more, I want to know if the "parser" could
have a field such as ConcurrentHashMap which all tasks in the node may
get() of put() items.




2014-08-04 16:35 GMT+08:00 Sean Owen <so...@cloudera.com>:

> The parser does not need to be serializable. In the line:
>
> lines.map(line => JSONParser.parse(line))
>
> ... the parser is called but there is no parser object that with state
> that can be serialized. Are you sure it does not work?
>
> The error message alluded to originally refers to an object not shown
> in the code, so I'm not 100% sure this was the original issue.
>
> If you want, the equivalent of "setup()" is really "writing some code
> at the start of a call to mapPartitions()"
>
> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <ra...@gmail.com> wrote:
> > Thanks, Ron.
> >
> > The problem is that the "parser" is written in another package which is
> not
> > serializable.
> >
> > In mapreduce, I could create the "parser" in the map setup() method.
> >
> > Now in spark, I want to create it for each worker, and share it among all
> > the tasks on the same work node.
> >
> > I know different workers run on different machine, but it doesn't have to
> > communicate between workers.
>

Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by Sean Owen <so...@cloudera.com>.
The parser does not need to be serializable. In the line:

lines.map(line => JSONParser.parse(line))

... the parser is called but there is no parser object that with state
that can be serialized. Are you sure it does not work?

The error message alluded to originally refers to an object not shown
in the code, so I'm not 100% sure this was the original issue.

If you want, the equivalent of "setup()" is really "writing some code
at the start of a call to mapPartitions()"

On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <ra...@gmail.com> wrote:
> Thanks, Ron.
>
> The problem is that the "parser" is written in another package which is not
> serializable.
>
> In mapreduce, I could create the "parser" in the map setup() method.
>
> Now in spark, I want to create it for each worker, and share it among all
> the tasks on the same work node.
>
> I know different workers run on different machine, but it doesn't have to
> communicate between workers.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by Fengyun RAO <ra...@gmail.com>.
Thanks, Ron.

The problem is that the "parser" is written in another package which is not
serializable.

In mapreduce, I could create the "parser" in the map setup() method.

Now in spark, I want to create it for each worker, and share it among all
the tasks on the same work node.

I know different workers run on different machine, but it doesn't have to
communicate between workers.



2014-08-04 10:51 GMT+08:00 Ron's Yahoo! <zl...@yahoo.com>:

> I think you’re going to have to make it serializable by registering it
> with the Kryo registrator. I think multiple workers are running as separate
> VMs so it might need to be able to serialize and deserialize broadcasted
> variables to the different executors.
>
> Thanks,
> Ron
>
> On Aug 3, 2014, at 6:38 PM, Fengyun RAO <ra...@gmail.com> wrote:
>
> Could anybody help?
>
> I wonder if I asked a stupid question or I didn't make the question clear?
>
>
> 2014-07-31 21:47 GMT+08:00 Fengyun RAO <ra...@gmail.com>:
>
>> As shown here:
>> 2 - Why Is My Spark Job so Slow and Only Using a Single Thread?
>> <http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/>
>>
>>
>>  123456789101112131415
>>
>> object JSONParser {  def parse(raw: String): String = ...}object MyFirstSparkJob {  def main(args: Array[String]) {    val sc = new SparkContext()    val lines = sc.textFileStream("beacons.txt")    lines.map(line => JSONParser.parse(line))    lines.foreach(line => println(line))    ssc.start()  }}
>>
>> It says " parser instance is now a singleton created in the scope of our
>> driver program" which I thought was in the scope of executor. Am I
>> wrong, or why?
>>
>> What if the parser is not serializable, and I want to share it among
>> tasks in the same worker node?
>>
>>
>>
>>
>>
>
>

Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by Ron's Yahoo! <zl...@yahoo.com.INVALID>.
I think you’re going to have to make it serializable by registering it with the Kryo registrator. I think multiple workers are running as separate VMs so it might need to be able to serialize and deserialize broadcasted variables to the different executors.

Thanks,
Ron

On Aug 3, 2014, at 6:38 PM, Fengyun RAO <ra...@gmail.com> wrote:

> Could anybody help?
> 
> I wonder if I asked a stupid question or I didn't make the question clear?
> 
> 
> 2014-07-31 21:47 GMT+08:00 Fengyun RAO <ra...@gmail.com>:
> As shown here:
> 2 - Why Is My Spark Job so Slow and Only Using a Single Thread?
> 
> 
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
> 10
> 11
> 12
> 13
> 14
> 15
> object JSONParser {
>   def parse(raw: String): String = ...
> }
> 
> object MyFirstSparkJob {
>   def main(args: Array[String]) {
>     val sc = new SparkContext()
> 
>     val lines = sc.textFileStream("beacons.txt")
>     lines.map(line => JSONParser.parse(line))
>     lines.foreach(line => println(line))
> 
>     ssc.start()
>   }
> }
> It says " parser instance is now a singleton created in the scope of our driver program" which I thought was in the scope of executor. Am I wrong, or why?
> 
> What if the parser is not serializable, and I want to share it among tasks in the same worker node?
> 
> 
> 
> 
> 
> 


Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by Fengyun RAO <ra...@gmail.com>.
Could anybody help?

I wonder if I asked a stupid question or I didn't make the question clear?


2014-07-31 21:47 GMT+08:00 Fengyun RAO <ra...@gmail.com>:

> As shown here:
> 2 - Why Is My Spark Job so Slow and Only Using a Single Thread?
> <http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/>
>
>
>  123456789101112131415
>
> object JSONParser {  def parse(raw: String): String = ...}object MyFirstSparkJob {  def main(args: Array[String]) {    val sc = new SparkContext()    val lines = sc.textFileStream("beacons.txt")    lines.map(line => JSONParser.parse(line))    lines.foreach(line => println(line))    ssc.start()  }}
>
> It says " parser instance is now a singleton created in the scope of our
> driver program" which I thought was in the scope of executor. Am I wrong,
> or why?
>
> What if the parser is not serializable, and I want to share it among
> tasks in the same worker node?
>
>
>
>
>

Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by Sean Owen <so...@cloudera.com>.
Singletons aren't hacks; it can be an entirely appropriate pattern for
this. What exception do you get? From Spark or your code? I think this
pattern is orthogonal to using Spark.
On Jan 21, 2015 8:11 AM, "octavian.ganea" <oc...@inf.ethz.ch>
wrote:

> In case someone has the same problem:
>
> The singleton hack works for me sometimes, sometimes it doesn't in spark
> 1.2.0, that is, sometimes I get nullpointerexception. Anyway, if you really
> need to work with big indexes and you want to have the smallest amount of
> communication between master and nodes, as well as if you have RAM
> available
> just for one instance of the indexes data per machine, than I suggest you
> use spark with memcached .
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21282.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by "octavian.ganea" <oc...@inf.ethz.ch>.
In case someone has the same problem:

The singleton hack works for me sometimes, sometimes it doesn't in spark
1.2.0, that is, sometimes I get nullpointerexception. Anyway, if you really
need to work with big indexes and you want to have the smallest amount of
communication between master and nodes, as well as if you have RAM available
just for one instance of the indexes data per machine, than I suggest you
use spark with memcached . 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21282.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by Fengyun RAO <ra...@gmail.com>.
currently we migrate from 1.1 to 1.2, and found our program 3x slower,
maybe due to the singleton hack?

could you explain in detail why or how "The singleton hack works very
different in spark 1.2.0 "

thanks!

2015-01-18 20:56 GMT+08:00 octavian.ganea <oc...@inf.ethz.ch>:

> The singleton hack works very different in spark 1.2.0 (it does not work if
> the program has multiple map-reduce jobs in the same program). I guess
> there
> should be an official documentation on how to have each machine/node do an
> init step locally before executing any other instructions (e.g. loading
> locally a very big object once at the begining that can be used in all
> further map jobs that will be assigned to that worker).
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21219.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: How to share a NonSerializable variable among tasks in the same worker node?

Posted by "octavian.ganea" <oc...@inf.ethz.ch>.
The singleton hack works very different in spark 1.2.0 (it does not work if
the program has multiple map-reduce jobs in the same program). I guess there
should be an official documentation on how to have each machine/node do an
init step locally before executing any other instructions (e.g. loading
locally a very big object once at the begining that can be used in all
further map jobs that will be assigned to that worker).



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21219.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org