Posted to user@spark.apache.org by Kyle Ellrott <ke...@soe.ucsc.edu> on 2014/06/26 21:06:19 UTC

Improving Spark multithreaded performance?

I'm working to set up a calculation that involves calling
mllib's SVMWithSGD.train several thousand times on different permutations
of the data. I'm trying to run the separate jobs using a threadpool to
dispatch the different requests to a spark context connected to a Mesos
cluster, using coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
Total utilization of the system is terrible. Most of the 'aggregate at
GradientDescent.scala:178' stages (where mllib spends most of its time) take
about 3 seconds, but have ~25 seconds of scheduler delay time.
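
Roughly, the dispatch pattern looks like the following (a simplified
sketch; the pool size, iteration count, and `datasets` are illustrative
stand-ins, not my exact code):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// One shared SparkContext; a fixed-size pool dispatches many concurrent
// training jobs against it.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(500))

def trainAll(datasets: Seq[RDD[LabeledPoint]]): Seq[Future[SVMModel]] =
  datasets.map(data => Future { SVMWithSGD.train(data, 100) })
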
What kind of things can I do to improve this?

Kyle

Re: Improving Spark multithreaded performance?

Posted by anoldbrain <an...@gmail.com>.
I have not used this myself, only watched a presentation of it at Spark Summit 2013.

https://github.com/radlab/sparrow
https://spark-summit.org/talk/ousterhout-next-generation-spark-scheduling-with-sparrow/

This is pure conjecture based on your high scheduling latency and the size
of your cluster, but it seems like one avenue worth looking into.




Re: Improving Spark multithreaded performance?

Posted by Kyle Ellrott <ke...@soe.ucsc.edu>.
This all seems pretty hackish and a lot of trouble to get around
limitations in mllib.
The big limitation is that right now, the optimization algorithms work on
one large dataset at a time. We need a second set of methods that work on
a large number of medium-sized datasets.
I've started to code a new set of optimization methods to add into mllib.
I've started with GroupedGradientDescent (
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala
)

GroupedGradientDescent is based on GradientDescent, but it takes
RDD[(Int, (Double, Vector))] as its data input rather than RDD[(Double,
Vector)]. The Int serves as a key to mark which elements should be grouped
together. This lets you multiplex the optimization of several datasets into
the same RDD.
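
For example, several small datasets can be multiplexed into one keyed RDD
like this (a sketch; `multiplex` and `datasets` are illustrative names, not
part of the patch):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Tag each (label, features) pair with the index of the dataset it came
// from, then union everything into one keyed RDD.
def multiplex(datasets: Seq[RDD[(Double, Vector)]]): RDD[(Int, (Double, Vector))] =
  datasets.zipWithIndex
    .map { case (rdd, id) => rdd.map(pair => (id, pair)) }
    .reduce(_ union _)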

I think I've gotten GroupedGradientDescent to work correctly. I need to
go up the stack and start adding methods like SVMWithSGD.trainGroup.

Does anybody have any thoughts on this?

Kyle



On Fri, Jun 27, 2014 at 6:36 PM, Xiangrui Meng <me...@gmail.com> wrote:

> The RDD is cached in only one or two workers. All other executors need
> to fetch its content over the network. Since the dataset is not huge, could
> you try this?
>
> val features: Array[Vector] = ...
> val featuresBc = sc.broadcast(features)
> // parallel loops
> val labels: Array[Double] = ...
> val rdd = sc.parallelize(0 until 1, 1).flatMap(i =>
>   featuresBc.value.view.zip(labels))
> val model = SVMWithSGD.train(rdd)
> models(i) = model
>
> Using the BT (torrent) broadcast factory would improve the performance of broadcasting.
>
> Best,
> Xiangrui
>
> On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu>
> wrote:
> > 1) I'm using the static SVMWithSGD.train, with no options.
> > 2) I have about 20,000 features (~5000 samples) that are being attached
> > and trained against 14,000 different sets of labels (i.e. I'll be doing
> > 14,000 different training runs against the same sets of features, trying
> > to figure out which labels can be learned), and I would also like to do
> > cross-fold validation.
> >
> > The driver doesn't seem to be using too much memory. I left it as -Xmx8g
> > and it never complained.
> >
> > Kyle
> >
> >
> >
> > On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng <me...@gmail.com> wrote:
> >>
> >> Hi Kyle,
> >>
> >> A few questions:
> >>
> >> 1) Did you use `setIntercept(true)`?
> >> 2) How many features?
> >>
> >> I'm a little worried about the driver's load, because the final
> >> aggregation and weights update happen on the driver. Did you check the
> >> driver's memory usage as well?
> >>
> >> Best,
> >> Xiangrui
> >>
> >> On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott <ke...@soe.ucsc.edu>
> >> wrote:
> >> > As far as I can tell there is no data to broadcast (unless there is
> >> > something internal to mllib that needs to be broadcast). I've coalesced
> >> > the input RDDs to keep the number of partitions limited. When running,
> >> > I've tried to get up to 500 concurrent stages, and I've coalesced the
> >> > RDDs down to 2 partitions, so about 1000 tasks.
> >> > Despite having over 500 threads in the threadpool working on mllib
> >> > tasks, the total CPU usage never really goes above 150%.
> >> > I've tried increasing 'spark.akka.threads' but that doesn't seem to do
> >> > anything.
> >> >
> >> > My one thought is that, because I'm using MLUtils.kFold to generate
> >> > the RDDs, I have so many tasks working off RDDs that are permutations
> >> > of the original RDDs, and maybe that is creating some sort of
> >> > dependency bottleneck.
> >> >
> >> > Kyle
> >> >
> >> >
> >> > On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson <il...@gmail.com>
> >> > wrote:
> >> >>
> >> >> I don't have specific solutions for you, but the general things to
> >> >> try are:
> >> >>
> >> >> - Decrease task size by broadcasting any non-trivial objects.
> >> >> - Increase duration of tasks by making them less fine-grained.
> >> >>
> >> >> How many tasks are you sending? I've seen in the past something like
> >> >> 25 seconds for ~10k total medium-sized tasks.
> >> >>
> >> >>
> >> >> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <kellrott@soe.ucsc.edu>
> >> >> wrote:
> >> >>>
> >> >>> I'm working to set up a calculation that involves calling mllib's
> >> >>> SVMWithSGD.train several thousand times on different permutations of
> >> >>> the data. I'm trying to run the separate jobs using a threadpool to
> >> >>> dispatch the different requests to a spark context connected to a
> >> >>> Mesos cluster, using coarse-grained scheduling, and a max of 2000
> >> >>> cores on Spark 1.0.
> >> >>> Total utilization of the system is terrible. Most of the 'aggregate
> >> >>> at GradientDescent.scala:178' stages (where mllib spends most of its
> >> >>> time) take about 3 seconds, but have ~25 seconds of scheduler delay
> >> >>> time.
> >> >>> What kind of things can I do to improve this?
> >> >>>
> >> >>> Kyle
> >> >>
> >> >>
> >> >
> >
> >
>

Re: Improving Spark multithreaded performance?

Posted by Xiangrui Meng <me...@gmail.com>.
The RDD is cached in only one or two workers. All other executors need
to fetch its content over the network. Since the dataset is not huge, could
you try this?

val features: Array[Vector] = ...
val featuresBc = sc.broadcast(features)
// parallel loops
val labels: Array[Double] = ...
val rdd = sc.parallelize(0 until 1, 1).flatMap(i =>
  featuresBc.value.view.zip(labels))
val model = SVMWithSGD.train(rdd)
models(i) = model

Using the BT (torrent) broadcast factory would improve the performance of broadcasting.
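
For example, assuming the standard Spark 1.0 configuration key, switching
the factory is a one-line change on the SparkConf:

import org.apache.spark.SparkConf

// Use the BitTorrent-style TorrentBroadcastFactory for broadcasting.
val conf = new SparkConf()
  .set("spark.broadcast.factory",
       "org.apache.spark.broadcast.TorrentBroadcastFactory")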

Best,
Xiangrui

On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu> wrote:
> 1) I'm using the static SVMWithSGD.train, with no options.
> 2) I have about 20,000 features (~5000 samples) that are being attached and
> trained against 14,000 different sets of labels (i.e. I'll be doing 14,000
> different training runs against the same sets of features, trying to figure
> out which labels can be learned), and I would also like to do cross-fold
> validation.
>
> The driver doesn't seem to be using too much memory. I left it as -Xmx8g and
> it never complained.
>
> Kyle
>
>
>
> On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng <me...@gmail.com> wrote:
>>
>> Hi Kyle,
>>
>> A few questions:
>>
>> 1) Did you use `setIntercept(true)`?
>> 2) How many features?
>>
>> I'm a little worried about the driver's load, because the final
>> aggregation and weights update happen on the driver. Did you check the
>> driver's memory usage as well?
>>
>> Best,
>> Xiangrui
>>
>> On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott <ke...@soe.ucsc.edu>
>> wrote:
>> > As far as I can tell there is no data to broadcast (unless there is
>> > something internal to mllib that needs to be broadcast). I've coalesced
>> > the input RDDs to keep the number of partitions limited. When running,
>> > I've tried to get up to 500 concurrent stages, and I've coalesced the
>> > RDDs down to 2 partitions, so about 1000 tasks.
>> > Despite having over 500 threads in the threadpool working on mllib
>> > tasks, the total CPU usage never really goes above 150%.
>> > I've tried increasing 'spark.akka.threads' but that doesn't seem to do
>> > anything.
>> >
>> > My one thought is that, because I'm using MLUtils.kFold to generate the
>> > RDDs, I have so many tasks working off RDDs that are permutations of the
>> > original RDDs, and maybe that is creating some sort of dependency
>> > bottleneck.
>> >
>> > Kyle
>> >
>> >
>> > On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson <il...@gmail.com>
>> > wrote:
>> >>
>> >> I don't have specific solutions for you, but the general things to try
>> >> are:
>> >>
>> >> - Decrease task size by broadcasting any non-trivial objects.
>> >> - Increase duration of tasks by making them less fine-grained.
>> >>
>> >> How many tasks are you sending? I've seen in the past something like 25
>> >> seconds for ~10k total medium-sized tasks.
>> >>
>> >>
>> >> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu>
>> >> wrote:
>> >>>
>> >>> I'm working to set up a calculation that involves calling mllib's
>> >>> SVMWithSGD.train several thousand times on different permutations of
>> >>> the data. I'm trying to run the separate jobs using a threadpool to
>> >>> dispatch the different requests to a spark context connected to a
>> >>> Mesos cluster, using coarse-grained scheduling, and a max of 2000
>> >>> cores on Spark 1.0.
>> >>> Total utilization of the system is terrible. Most of the 'aggregate at
>> >>> GradientDescent.scala:178' stages (where mllib spends most of its
>> >>> time) take about 3 seconds, but have ~25 seconds of scheduler delay
>> >>> time.
>> >>> What kind of things can I do to improve this?
>> >>>
>> >>> Kyle
>> >>
>> >>
>> >
>
>

Re: Improving Spark multithreaded performance?

Posted by Kyle Ellrott <ke...@soe.ucsc.edu>.
1) I'm using the static SVMWithSGD.train, with no options.
2) I have about 20,000 features (~5000 samples) that are being attached and
trained against 14,000 different sets of labels (i.e. I'll be doing 14,000
different training runs against the same sets of features, trying to figure
out which labels can be learned), and I would also like to do cross-fold
validation.

The driver doesn't seem to be using too much memory. I left it as -Xmx8g
and it never complained.

Kyle



On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng <me...@gmail.com> wrote:

> Hi Kyle,
>
> A few questions:
>
> 1) Did you use `setIntercept(true)`?
> 2) How many features?
>
> I'm a little worried about the driver's load, because the final
> aggregation and weights update happen on the driver. Did you check the
> driver's memory usage as well?
>
> Best,
> Xiangrui
>
> On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott <ke...@soe.ucsc.edu>
> wrote:
> > As far as I can tell there is no data to broadcast (unless there is
> > something internal to mllib that needs to be broadcast). I've coalesced
> > the input RDDs to keep the number of partitions limited. When running,
> > I've tried to get up to 500 concurrent stages, and I've coalesced the
> > RDDs down to 2 partitions, so about 1000 tasks.
> > Despite having over 500 threads in the threadpool working on mllib tasks,
> > the total CPU usage never really goes above 150%.
> > I've tried increasing 'spark.akka.threads' but that doesn't seem to do
> > anything.
> >
> > My one thought is that, because I'm using MLUtils.kFold to generate the
> > RDDs, I have so many tasks working off RDDs that are permutations of the
> > original RDDs, and maybe that is creating some sort of dependency
> > bottleneck.
> >
> > Kyle
> >
> >
> > On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson <il...@gmail.com>
> > wrote:
> >>
> >> I don't have specific solutions for you, but the general things to try
> >> are:
> >>
> >> - Decrease task size by broadcasting any non-trivial objects.
> >> - Increase duration of tasks by making them less fine-grained.
> >>
> >> How many tasks are you sending? I've seen in the past something like 25
> >> seconds for ~10k total medium-sized tasks.
> >>
> >>
> >> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu>
> >> wrote:
> >>>
> >>> I'm working to set up a calculation that involves calling mllib's
> >>> SVMWithSGD.train several thousand times on different permutations of
> >>> the data. I'm trying to run the separate jobs using a threadpool to
> >>> dispatch the different requests to a spark context connected to a
> >>> Mesos cluster, using coarse-grained scheduling, and a max of 2000
> >>> cores on Spark 1.0.
> >>> Total utilization of the system is terrible. Most of the 'aggregate at
> >>> GradientDescent.scala:178' stages (where mllib spends most of its
> >>> time) take about 3 seconds, but have ~25 seconds of scheduler delay
> >>> time.
> >>> What kind of things can I do to improve this?
> >>>
> >>> Kyle
> >>
> >>
> >
>

Re: Improving Spark multithreaded performance?

Posted by Xiangrui Meng <me...@gmail.com>.
Hi Kyle,

A few questions:

1) Did you use `setIntercept(true)`?
2) How many features?
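
For reference, the intercept is set through the instance API rather than
the static `SVMWithSGD.train` helpers; a minimal sketch:

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def trainWithIntercept(data: RDD[LabeledPoint]) = {
  val svm = new SVMWithSGD()
  svm.setIntercept(true) // also fit an intercept term, not just the weights
  svm.run(data)
}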

I'm a little worried about the driver's load, because the final aggregation
and weights update happen on the driver. Did you check the driver's memory
usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott <ke...@soe.ucsc.edu> wrote:
> As far as I can tell there is no data to broadcast (unless there is
> something internal to mllib that needs to be broadcast). I've coalesced the
> input RDDs to keep the number of partitions limited. When running, I've
> tried to get up to 500 concurrent stages, and I've coalesced the RDDs down
> to 2 partitions, so about 1000 tasks.
> Despite having over 500 threads in the threadpool working on mllib tasks,
> the total CPU usage never really goes above 150%.
> I've tried increasing 'spark.akka.threads' but that doesn't seem to do
> anything.
>
> My one thought is that, because I'm using MLUtils.kFold to generate the
> RDDs, I have so many tasks working off RDDs that are permutations of the
> original RDDs, and maybe that is creating some sort of dependency
> bottleneck.
>
> Kyle
>
>
> On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson <il...@gmail.com> wrote:
>>
>> I don't have specific solutions for you, but the general things to try
>> are:
>>
>> - Decrease task size by broadcasting any non-trivial objects.
>> - Increase duration of tasks by making them less fine-grained.
>>
>> How many tasks are you sending? I've seen in the past something like 25
>> seconds for ~10k total medium-sized tasks.
>>
>>
>> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu>
>> wrote:
>>>
>>> I'm working to set up a calculation that involves calling mllib's
>>> SVMWithSGD.train several thousand times on different permutations of the
>>> data. I'm trying to run the separate jobs using a threadpool to dispatch the
>>> different requests to a spark context connected a Mesos's cluster, using
>>> course scheduling, and a max of 2000 cores on Spark 1.0.
>>> Total utilization of the system is terrible. Most of the 'aggregate at
>>> GradientDescent.scala:178' stages(where mllib spends most of its time) take
>>> about 3 seconds, but have ~25 seconds of scheduler delay time.
>>> What kind of things can I do to improve this?
>>>
>>> Kyle
>>
>>
>

Re: Improving Spark multithreaded performance?

Posted by Kyle Ellrott <ke...@soe.ucsc.edu>.
As far as I can tell there is no data to broadcast (unless there is
something internal to mllib that needs to be broadcast). I've coalesced the
input RDDs to keep the number of partitions limited. When running, I've
tried to get up to 500 concurrent stages, and I've coalesced the RDDs down
to 2 partitions, so about 1000 tasks.
Despite having over 500 threads in the threadpool working on mllib tasks,
the total CPU usage never really goes above 150%.
I've tried increasing 'spark.akka.threads' but that doesn't seem to do
anything.

My one thought is that, because I'm using MLUtils.kFold to generate the
RDDs, I have so many tasks working off RDDs that are permutations of the
original RDDs, and maybe that is creating some sort of dependency
bottleneck.
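
For reference, the fold generation is essentially the following (`data`,
the fold count, and the seed are illustrative):

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

// kFold returns (training, validation) RDD pairs; every fold is derived
// from the same parent RDD, so all of the folds share its lineage.
def folds(data: RDD[LabeledPoint]): Array[(RDD[LabeledPoint], RDD[LabeledPoint])] =
  MLUtils.kFold(data, 10, 42)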

Kyle


On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson <il...@gmail.com> wrote:

> I don't have specific solutions for you, but the general things to try are:
>
> - Decrease task size by broadcasting any non-trivial objects.
> - Increase duration of tasks by making them less fine-grained.
>
> How many tasks are you sending? I've seen in the past something like 25
> seconds for ~10k total medium-sized tasks.
>
>
> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu>
> wrote:
>
>> I'm working to set up a calculation that involves calling
>> mllib's SVMWithSGD.train several thousand times on different permutations
>> of the data. I'm trying to run the separate jobs using a threadpool to
>> dispatch the different requests to a spark context connected to a Mesos
>> cluster, using coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
>> Total utilization of the system is terrible. Most of the 'aggregate at
>> GradientDescent.scala:178' stages (where mllib spends most of its time)
>> take about 3 seconds, but have ~25 seconds of scheduler delay time.
>> What kind of things can I do to improve this?
>>
>> Kyle
>>
>
>

Re: Improving Spark multithreaded performance?

Posted by Aaron Davidson <il...@gmail.com>.
I don't have specific solutions for you, but the general things to try are:

- Decrease task size by broadcasting any non-trivial objects.
- Increase duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen in the past something like 25
seconds for ~10k total medium-sized tasks.
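
For the first point, the pattern is to broadcast the shared object once and
reference it through the broadcast handle inside the closure, rather than
letting the closure capture it per task. A generic sketch, where `bigModel`
stands in for whatever large object your tasks use:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def scoreAll(sc: SparkContext, ids: RDD[Int],
             bigModel: Array[Double]): RDD[Double] = {
  // Ship bigModel to each executor once, not once per task.
  val modelBc = sc.broadcast(bigModel)
  ids.map(i => modelBc.value(i % modelBc.value.length))
}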


On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu>
wrote:

> I'm working to set up a calculation that involves calling
> mllib's SVMWithSGD.train several thousand times on different permutations
> of the data. I'm trying to run the separate jobs using a threadpool to
> dispatch the different requests to a spark context connected to a Mesos
> cluster, using coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
> Total utilization of the system is terrible. Most of the 'aggregate at
> GradientDescent.scala:178' stages (where mllib spends most of its time)
> take about 3 seconds, but have ~25 seconds of scheduler delay time.
> What kind of things can I do to improve this?
>
> Kyle
>