You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Hajira Jabeen <ha...@gmail.com> on 2015/12/28 17:21:32 UTC

Understanding Kmeans in Flink

Hello everyone,

I am trying to understand Kmeans in Flink, Scala.

I can see that the attached Kmeans-snippet (taken from Flink examples)
updates centroids.

in (1) map function assigns points to centroids,
in (3) centroids are grouped by their ids.
in (4) the x and y coordinates are being added

But, I cannot understand what happens at (2) and then (5) ?
I will really appreciate, if any one can elaborate how this works ?


Thanks
Hajira

-------------------------------------
K means code snippet
--------------------------------------
val newCentroids = points
1)        .map(new
SelectNearestCenter()).withBroadcastSet(currentCentroids, "centroids")
2)        .map { x => (x._1, x._2, 1L) }
3)        .groupBy(0)                                 // by centroid ID
4)        .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }
5)        .map { x => new Centroid(x._1, x._2.div(x._3)) }
      newCentroids
--------------------------------------

Re: Understanding Kmeans in Flink

Posted by Márton Balassi <ba...@gmail.com>.
Hey Hajira,

Basically lines 2) to 5) determine the "mean" (centroid) of the new
clusters that we have just defined by assigning the points in line 1). As
calculating the mean is a non-associative function we break it down to two
associative parts: summation and counting - which is followed by dividing
the results of these parts to get the desired output. This is needed as
Flink jobs run in a distributed environment.

Remember that line 1) outputs (Int, Point) tuples, so the result of 2) is
(Int, Point, Double), where the third field is responsible to hold the
count which is initialized to 1L. Line 4) Not only sums the Point
coordinates, but does the same for the count (remember: summation and
counting to get the mean). At line 5 all that is left to divide the sum of
the points (which is still a point) by the number of points which leads to
the new centroid.

Flink also comes with a Java implementation [1] of the same algorithm which
comes with reasonably more boilerplate, but can definitely be helpful to
understand the algorithm.

I hope this helps,

Marton

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

On Mon, Dec 28, 2015 at 5:21 PM, Hajira Jabeen <ha...@gmail.com>
wrote:

> Hello everyone,
>
> I am trying to understand Kmeans in Flink, Scala.
>
> I can see that the attached Kmeans-snippet (taken from Flink examples)
> updates centroids.
>
> in (1) map function assigns points to centroids,
> in (3) centroids are grouped by their ids.
> in (4) the x and y coordinates are being added
>
> But, I cannot understand what happens at (2) and then (5) ?
> I will really appreciate, if any one can elaborate how this works ?
>
>
> Thanks
> Hajira
>
> -------------------------------------
> K means code snippet
> --------------------------------------
> val newCentroids = points
> 1)        .map(new
> SelectNearestCenter()).withBroadcastSet(currentCentroids, "centroids")
> 2)        .map { x => (x._1, x._2, 1L) }
> 3)        .groupBy(0)                                 // by centroid ID
> 4)        .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }
> 5)        .map { x => new Centroid(x._1, x._2.div(x._3)) }
>       newCentroids
> --------------------------------------
>
>
>
>