Posted to dev@spark.apache.org by Mayur Rustagi <ma...@gmail.com> on 2014/06/25 00:41:11 UTC

Re: balancing RDDs

This would be really useful, especially for Shark, where a shift in
partitioning affects all subsequent queries unless the task scheduling time
beats spark.locality.wait. That can cause overall low performance for all
subsequent tasks.
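
(As a reference point on that knob: spark.locality.wait defaults to 3000 ms
in the Spark 1.x releases and controls how long the scheduler waits for a
preferred location before falling back to a less local one. A minimal,
illustrative way to raise it, with the value chosen arbitrarily:)

import org.apache.spark.SparkConf

// Illustrative only: make the scheduler hold out longer for a node-local
// slot before it gives up and places the task on a less local node.
val conf = new SparkConf().set("spark.locality.wait", "10000")  // milliseconds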

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara <Se...@webtrends.com>
wrote:

> We have a use case where we’d like something to execute once on each node
> and I thought it would be good to ask here.
>
> Currently we achieve this by setting the parallelism to the number of
> nodes and using a mod partitioner:
>
> val balancedRdd = sc.parallelize(
>         (0 until Settings.parallelism)
>         .map(id => id -> Settings.settings)
>       ).partitionBy(new ModPartitioner(Settings.parallelism))
>       .cache()
>
>
> This works great except in two instances where it can become unbalanced:
>
> 1. if a worker is restarted or dies, the partition will move to a
> different node (one of the nodes will run two tasks).  When the worker
> rejoins, is there a way to have a partition move back over to the newly
> restarted worker so that it’s balanced again?
>
> 2. drivers need to be started in a staggered fashion, otherwise one driver
> can launch two tasks on one set of workers, and the other driver will do
> the same with the other set.  Are there any scheduler/config semantics so
> that each driver will take one (and only one) core from *each* node?
>
>
> Thanks
>
> Sean
>
>
>
>
>
>
>
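
The ModPartitioner referenced in the quoted code isn't shown in the thread;
a minimal sketch of what such a partitioner might look like, assuming integer
keys that map straight to partitions (a guess, not the poster's actual class):

import org.apache.spark.Partitioner

// Mod-based partitioner sketch: key k goes to partition k mod numPartitions,
// so the keys 0 until n land on n distinct partitions.
class ModPartitioner(val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case i: Int => ((i % numPartitions) + numPartitions) % numPartitions
    case _      => 0  // non-Int keys are not expected in this use case
  }

  override def equals(other: Any): Boolean = other match {
    case p: ModPartitioner => p.numPartitions == numPartitions
    case _                 => false
  }

  override def hashCode: Int = numPartitions
}

On question 2, in standalone mode the combination of spark.cores.max (capping
the total cores an application takes) and the master's spark.deploy.spreadOut
setting (which spreads an application's cores across workers) may get close to
one-core-per-node behavior, but neither setting guarantees exactly one core on
every node.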

Re: balancing RDDs

Posted by Mayur Rustagi <ma...@gmail.com>.
I would imagine this would be an extension of SchemaRDD (for Spark SQL) or
a new RDD altogether.
An RDD's location is determined by where the tasks generating it are
scheduled, and the scheduler schedules on the basis of the input RDD/source
data location. So ideally the RDD code would need to check the location of
input partitions across nodes and set the scheduling preference of the
tasks for the unbalanced partitions to different nodes. I am not sure
whether an RDD can influence the location of its tasks/partitions.
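
(A note on that last point: an RDD can at least hint at where its tasks run.
The scheduler consults RDD.getPreferredLocations when placing tasks, subject
to the locality settings discussed above. Below is a minimal sketch of a
custom RDD that pins one partition to each named node; NodePinnedRDD, its
constructor arguments, and the host-name list are hypothetical, and preferred
locations are hints rather than guarantees:)

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per node name, each partition preferring "its" node.
case class NodePinnedPartition(index: Int, host: String) extends Partition

// value must be serializable, since it is shipped to every task.
class NodePinnedRDD[T: ClassTag](sc: SparkContext, nodes: Seq[String], value: T)
  extends RDD[T](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    nodes.zipWithIndex.map { case (host, i) => NodePinnedPartition(i, host) }.toArray

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[NodePinnedPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    Iterator.single(value)
}

Calling something like new NodePinnedRDD(sc, nodes, settings).foreach(...)
would then run roughly one task per listed node, as long as the scheduler can
honor the hints within spark.locality.wait.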


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Wed, Jun 25, 2014 at 10:18 PM, Sean McNamara <Sean.McNamara@webtrends.com
> wrote:

> Yep, exactly!  I’m not sure how complicated it would be to pull off.  If
> someone wouldn’t mind pointing me in the right direction, I would be happy
> to look into it and contribute this functionality.  I imagine this would be
> implemented in the scheduler codebase, possibly with some sort of rebalance
> configuration property to enable it?
>
> Does anyone else have any thoughts on this?
>
> Cheers,
>
> Sean
>
>
> On Jun 24, 2014, at 4:41 PM, Mayur Rustagi <ma...@gmail.com>
> wrote:
>
> > This would be really useful, especially for Shark, where a shift in
> > partitioning affects all subsequent queries unless the task scheduling time
> > beats spark.locality.wait. That can cause overall low performance for all
> > subsequent tasks.
> >
> > Mayur Rustagi
> > Ph: +1 (760) 203 3257
> > http://www.sigmoidanalytics.com
> > @mayur_rustagi <https://twitter.com/mayur_rustagi>
> >
> >
> >
> > On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara <
> Sean.McNamara@webtrends.com>
> > wrote:
> >
> >> We have a use case where we’d like something to execute once on each
> node
> >> and I thought it would be good to ask here.
> >>
> >> Currently we achieve this by setting the parallelism to the number of
> >> nodes and using a mod partitioner:
> >>
> >> val balancedRdd = sc.parallelize(
> >>        (0 until Settings.parallelism)
> >>        .map(id => id -> Settings.settings)
> >>      ).partitionBy(new ModPartitioner(Settings.parallelism))
> >>      .cache()
> >>
> >>
> >> This works great except in two instances where it can become unbalanced:
> >>
> >> 1. if a worker is restarted or dies, the partition will move to a
> >> different node (one of the nodes will run two tasks).  When the worker
> >> rejoins, is there a way to have a partition move back over to the newly
> >> restarted worker so that it’s balanced again?
> >>
> >> 2. drivers need to be started in a staggered fashion, otherwise one
> driver
> >> can launch two tasks on one set of workers, and the other driver will do
> >> the same with the other set.  Are there any scheduler/config semantics
> so
> >> that each driver will take one (and only one) core from *each* node?
> >>
> >>
> >> Thanks
> >>
> >> Sean
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>

Re: balancing RDDs

Posted by Sean McNamara <Se...@Webtrends.com>.
Yep, exactly!  I’m not sure how complicated it would be to pull off.  If someone wouldn’t mind pointing me in the right direction, I would be happy to look into it and contribute this functionality.  I imagine this would be implemented in the scheduler codebase, possibly with some sort of rebalance configuration property to enable it?

Does anyone else have any thoughts on this?

Cheers,

Sean


On Jun 24, 2014, at 4:41 PM, Mayur Rustagi <ma...@gmail.com> wrote:

> This would be really useful, especially for Shark, where a shift in
> partitioning affects all subsequent queries unless the task scheduling time
> beats spark.locality.wait. That can cause overall low performance for all
> subsequent tasks.
> 
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
> 
> 
> 
> On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara <Se...@webtrends.com>
> wrote:
> 
>> We have a use case where we’d like something to execute once on each node
>> and I thought it would be good to ask here.
>> 
>> Currently we achieve this by setting the parallelism to the number of
>> nodes and using a mod partitioner:
>> 
>> val balancedRdd = sc.parallelize(
>>        (0 until Settings.parallelism)
>>        .map(id => id -> Settings.settings)
>>      ).partitionBy(new ModPartitioner(Settings.parallelism))
>>      .cache()
>> 
>> 
>> This works great except in two instances where it can become unbalanced:
>> 
>> 1. if a worker is restarted or dies, the partition will move to a
>> different node (one of the nodes will run two tasks).  When the worker
>> rejoins, is there a way to have a partition move back over to the newly
>> restarted worker so that it’s balanced again?
>> 
>> 2. drivers need to be started in a staggered fashion, otherwise one driver
>> can launch two tasks on one set of workers, and the other driver will do
>> the same with the other set.  Are there any scheduler/config semantics so
>> that each driver will take one (and only one) core from *each* node?
>> 
>> 
>> Thanks
>> 
>> Sean
>> 
>> 
>> 
>> 
>> 
>> 
>> 

