Posted to user@spark.apache.org by ticup <ti...@gmail.com> on 2014/04/19 13:04:59 UTC

extremely slow k-means version

Hi,

I was playing around with other k-means implementations in Scala/Spark in
order to test performance and get a better grasp of Spark.

Now, I made one similar to the one from the examples
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala),
except that it's a bit less clever. Still, I would expect a non-expert
Scala/Spark programmer to write code like mine rather than code like the
example.

Here is how they compare in the step that calculates the new centroids (the
average of all points belonging to each current centroid, which is the main
workhorse of the algo). The *example algorithm* adds up the points of the
same cluster and keeps track of the number of points in each cluster in 1
step (by using reduceByKey and keeping a counter in the reduce value):

val closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))

val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

and then proceeds by dividing the sum of all points of a cluster by the
counted number of points in the cluster:

val newPoints = pointStats.map { pair => (pair._1, pair._2._1 / pair._2._2) }.collectAsMap()

Afterwards, the total change between the old and the new centroids is
calculated in order to know when to stop iterating:

tempDist = 0.0

for (i <- 0 until K) {
     tempDist += kPoints(i).squaredDist(newPoints(i))
}
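
To make the steps above concrete, here is a minimal plain-Scala sketch of one iteration, run on local collections rather than on an RDD. It is not the code from SparkKMeans.scala: the object name, the Point alias, the helper functions and the fallback for empty clusters are assumptions made for illustration, and a foldLeft stands in for reduceByKey.

```scala
object KMeansStepSketch {
  // Hypothetical point type for this sketch; the thread uses Spark's own Vector.
  type Point = Vector[Double]

  def add(a: Point, b: Point): Point = a.zip(b).map { case (x, y) => x + y }
  def scale(a: Point, s: Double): Point = a.map(_ * s)
  def squaredDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Index of the centroid nearest to p.
  def closestPoint(p: Point, centroids: IndexedSeq[Point]): Int =
    centroids.indices.minBy(i => squaredDist(p, centroids(i)))

  // One pass over the data: fold every point into a running (sum, count)
  // for its cluster. No per-cluster list of points is ever built.
  def pointStats(data: Seq[Point], centroids: IndexedSeq[Point]): Map[Int, (Point, Int)] =
    data.foldLeft(Map.empty[Int, (Point, Int)]) { (acc, p) =>
      val k = closestPoint(p, centroids)
      val merged = acc.get(k) match {
        case Some((sum, n)) => (add(sum, p), n + 1)
        case None           => (p, 1)
      }
      acc.updated(k, merged)
    }

  // New centroid = sum of the cluster's points divided by their count.
  def newCentroids(stats: Map[Int, (Point, Int)]): Map[Int, Point] =
    stats.map { case (k, (sum, n)) => k -> scale(sum, 1.0 / n) }

  // Sum of squared distances between old and new centroids.
  // Assumption: a cluster that received no points keeps its old centroid.
  def totalShift(old: IndexedSeq[Point], updated: Map[Int, Point]): Double =
    old.indices.map(i => squaredDist(old(i), updated.getOrElse(i, old(i)))).sum
}
```

The point of the (sum, count) pair is that merging two partial results gives another partial result of the same small size, so the work can be combined in any order and in any place.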



*My algorithm*
(https://github.com/ticup/k-means-spark/blob/master/src/main/scala/k-means.scala)
is less clever, but more straightforward: it just groups all the points of
each cluster, then calculates the average of those points and adds the
distance from the previous centroid to an accumulator:

// accumulator for the total shift between old and new centroids
dist = sc.accumulator(0.0)

// calculate new centroids and add each one's distance from its old centroid
centroids = closest.groupByKey().map{case(i, points) =>
    val newCentroid = average(points)
    dist += centroids(i).squaredDist(newCentroid)
    newCentroid
}.collect()

with:

def average(points: Seq[Vector]) : Vector = {
    points.reduce(_+_) / points.length
}

So, the big differences are:

1. It uses an accumulator
2. It does excessive work by not cleverly calculating the average
3. It accesses the centroids val from within the map


Now, the reason I'm here: this version runs EXTREMELY slowly and gets
outOfHeapMemory exceptions on input data that the original algorithm easily
handles in ~5 seconds. I'm trying to pinpoint what exactly is causing this
huge difference. The use of an accumulator shouldn't really affect the
performance, and it doesn't: I tried it without the accumulator and it
stays as slow. Further, I would expect the excessive work to slow down the
algorithm by a factor of 2 or so, but this is a slowdown by a factor of 10
or more.

Even with 1 worker and 1 core (thus no parallelism) the difference in speed
stays the same, so it's not because the averaging is not parallelised
properly. There must be something much more important going on.

Could someone give me pointers on what exactly is happening here? Surely it
can't just be because I'm accessing the centroids value from within the
closure?

Speed comparison:

The *slow algorithm*: 44 seconds to perform the map

14/04/19 13:03:15 INFO scheduler.DAGScheduler: Stage 3 (map at k-means.scala:114) finished in 43.909 s

The *fast algorithm*: more or less the same operations (in 2 steps instead
of 1) in 2.2 seconds

14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at k-means.scala:84) finished in 2.090 s
....
14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 2 (collectAsMap at k-means.scala:86) finished in 0.117 s


Thanks in advance,
Tim.




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/extremely-slow-k-means-version-tp4489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: extremely slow k-means version

Posted by ticup <ti...@gmail.com>.
Thanks a lot for the explanation, Matei.

As a matter of fact, I was just reading the part of the paper on narrow and
wide dependencies and saw that groupByKey is indeed a wide dependency, which,
as you explained, is the problem.

Maybe it wouldn't be a bad thing to have a section in the docs on
wide/narrow dependencies, and maybe to list, for each transformation, the
dependency it creates? Although it's mostly obvious, that would stress
better that you need to choose your transformations very carefully and that
some are much preferable to others.






Re: extremely slow k-means version

Posted by Matei Zaharia <ma...@gmail.com>.
The problem is that groupByKey means “bring all the points with this same key to the same JVM”. Your input is a Seq[Point], so you have to have all the points there. This means that a) all points will be sent across the network in a cluster, which is slow (and Spark goes through this sending code path even in local mode so it serializes the data), and b) you’ll get out of memory errors if that Seq is too big. In large-scale data processing, data movement is often the biggest cost, so you have to carefully choose which operations to use.

Matei
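
As a rough illustration of the data-movement point above, the following plain-Scala sketch simulates two partitions on local collections and counts how many records each strategy would have to move. It does not use Spark and is not how Spark's shuffle is implemented: the object and function names are hypothetical, and points are reduced to single Doubles to keep it short. It relies only on the fact that reduceByKey merges values within each partition before the shuffle, while groupByKey ships every value.

```scala
object ShuffleSketch {
  // A shuffled record: (clusterId, (partial sum, partial count)).
  type Rec = (Int, (Double, Int))

  // groupByKey-like: every input record leaves its partition as-is.
  def shuffledWithoutCombine(partitions: Seq[Seq[(Int, Double)]]): Seq[Rec] =
    partitions.flatMap(_.map { case (k, x) => (k, (x, 1)) })

  // reduceByKey-like: each partition is first collapsed to one
  // (sum, count) record per key, and only those records are moved.
  def shuffledWithCombine(partitions: Seq[Seq[(Int, Double)]]): Seq[Rec] =
    partitions.flatMap { part =>
      part.foldLeft(Map.empty[Int, (Double, Int)]) { case (acc, (k, x)) =>
        val (s, n) = acc.getOrElse(k, (0.0, 0))
        acc.updated(k, (s + x, n + 1))
      }.toSeq
    }

  // Reduce side: merge whatever arrived into one mean per key.
  def means(shuffled: Seq[Rec]): Map[Int, Double] =
    shuffled.foldLeft(Map.empty[Int, (Double, Int)]) { case (acc, (k, (s, n))) =>
      val (s0, n0) = acc.getOrElse(k, (0.0, 0))
      acc.updated(k, (s0 + s, n0 + n))
    }.map { case (k, (s, n)) => k -> s / n }
}
```

With combining, the number of records moved is bounded by (number of partitions) x (number of clusters), independent of the number of points. Without it, the number of records moved grows with the data, and each cluster's full list of points has to fit in memory on one machine.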
