You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/30 22:32:20 UTC
[jira] [Commented] (FLINK-2131) Add Initialization schemes for
K-means clustering
[ https://issues.apache.org/jira/browse/FLINK-2131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15537255#comment-15537255 ]
ASF GitHub Bot commented on FLINK-2131:
---------------------------------------
Github user skonto commented on a diff in the pull request:
https://github.com/apache/flink/pull/757#discussion_r81429036
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml._
+import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{BLAS, Vector}
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids based on set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively
+ * the "nearest" mean. (Mathematically, this means partitioning the observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
+ * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids)
+ *
+ * val kmeans = KMeans()
+ * .setInitialCentroids(initialCentroids)
+ * .setNumIterations(10)
+ *
+ * kmeans.fit(trainingDS)
+ *
+ * // getting the computed centroids
+ * val centroidsResult = kmeans.centroids.get.collect()
+ *
+ * // get matching clusters for new points
+ * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
+ * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as start off point of the
+ * algorithm for clustering the data set. The centroids are recalculated as often as set in
+ * [[org.apache.flink.ml.clustering.KMeans.NumIterations]]. The choice of the initial centroids
+ * mainly affects the outcome of the algorithm.
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialStrategy]]:
+ * Defines the initialization strategy to be used for initializing the KMeans algorithm in case
+ * the initial centroids are not provided. Allowed values are "random", "kmeans++" and "kmeans||".
+ * (Default Value: '''random''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumClusters]]:
+ * Defines the number of clusters required. This is essential to provide when only the
+ * initialization strategy is specified, not the initial centroids themselves.
+ * (Default Value: '''0''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.OversamplingFactor]]:
+ * Defines the oversampling rate for the kmeans|| initialization.
+ * (Default Value: '''2k'''), where k is the number of clusters.
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.KMeansParRounds]]:
+ * Defines the number of rounds for the kmeans|| initialization.
+ * (Default Value: '''5''')
+ *
+ */
+class KMeans extends Predictor[KMeans] {
+
+ import KMeans._
+
+ /**
+ * Stores the learned clusters after the fit operation
+ */
+ var centroids: Option[DataSet[Seq[LabeledVector]]] = None
+
+ /**
+ * Sets the maximum number of iterations.
+ *
+ * @param numIterations The maximum number of iterations.
+ * @return itself
+ */
+ def setNumIterations(numIterations: Int): KMeans = {
+ parameters.add(NumIterations, numIterations)
+ this
+ }
+
+ /**
+ * Sets the number of clusters.
+ *
+ * @param numClusters The number of clusters
+ * @return itself
+ */
+ def setNumClusters(numClusters: Int): KMeans = {
+ parameters.add(NumClusters, numClusters)
+ this
+ }
+
+ /**
+ * Sets the initial centroids on which the algorithm will start computing. These points should
+ * depend on the data and will significantly influence the resulting centroids.
+ * Note that this setting will override [[setInitializationStrategy())]] and the size of
+ * initialCentroids will override the value, if set, by [[setNumClusters()]]
+ *
+ * @param initialCentroids A set of labeled vectors.
+ * @return itself
+ */
+ def setInitialCentroids(initialCentroids: Seq[LabeledVector]): KMeans = {
+ parameters.add(InitialCentroids, initialCentroids)
+ this
+ }
+
+ /**
+ * Automatically initialize the KMeans algorithm. Allowed options are "random", "kmeans++" and
+ * "kmeans||"
+ *
+ * @param initialStrategy
+ * @return itself
+ */
+ def setInitializationStrategy(initialStrategy: String): KMeans = {
+ require(Array("random", "kmeans++", "kmeans||").contains(initialStrategy), s"$initialStrategy" +
+ s" is not supported")
+ parameters.add(InitialStrategy, initialStrategy)
+ this
+ }
+
+ /**
+ * Oversampling factor to be used in case the initialization strategy is set to be "kmeans||"
+ *
+ * @param oversamplingFactor Oversampling factor(\ell)
+ * @return this
+ */
+ def setOversamplingFactor(oversamplingFactor: Double): KMeans = {
+ require(oversamplingFactor > 0, "Oversampling factor must be positive.")
+ parameters.add(OversamplingFactor, oversamplingFactor)
+ this
+ }
+
+ /**
+ * Number of initialization rounds to be done when the initialization strategy is set to be
+ * "kmeans||"
+ *
+ * @param numRounds Number of rounds(r)
+ * @return this
+ */
+ def setNumRounds(numRounds: Int): KMeans = {
+ require(numRounds > 0, "Number of rounds must be positive")
+ parameters.add(KMeansParRounds, numRounds)
+ this
+ }
+
+}
+
+/**
+ * Companion object of KMeans. Contains convenience functions, the parameter type definitions
+ * of the algorithm and the [[FitOperation]] & [[PredictOperation]].
+ */
+object KMeans {
+
+ private val RANDOM_FRACTION = "random_sample_fraction"
+ private val PARINIT_SET = "par_init_solution_set"
+ private val PARINIT_COST = "par_init_solution_cost"
+ private val PARINIT_SAMPLE = "par_init_oversample_factor"
+
+ /** Euclidean Distance Metric */
+ val euclidean = EuclideanDistanceMetric()
+
+ case object NumIterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object InitialCentroids extends Parameter[Seq[LabeledVector]] {
+ val defaultValue = None
+ }
+
+ case object InitialStrategy extends Parameter[String]{
+ val defaultValue = Some("kmeans||")
+ }
+
+ case object NumClusters extends Parameter[Int] {
+ val defaultValue = None
+ }
+
+ case object OversamplingFactor extends Parameter[Double] {
+ val defaultValue = None
+ }
+
+ case object KMeansParRounds extends Parameter[Int] {
+ val defaultValue = Some(5)
+ }
+
+ // ========================================== Factory methods ====================================
+
+ def apply(): KMeans = {
+ new KMeans()
+ }
+
+ // ========================================== Operations =========================================
+
+ /** Provides the operation that makes the predictions for individual examples.
+ * The label of the vector will be the index of the cluster the input vector belongs to.
+ *
+ * @tparam T
+ * @return A PredictOperation, through which it is possible to predict a value, given a
+ * feature vector
+ */
+ implicit def predictVectors[T <: Vector] = {
+ new PredictOperation[KMeans, Seq[LabeledVector], T, Double](){
+
+ override def getModel(
+ self: KMeans,
+ predictParameters: ParameterMap)
+ : DataSet[Seq[LabeledVector]] = {
+
+ self.centroids match {
+ case Some(model) => model
+ case None => {
+ throw new RuntimeException("The KMeans model has not been trained. Call first fit" +
+ "before calling the predict operation.")
+ }
+ }
+ }
+
+ override def predict(value: T, model: Seq[LabeledVector]): Double = {
+ findNearestCentroid(value, model)._1
+ }
+ }
+ }
+
+ /**
+ * [[FitOperation]] which iteratively computes centroids that match the given input DataSet by
+ * adjusting the given initial centroids.
+ *
+ * @return A new [[FitOperation]] to train the model using the training data set.
+ */
+ implicit def fitKMeans = {
+ new FitOperation[KMeans, Vector] {
+ override def fit(instance: KMeans, fitParameters: ParameterMap, trainingDS: DataSet[Vector])
+ : Unit = {
+ val resultingParameters = instance.parameters ++ fitParameters
+
+ // ================= INITIALIZATION OF KMEANS ==========================
+ val centroids: DataSet[Seq[LabeledVector]] = init(trainingDS, resultingParameters)
+
+ val numIterations: Int = resultingParameters.get(NumIterations).get
+
+ val finalCentroids = centroids.iterate(numIterations) { currentCentroids =>
+ val newCentroids: DataSet[LabeledVector] = trainingDS
+ .mapWithBcVariable(currentCentroids)
+ { (dataPoint, centroids) => selectNearestCentroid(dataPoint, centroids) }
+ .map(x => (x.label, x.vector, 1.0)).withForwardedFields("label->_1; vector->_2")
+ .groupBy(x => x._1)
+ .reduce((p1, p2) =>
+ (p1._1, (p1._2.asBreeze + p2._2.asBreeze).fromBreeze, p1._3 + p2._3))
+ // TODO replace addition of Breeze vectors by future build in flink function
+ .withForwardedFields("_1")
+ .map(x => {
+ BLAS.scal(1.0 / x._3, x._2)
+ LabeledVector(x._1, x._2)
+ })
+ .withForwardedFields("_1->label")
+
+ // currentCentroids contains only one element. So, this is output only once
+ currentCentroids.mapWithBcSet(newCentroids){
+ (_,newCenters) => newCenters
+ }
+ }
+ instance.centroids = Some(finalCentroids)
+ }
+ }
+ }
+
+ /**
+ * Converts a given vector into a labeled vector where the label denotes the label of the closest
+ * centroid.
+ *
+ * @param dataPoint The vector to determine the nearest centroid.
+ * @param centroids A collection of the centroids.
+ * @return A [[LabeledVector]] consisting of the input vector and the label of the closest
+ * centroid.
+ */
+ @ForwardedFields(Array("*->vector"))
+ private def selectNearestCentroid(dataPoint: Vector, centroids: Seq[LabeledVector]) = {
+ val nearest = findNearestCentroid(dataPoint, centroids)
+ LabeledVector(nearest._1, dataPoint)
+ }
+
+ /**
+ * Finds the nearest centroid to a point and returns the distance to this centroid and label of it
+ *
+ * @param dataPoint The vector to determine the nearest centroid.
+ * @param centroids A collection of the centroids.
+ * @return A tuple of distance to the nearest centroid and label of this centroid
+ */
+ private def findNearestCentroid(dataPoint: Vector, centroids: Seq[LabeledVector]) = {
+ var minDistance: Double = Double.MaxValue
+ var closestCentroidLabel: Double = -1
+ centroids.foreach(centroid => {
+ val distance = euclidean.distance(dataPoint, centroid.vector)
+ if (distance < minDistance) {
+ minDistance = distance
+ closestCentroidLabel = centroid.label
+ }
+ })
+ (closestCentroidLabel, minDistance)
+ }
+
+ /**
+ * Returns the initial centroids for the KMeans algorithm based upon the information in
+ * parameter
+ *
+ * @param data The training data set
+ * @param parameter Parameter Map containing user parameters
+ * @return Initial centroids for KMeans clustering
+ */
+ private def init(data: DataSet[Vector], parameter: ParameterMap): DataSet[Seq[LabeledVector]] = {
+ parameter.get(InitialCentroids) match {
+ case Some(value) => data.getExecutionEnvironment.fromElements(value)
+ case None => {
+
+ val k = parameter.get(NumClusters) match{
+ case Some(value) => value
+ case None => throw new RuntimeException("Specify the number of clusters.")
+ }
+ val l = parameter.get(OversamplingFactor) match{
+ case Some(value) => value
+ case None => 2 * k // default value
+ }
+ val r = parameter.get(KMeansParRounds).get
+
+ val blocks = data.getParallelism
+
+ parameter.get(InitialStrategy) match {
+ case Some("random") => {
+ random(data.map(x => (x,1)), k)
+ }
+ case Some("kmeans++") => {
+ kmeans(data.map(x => (x,1)), k, blocks)
+ }
+ case Some("kmeans||") => {
+ parInit(data, k, blocks, l ,r)
+ }
+ case default => {
+ throw new RuntimeException("Specify a valid initialization strategy.")
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Pick k centers from data one by one using kmeans|| initialization scheme
+ *
+ * The k-means|| algorithm works as described by the original authors
+ * (http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf):
+ *
+ * Given a data set X with |X| points, the k-means|| algorithm proceeds as follows:
+ *
+ * 1. Initialize C \leftarrow \{\}
+ * 2. Let p be a point sampled uniformly at random from X. C \leftarrow C \cup \{p\}
+ * 3. for i \leftarrow 1 to r
+ * Let C' be the set of formed by independently sampling every point x in X with probability
+ * \ell\cdot\frac{d(x,C)}{sigma_nolimits{p \in X }d(p,C)}
+ * C \leftarrow C \cup C'
+ * 4. Assign weights to all point c in C as the number of points from X which are closest to c
+ * 5. Run kmeans++ initialization on the weighted set C and return k centers
+ *
+ * @param data Training data set
+ * @param k Number of clusters
+ * @param blocks Blocks in the data
+ * @param oversampling Oversampling rate (\ell)
+ * @param rounds Number of rounds (r)
+ * @return Initial centroids
+ */
+ private def parInit(
+ data: DataSet[Vector],
+ k: Int,
+ blocks: Int,
+ oversampling: Double,
+ rounds: Int)
+ : DataSet[Seq[LabeledVector]] = {
+ // first pick one center randomly
+ val oversamplingFactor = data.getExecutionEnvironment.fromElements(oversampling)
+
+ val initialCentroids = random(data.map(x => (x,1)), 1).map(x => x.head)
+ val unionOfSamples = initialCentroids.iterate(rounds){
+ currentSet => {
+ // current cost
+ val currentCost = data.mapWithBcSet(currentSet){
+ (vector, pointSet) => Math.pow(findNearestCentroid(vector, pointSet)._2, 2)
+ }
+ val sampledSet = data.filter(new RichFilterFunction[Vector] {
+ var currentSet: Seq[LabeledVector] = _
+ var cost: Double = _
+ var rng: Random = _
+ var oversamplingFactor: Double = _
+ override def open(parameter: Configuration): Unit ={
+ currentSet = getRuntimeContext.getBroadcastVariable(PARINIT_SET).asScala
+ cost = getRuntimeContext.getBroadcastVariable(PARINIT_COST).get(0)
+ oversamplingFactor = getRuntimeContext.getBroadcastVariable(PARINIT_SAMPLE).get(0)
+ rng = new Random()
+ }
+ override def filter(value: Vector): Boolean = {
+ rng.nextDouble() <
+ oversamplingFactor * Math.pow(findNearestCentroid(value, currentSet)._2, 2) / cost
+ }
+ }).withBroadcastSet(currentCost, PARINIT_COST)
+ .withBroadcastSet(currentSet, PARINIT_SET)
+ .withBroadcastSet(oversamplingFactor, PARINIT_SAMPLE)
+
+ // keep taking unions of independent samples at each step
+ currentSet.union(sampledSet.map(x => LabeledVector(0, x)))
+ }
+ }
+
+ // now assign weights to points in the set
+ val weightedSample = data.mapWithBcSet(unionOfSamples){
+ (vector, sampledSet) => {
+ val samples = sampledSet.toList
+ var minDistance: Double = Double.MaxValue
+ var closestCentroidIndex: Int = -1
+ for (i <- 0 to samples.size - 1) {
+ val distance = EuclideanDistanceMetric().distance(vector, samples(i).vector)
+ if (distance < minDistance) {
+ minDistance = distance
+ closestCentroidIndex = i
+ }
+ }
+ // just assign a label of 1. We'll figure this out later.
+ (closestCentroidIndex, samples(closestCentroidIndex).vector, 1)
+ }
+ }.groupBy(0)
+ .reduce((a, b) => (a._1, a._2, a._3 + b._3))
+ .map(x => (x._2,x._3))
+
+ // finally, do a kmeans++ on this weighted set
+ kmeans(weightedSample, k, blocks)
+ }
+
+ /**
+ * Randomly initializes centroids from the data.
+ * Data is considered to be weighted.
+ *
+ * @param data Training data set
+ * @param k Number of centroids to be picked
+ * @return Initial random centroids
+ */
+ private def random(
+ data: DataSet[(Vector, Int)],
+ k: Int)
+ : DataSet[Seq[LabeledVector]] = {
+ // we'll sample 10 times as many points as we actually need
+ // TODO Modify to use the Random Sample Operator as and when added.
+
+ val fraction = data.map(x => 1).reduce(_ + _).map(x => 10 * (k + 0.0) / x)
--- End diff --
How about k.toDouble or 10.0*k?
> Add Initialization schemes for K-means clustering
> -------------------------------------------------
>
> Key: FLINK-2131
> URL: https://issues.apache.org/jira/browse/FLINK-2131
> Project: Flink
> Issue Type: Task
> Components: Machine Learning Library
> Reporter: Sachin Goel
> Assignee: Sachin Goel
>
> The Lloyd's [KMeans] algorithm takes initial centroids as its input. However, in case the user doesn't provide the initial centers, they may ask for a particular initialization scheme to be followed. The most commonly used are these:
> 1. Random initialization: Self-explanatory
> 2. kmeans++ initialization: http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
> 3. kmeans|| : http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
> For very large data sets, or for large values of k, the kmeans|| method is preferred as it provides the same approximation guarantees as kmeans++ and requires lesser number of passes over the input data.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)