Posted to user@spark.apache.org by Kyle Ellrott <ke...@soe.ucsc.edu> on 2014/07/01 19:01:08 UTC

Re: Improving Spark multithreaded performance?

This all seems pretty hackish and a lot of trouble to get around
limitations in mllib.
The big limitation is that right now, the optimization algorithms work on
one large dataset at a time. We need a second set of methods that work on
a large number of medium-sized datasets.
I've started to code a new set of optimization methods to add into mllib.
I've started with GroupedGradientDescent (
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala
)

GroupedGradientDescent is based on GradientDescent, but it takes
RDD[(Int, (Double, Vector))] as its data input rather than RDD[(Double,
Vector)]. The Int serves as a key to mark which elements should be grouped
together. This lets you multiplex the optimization of several datasets into
the same RDD.
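
For example, two label sets that share the same feature vectors could be
multiplexed into a single input roughly like this (a sketch only; sc is an
existing SparkContext, and the feature and label arrays below are made-up
placeholders):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  import org.apache.spark.rdd.RDD

  // Shared feature vectors plus two hypothetical label sets.
  val features: Array[Vector] =
    Array(Vectors.dense(1.0, 0.0), Vectors.dense(0.0, 1.0))
  val labelsA: Array[Double] = Array(1.0, 0.0) // dataset keyed by 0
  val labelsB: Array[Double] = Array(0.0, 1.0) // dataset keyed by 1

  // The Int key marks which dataset each (label, features) pair belongs to,
  // so both optimizations ride in one RDD.
  val grouped: RDD[(Int, (Double, Vector))] = sc.parallelize(
    labelsA.zip(features).map { case (y, x) => (0, (y, x)) } ++
    labelsB.zip(features).map { case (y, x) => (1, (y, x)) })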

I think I've gotten the GroupedGradientDescent to work correctly. I need to
go up the stack and start adding methods like SVMWithSGD.trainGroup.
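
Roughly, the grouped entry point I'm picturing would look something like
this (a sketch only; the name and parameters are placeholders rather than a
final API):

  import org.apache.spark.mllib.classification.SVMModel
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  // Hypothetical signature mirroring SVMWithSGD.train: the Int key says
  // which dataset each LabeledPoint belongs to, and one model comes back
  // per key.
  def trainGroup(
      input: RDD[(Int, LabeledPoint)],
      numIterations: Int,
      stepSize: Double,
      regParam: Double,
      miniBatchFraction: Double): Map[Int, SVMModel] = ???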

Does anybody have any thoughts on this?

Kyle



On Fri, Jun 27, 2014 at 6:36 PM, Xiangrui Meng <me...@gmail.com> wrote:

> The RDD is cached in only one or two workers. All other executors need
> to fetch its content via network. Since the dataset is not huge, could
> you try this?
>
> val features: Array[Vector] = ...
> val featuresBc = sc.broadcast(features)
> // parallel loops
> val labels: Array[Double] = ...
> val rdd = sc.parallelize(0 until 1, 1).flatMap(i =>
>   featuresBc.value.view.zip(labels))
> val model = SVMWithSGD.train(rdd)
> models(i) = model
>
> Using the BT broadcast factory would improve the performance of broadcasting.
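> For example, something like this when creating the SparkContext (assuming
> the Spark 1.0 broadcast settings; the app name is just a placeholder):
>
>   import org.apache.spark.{SparkConf, SparkContext}
>
>   // Switch broadcasting to the torrent-based implementation.
>   val conf = new SparkConf()
>     .setAppName("svm-multiplex")
>     .set("spark.broadcast.factory",
>          "org.apache.spark.broadcast.TorrentBroadcastFactory")
>   val sc = new SparkContext(conf)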
>
> Best,
> Xiangrui
>
> On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott <ke...@soe.ucsc.edu>
> wrote:
> > 1) I'm using the static SVMWithSGD.train, with no options.
> > 2) I have about 20,000 features (~5,000 samples) that are being trained
> > against 14,000 different sets of labels (i.e. I'll be doing 14,000
> > different training runs against the same set of features, trying to
> > figure out which labels can be learned), and I would also like to do
> > cross-fold validation.
> >
> > The driver doesn't seem to be using too much memory. I left it at -Xmx8g
> > and it never complained.
> >
> > Kyle
> >
> >
> >
> > On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng <me...@gmail.com> wrote:
> >>
> >> Hi Kyle,
> >>
> >> A few questions:
> >>
> >> 1) Did you use `setIntercept(true)`?
> >> 2) How many features?
> >>
> >> I'm a little worried about the driver's load because the final
> >> aggregation and weights update happen on the driver. Did you check the
> >> driver's memory usage as well?
> >>
> >> Best,
> >> Xiangrui
> >>
> >> On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott <ke...@soe.ucsc.edu>
> >> wrote:
> >> > As far as I can tell there is no data to broadcast (unless there is
> >> > something internal to mllib that needs to be broadcast). I've coalesced
> >> > the input RDDs to keep the number of partitions limited. When running,
> >> > I've tried to get up to 500 concurrent stages, and I've coalesced the
> >> > RDDs down to 2 partitions, so about 1000 tasks.
> >> > Despite having over 500 threads in the threadpool working on mllib
> >> > tasks,
> >> > the total CPU usage never really goes above 150%.
> >> > I've tried increasing 'spark.akka.threads' but that doesn't seem to do
> >> > anything.
> >> >
> >> > My one thought is that, because I'm using MLUtils.kFold to generate
> >> > the RDDs, I have so many tasks working off RDDs that are permutations
> >> > of the original RDDs, and maybe that is creating some sort of
> >> > dependency bottleneck.
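> >> > (For reference, the folds are generated and trained roughly like this;
> >> > the fold count, seed, and iteration count are just placeholders:)
> >> >
> >> >   import org.apache.spark.mllib.classification.SVMWithSGD
> >> >   import org.apache.spark.mllib.regression.LabeledPoint
> >> >   import org.apache.spark.mllib.util.MLUtils
> >> >   import org.apache.spark.rdd.RDD
> >> >
> >> >   // data: RDD[LabeledPoint]; kFold returns (training, validation) pairs
> >> >   val folds = MLUtils.kFold(data, 10, 42)
> >> >   val models = folds.map { case (training, validation) =>
> >> >     SVMWithSGD.train(training.cache(), 100)
> >> >   }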
> >> >
> >> > Kyle
> >> >
> >> >
> >> > On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson <il...@gmail.com>
> >> > wrote:
> >> >>
> >> >> I don't have specific solutions for you, but the general things to
> >> >> try are:
> >> >>
> >> >> - Decrease task size by broadcasting any non-trivial objects.
> >> >> - Increase duration of tasks by making them less fine-grained.
> >> >>
> >> >> How many tasks are you sending? I've seen in the past something like
> >> >> 25 seconds for ~10k total medium-sized tasks.
> >> >>
> >> >>
> >> >> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <kellrott@soe.ucsc.edu>
> >> >> wrote:
> >> >>>
> >> >>> I'm working to set up a calculation that involves calling mllib's
> >> >>> SVMWithSGD.train several thousand times on different permutations of
> >> >>> the data. I'm trying to run the separate jobs using a threadpool to
> >> >>> dispatch the different requests to a Spark context connected to a
> >> >>> Mesos cluster, using coarse-grained scheduling and a max of 2000
> >> >>> cores, on Spark 1.0.
> >> >>> Total utilization of the system is terrible. Most of the 'aggregate
> >> >>> at GradientDescent.scala:178' stages (where mllib spends most of its
> >> >>> time) take about 3 seconds, but have ~25 seconds of scheduler delay.
> >> >>> What kind of things can I do to improve this?
> >> >>>
> >> >>> Kyle
> >> >>
> >> >>
> >> >
> >
> >
>