Posted to dev@spark.apache.org by Reynold Xin <rx...@databricks.com> on 2014/07/17 07:06:08 UTC

small (yet major) change going in: broadcasting RDD to reduce task size

Hi Spark devs,

Want to give you guys a heads up that I'm working on a small (but major)
change to how task dispatching works. Currently (as of Spark 1.0.1), Spark
sends the RDD object and closures over Akka along with the task itself to
the executors. This is inefficient because all tasks in the same stage use
the same RDDs and closures, yet we send those RDDs and closures to the
executors once per task. It is especially bad when a closure references a
very large variable, which is why the current design forces users to
explicitly broadcast large variables themselves.
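
To make the status quo concrete, here is a rough sketch of the workaround
users end up writing today. It is illustrative only: the lookup table and
names are made up, and it uses nothing beyond the standard RDD and
broadcast API.

    import org.apache.spark.{SparkConf, SparkContext}

    object ExplicitBroadcastToday {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("explicit-broadcast"))

        // A large lookup table captured by the map closure. Without an
        // explicit broadcast it gets serialized into every task.
        val lookup: Map[Int, String] = (1 to 1000000).map(i => i -> ("v" + i)).toMap

        // Today the user has to broadcast it by hand to keep task size small.
        val lookupBc = sc.broadcast(lookup)

        val sample = sc.parallelize(1 to 100, 10)
          .map(i => lookupBc.value.getOrElse(i, "missing"))
          .take(5)

        println(sample.mkString(", "))
        sc.stop()
      }
    }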

The patch uses broadcast to send the RDD objects and closures to the
executors, and uses Akka to send only a reference to the broadcast
RDD/closure along with the partition-specific information for each task.
For those of you who know the internals: Spark already relies on broadcast
to send the Hadoop JobConf every time it reads Hadoop input, because the
JobConf is large.
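
For the curious, the gist of the new dispatch path is roughly the sketch
below. The names (TaskDescription, submitTasks, serialize) are invented for
illustration and are not the actual scheduler API; the point is only that
the stage's RDD and closure bytes are broadcast once, and each task message
carries just a broadcast handle plus its partition id.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for the per-task message: only a handle to the
    // broadcast bytes plus partition-specific information travels over Akka.
    case class TaskDescription(stageId: Int, partitionId: Int,
                               taskBinary: Broadcast[Array[Byte]])

    object NewDispatchSketch {
      // Plain Java serialization, standing in for Spark's closure serializer.
      private def serialize(obj: AnyRef): Array[Byte] = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(obj)
        oos.close()
        bos.toByteArray
      }

      def submitTasks(sc: SparkContext, stageId: Int, rdd: RDD[_], func: AnyRef,
                      partitions: Seq[Int]): Seq[TaskDescription] = {
        // Serialize the (RDD, closure) pair once per stage and broadcast it,
        // instead of embedding the bytes in every task message.
        val taskBinary = sc.broadcast(serialize((rdd, func)))
        partitions.map(p => TaskDescription(stageId, p, taskBinary))
      }
    }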

The user-facing impact of the change includes:

1. Users won't need to decide what to broadcast anymore.
2. Task size will get smaller, resulting in faster scheduling and higher
task dispatch throughput.

In addition, the change simplifies some of Spark's internals, removing the
need to maintain task caches and the complex logic for broadcasting the
JobConf (which recently led to a deadlock).


Pull request attached: https://github.com/apache/spark/pull/1450

Re: small (yet major) change going in: broadcasting RDD to reduce task size

Posted by Reynold Xin <rx...@databricks.com>.
Yup - that is correct.  Thanks for clarifying.


On Wed, Jul 16, 2014 at 10:12 PM, Matei Zaharia <ma...@gmail.com>
wrote:

> Hey Reynold, just to clarify, users will still have to manually broadcast
> objects that they want to use *across* operations (e.g. in multiple
> iterations of an algorithm, or multiple map functions, or stuff like that).
> But they won't have to broadcast something they only use once.
>
> Matei

Re: small (yet major) change going in: broadcasting RDD to reduce task size

Posted by Matei Zaharia <ma...@gmail.com>.
Hey Reynold, just to clarify, users will still have to manually broadcast objects that they want to use *across* operations (e.g. in multiple iterations of an algorithm, or multiple map functions, or stuff like that). But they won't have to broadcast something they only use once.
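
A rough example of the distinction, using only the standard RDD and
broadcast API (the lookup table below is just illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object WhenToStillBroadcast {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("when-to-broadcast"))
        val data = sc.parallelize(1 to 1000, 8)

        // Illustrative large lookup table.
        val table: Map[Int, Double] = (1 to 100000).map(i => i -> i * 0.5).toMap

        // Used in a single operation: after this change the closure (and the
        // table it captures) is broadcast for you, so no explicit broadcast
        // is needed just to keep the task size down.
        val onceOff = data.map(i => table.getOrElse(i, 0.0)).reduce(_ + _)

        // Reused across operations (e.g. iterations): an explicit broadcast
        // still pays off, because the table is shipped and cached once rather
        // than re-broadcast with every stage's closure.
        val tableBc = sc.broadcast(table)
        var total = 0.0
        for (_ <- 1 to 10) {
          total += data.map(i => tableBc.value.getOrElse(i, 0.0)).reduce(_ + _)
        }

        println(s"single use: $onceOff, iterative: $total")
        sc.stop()
      }
    }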

Matei

On Jul 16, 2014, at 10:07 PM, Reynold Xin <rx...@databricks.com> wrote:

> Oops - the pull request should be https://github.com/apache/spark/pull/1452


Re: small (yet major) change going in: broadcasting RDD to reduce task size

Posted by Reynold Xin <rx...@databricks.com>.
Oops - the pull request should be https://github.com/apache/spark/pull/1452



Re: small (yet major) change going in: broadcasting RDD to reduce task size

Posted by Reynold Xin <rx...@databricks.com>.
Thanks :)

FYI the pull request has been merged and will be part of Spark 1.1.0.



On Thu, Jul 17, 2014 at 11:09 AM, Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

> On Thu, Jul 17, 2014 at 1:23 AM, Stephen Haberman <
> stephen.haberman@gmail.com> wrote:
>
>> I'd be ecstatic if more major changes were this well/succinctly
>> explained
>>
>
> Ditto on that. The summary of user impact was very nice. It would be good
> to repeat that on the user list or release notes when this change goes out.
>
> Nick
>

Re: small (yet major) change going in: broadcasting RDD to reduce task size

Posted by Nicholas Chammas <ni...@gmail.com>.
On Thu, Jul 17, 2014 at 1:23 AM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

> I'd be ecstatic if more major changes were this well/succinctly
> explained
>

Ditto on that. The summary of user impact was very nice. It would be good
to repeat that on the user list or release notes when this change goes out.

Nick

Re: small (yet major) change going in: broadcasting RDD to reduce task size

Posted by Stephen Haberman <st...@gmail.com>.
Wow. Great writeup.

I keep tabs on several open source projects that we use heavily, and
I'd be ecstatic if more major changes were this well/succinctly
explained instead of the usual "just read the commit message/diff".

- Stephen