You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by dgoldenberg <dg...@gmail.com> on 2015/07/07 17:04:33 UTC

Best practice for using singletons on workers (seems unanswered) ?

Hi,

I am seeing a lot of posts on singletons vs. broadcast variables, such as
*
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
*
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219

What's the best approach to instantiate an object once and have it be reused
by the worker(s).

E.g. I have an object that loads some static state such as e.g. a
dictionary/map, is a part of 3rd party API and is not serializable.  I can't
seem to get it to be a singleton on the worker side as the JVM appears to be
wiped on every request so I get a new instance.  So the singleton doesn't
stick.

Is there an approach where I could have this object or a wrapper of it be a
broadcast var? Can Kryo get me there? would that basically mean writing a
custom serializer?  However, the 3rd party object may have a bunch of member
vars hanging off it, so serializing it properly may be non-trivial...

Any pointers/hints greatly appreciated.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Best practice for using singletons on workers (seems unanswered) ?

Posted by Dmitry Goldenberg <dg...@gmail.com>.
Richard,

That's exactly the strategy I've been trying, which is a wrapper singleton
class. But I was seeing the inner object being created multiple times.

I wonder if the problem has to do with the way I'm processing the RDD's.
I'm using JavaDStream to stream data (from Kafka). Then I'm processing the
RDD's like so

JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(...)
JavaDStream<String> messageBodies = messages.map(...)
messageBodies.foreachRDD(new MyFunction());

where MyFunction implements Function<JavaRDD<String>, Void> {
  ...
  rdd.map / rdd.filter ...
  rdd.foreach(... perform final action ...)
}

Perhaps the multiple singletons I'm seeing are the per-executor instances?
Judging by the streaming programming guide, perhaps I should follow the
connection sharing example:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

So I'd pre-create my singletons in the foreachPartition call which would
let them be the per-JVM singletons, to be passed into MyFunction which
would now be a partition processing function rather than an RDD processing
function.

I wonder whether these singletons would still be created on every call as
the master sends RDD data over to the workers ?

I also wonder whether using foreachPartition would be more efficient anyway
and prevent some of the over-network data shuffling effects that I imagine
may happen with just doing a foreachRDD ?











On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher <rm...@localytics.com>
wrote:

> Would it be possible to have a wrapper class that just represents a
> reference to a singleton holding the 3rd party object? It could proxy over
> calls to the singleton object which will instantiate a private instance of
> the 3rd party object lazily? I think something like this might work if the
> workers have the singleton object in their classpath.
>
> here's a rough sketch of what I was thinking:
>
> object ThirdPartySingleton {
>   private lazy val thirdPartyObj = ...
>
>   def someProxyFunction() = thirdPartyObj.()
> }
>
> class ThirdPartyReference extends Serializable {
>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
> }
>
> also found this SO post:
> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>
>
> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg <dg...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am seeing a lot of posts on singletons vs. broadcast variables, such as
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>
>> What's the best approach to instantiate an object once and have it be
>> reused
>> by the worker(s).
>>
>> E.g. I have an object that loads some static state such as e.g. a
>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>> can't
>> seem to get it to be a singleton on the worker side as the JVM appears to
>> be
>> wiped on every request so I get a new instance.  So the singleton doesn't
>> stick.
>>
>> Is there an approach where I could have this object or a wrapper of it be
>> a
>> broadcast var? Can Kryo get me there? would that basically mean writing a
>> custom serializer?  However, the 3rd party object may have a bunch of
>> member
>> vars hanging off it, so serializing it properly may be non-trivial...
>>
>> Any pointers/hints greatly appreciated.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: Best practice for using singletons on workers (seems unanswered) ?

Posted by Dmitry Goldenberg <dg...@gmail.com>.
Richard,
It seems whether I'm doing a foreachRDD or foreachPartition, I'm able to
create per-worker/per-JVM singletons. With 4 workers, I've got 4 singletons
created.  I wouldn't be able to use broadcast vars because the 3rd party
objects are not serializable.

The shuffling effect is basically whenever Spark has to pull data from
multiple machines together over the network when executing an action.
Probably not an issue for foreachRDD, but more for such actions as 'union'
or 'subtract' and the like.

On Wed, Jul 8, 2015 at 3:55 PM, Richard Marscher <rm...@localytics.com>
wrote:

> Ah, I see this is streaming. I haven't any practical experience with that
> side of Spark. But the foreachPartition idea is a good approach. I've used
> that pattern extensively, even though not for singletons, but just to
> create non-serializable objects like API and DB clients on the executor
> side. I think it's the most straightforward approach to dealing with any
> non-serializable object you need.
>
> I don't entirely follow what over-network data shuffling effects you are
> alluding to (maybe more specific to streaming?).
>
> On Wed, Jul 8, 2015 at 9:41 AM, Dmitry Goldenberg <
> dgoldenberg123@gmail.com> wrote:
>
>> My singletons do in fact stick around. They're one per worker, looks
>> like.  So with 4 workers running on the box, we're creating one singleton
>> per worker process/jvm, which seems OK.
>>
>> Still curious about foreachPartition vs. foreachRDD though...
>>
>> On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher <
>> rmarscher@localytics.com> wrote:
>>
>>> Would it be possible to have a wrapper class that just represents a
>>> reference to a singleton holding the 3rd party object? It could proxy over
>>> calls to the singleton object which will instantiate a private instance of
>>> the 3rd party object lazily? I think something like this might work if the
>>> workers have the singleton object in their classpath.
>>>
>>> here's a rough sketch of what I was thinking:
>>>
>>> object ThirdPartySingleton {
>>>   private lazy val thirdPartyObj = ...
>>>
>>>   def someProxyFunction() = thirdPartyObj.()
>>> }
>>>
>>> class ThirdPartyReference extends Serializable {
>>>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
>>> }
>>>
>>> also found this SO post:
>>> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>>>
>>>
>>> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg <dg...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am seeing a lot of posts on singletons vs. broadcast variables, such
>>>> as
>>>> *
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>>>> *
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>>>
>>>> What's the best approach to instantiate an object once and have it be
>>>> reused
>>>> by the worker(s).
>>>>
>>>> E.g. I have an object that loads some static state such as e.g. a
>>>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>>>> can't
>>>> seem to get it to be a singleton on the worker side as the JVM appears
>>>> to be
>>>> wiped on every request so I get a new instance.  So the singleton
>>>> doesn't
>>>> stick.
>>>>
>>>> Is there an approach where I could have this object or a wrapper of it
>>>> be a
>>>> broadcast var? Can Kryo get me there? would that basically mean writing
>>>> a
>>>> custom serializer?  However, the 3rd party object may have a bunch of
>>>> member
>>>> vars hanging off it, so serializing it properly may be non-trivial...
>>>>
>>>> Any pointers/hints greatly appreciated.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>>>>
>>>
>>
>

Re: Best practice for using singletons on workers (seems unanswered) ?

Posted by Richard Marscher <rm...@localytics.com>.
Ah, I see this is streaming. I haven't any practical experience with that
side of Spark. But the foreachPartition idea is a good approach. I've used
that pattern extensively, even though not for singletons, but just to
create non-serializable objects like API and DB clients on the executor
side. I think it's the most straightforward approach to dealing with any
non-serializable object you need.

I don't entirely follow what over-network data shuffling effects you are
alluding to (maybe more specific to streaming?).

On Wed, Jul 8, 2015 at 9:41 AM, Dmitry Goldenberg <dg...@gmail.com>
wrote:

> My singletons do in fact stick around. They're one per worker, looks
> like.  So with 4 workers running on the box, we're creating one singleton
> per worker process/jvm, which seems OK.
>
> Still curious about foreachPartition vs. foreachRDD though...
>
> On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher <
> rmarscher@localytics.com> wrote:
>
>> Would it be possible to have a wrapper class that just represents a
>> reference to a singleton holding the 3rd party object? It could proxy over
>> calls to the singleton object which will instantiate a private instance of
>> the 3rd party object lazily? I think something like this might work if the
>> workers have the singleton object in their classpath.
>>
>> here's a rough sketch of what I was thinking:
>>
>> object ThirdPartySingleton {
>>   private lazy val thirdPartyObj = ...
>>
>>   def someProxyFunction() = thirdPartyObj.()
>> }
>>
>> class ThirdPartyReference extends Serializable {
>>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
>> }
>>
>> also found this SO post:
>> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>>
>>
>> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg <dg...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am seeing a lot of posts on singletons vs. broadcast variables, such as
>>> *
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>>> *
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>>
>>> What's the best approach to instantiate an object once and have it be
>>> reused
>>> by the worker(s).
>>>
>>> E.g. I have an object that loads some static state such as e.g. a
>>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>>> can't
>>> seem to get it to be a singleton on the worker side as the JVM appears
>>> to be
>>> wiped on every request so I get a new instance.  So the singleton doesn't
>>> stick.
>>>
>>> Is there an approach where I could have this object or a wrapper of it
>>> be a
>>> broadcast var? Can Kryo get me there? would that basically mean writing a
>>> custom serializer?  However, the 3rd party object may have a bunch of
>>> member
>>> vars hanging off it, so serializing it properly may be non-trivial...
>>>
>>> Any pointers/hints greatly appreciated.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>>
>>
>

Re: Best practice for using singletons on workers (seems unanswered) ?

Posted by Dmitry Goldenberg <dg...@gmail.com>.
My singletons do in fact stick around. They're one per worker, looks like.
So with 4 workers running on the box, we're creating one singleton per
worker process/jvm, which seems OK.

Still curious about foreachPartition vs. foreachRDD though...

On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher <rm...@localytics.com>
wrote:

> Would it be possible to have a wrapper class that just represents a
> reference to a singleton holding the 3rd party object? It could proxy over
> calls to the singleton object which will instantiate a private instance of
> the 3rd party object lazily? I think something like this might work if the
> workers have the singleton object in their classpath.
>
> here's a rough sketch of what I was thinking:
>
> object ThirdPartySingleton {
>   private lazy val thirdPartyObj = ...
>
>   def someProxyFunction() = thirdPartyObj.()
> }
>
> class ThirdPartyReference extends Serializable {
>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
> }
>
> also found this SO post:
> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>
>
> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg <dg...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am seeing a lot of posts on singletons vs. broadcast variables, such as
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>
>> What's the best approach to instantiate an object once and have it be
>> reused
>> by the worker(s).
>>
>> E.g. I have an object that loads some static state such as e.g. a
>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>> can't
>> seem to get it to be a singleton on the worker side as the JVM appears to
>> be
>> wiped on every request so I get a new instance.  So the singleton doesn't
>> stick.
>>
>> Is there an approach where I could have this object or a wrapper of it be
>> a
>> broadcast var? Can Kryo get me there? would that basically mean writing a
>> custom serializer?  However, the 3rd party object may have a bunch of
>> member
>> vars hanging off it, so serializing it properly may be non-trivial...
>>
>> Any pointers/hints greatly appreciated.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: Best practice for using singletons on workers (seems unanswered) ?

Posted by Richard Marscher <rm...@localytics.com>.
Would it be possible to have a wrapper class that just represents a
reference to a singleton holding the 3rd party object? It could proxy over
calls to the singleton object which will instantiate a private instance of
the 3rd party object lazily? I think something like this might work if the
workers have the singleton object in their classpath.

here's a rough sketch of what I was thinking:

object ThirdPartySingleton {
  private lazy val thirdPartyObj = ...

  def someProxyFunction() = thirdPartyObj.()
}

class ThirdPartyReference extends Serializable {
  def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
}

also found this SO post:
http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers


On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg <dg...@gmail.com>
wrote:

> Hi,
>
> I am seeing a lot of posts on singletons vs. broadcast variables, such as
> *
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
> *
>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>
> What's the best approach to instantiate an object once and have it be
> reused
> by the worker(s).
>
> E.g. I have an object that loads some static state such as e.g. a
> dictionary/map, is a part of 3rd party API and is not serializable.  I
> can't
> seem to get it to be a singleton on the worker side as the JVM appears to
> be
> wiped on every request so I get a new instance.  So the singleton doesn't
> stick.
>
> Is there an approach where I could have this object or a wrapper of it be a
> broadcast var? Can Kryo get me there? would that basically mean writing a
> custom serializer?  However, the 3rd party object may have a bunch of
> member
> vars hanging off it, so serializing it properly may be non-trivial...
>
> Any pointers/hints greatly appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>