You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/07/09 01:32:03 UTC

spark git commit: [SPARK-5016] [MLLIB] Distribute GMM mixture components to executors

Repository: spark
Updated Branches:
  refs/heads/master 8c32b2e87 -> f472b8cdc


[SPARK-5016] [MLLIB] Distribute GMM mixture components to executors

Distribute expensive portions of computation for Gaussian mixture components (in particular, pre-computation of `MultivariateGaussian.rootSigmaInv`, the inverse covariance matrix and covariance determinant) across executors. Repost of PR#4654.

Notes for reviewers:
 * What should be the policy for when to distribute computation? Always? When numClusters > threshold? User-specified param?

TODO:
 * Performance testing and comparison for a large number of clusters

Author: Feynman Liang <fl...@databricks.com>

Closes #7166 from feynmanliang/GMM_parallel_mixtures and squashes the following commits:

4f351fa [Feynman Liang] Update heuristic and scaladoc
5ea947e [Feynman Liang] Fix parallelization logic
00eb7db [Feynman Liang] Add helper method for GMM's M step, remove distributeGaussians flag
e7c8127 [Feynman Liang] Add distributeGaussians flag and tests
1da3c7f [Feynman Liang] Distribute mixtures


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f472b8cd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f472b8cd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f472b8cd

Branch: refs/heads/master
Commit: f472b8cdc00839780dc79be0bbe53a098cde230c
Parents: 8c32b2e
Author: Feynman Liang <fl...@databricks.com>
Authored: Wed Jul 8 16:32:00 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Jul 8 16:32:00 2015 -0700

----------------------------------------------------------------------
 .../mllib/clustering/GaussianMixture.scala      | 44 ++++++++++++++++----
 1 file changed, 36 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f472b8cd/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index fc509d2..e459367 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -140,6 +140,10 @@ class GaussianMixture private (
     // Get length of the input vectors
     val d = breezeData.first().length
 
+    // Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+    // d > 25 except for when k is very small
+    val distributeGaussians = ((k - 1.0) / k) * d > 25
+
     // Determine initial weights and corresponding Gaussians.
     // If the user supplied an initial GMM, we use those values, otherwise
     // we start with uniform weights, a random mean from the data, and
@@ -171,14 +175,25 @@ class GaussianMixture private (
       // Create new distributions based on the partial assignments
       // (often referred to as the "M" step in literature)
       val sumWeights = sums.weights.sum
-      var i = 0
-      while (i < k) {
-        val mu = sums.means(i) / sums.weights(i)
-        BLAS.syr(-sums.weights(i), Vectors.fromBreeze(mu),
-          Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
-        weights(i) = sums.weights(i) / sumWeights
-        gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i))
-        i = i + 1
+
+      if (distributeGaussians) {
+        val numPartitions = math.min(k, 1024)
+        val tuples =
+          Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i)))
+        val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
+          updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
+        }.collect.unzip
+        Array.copy(ws, 0, weights, 0, ws.length)
+        Array.copy(gs, 0, gaussians, 0, gs.length)
+      } else {
+        var i = 0
+        while (i < k) {
+          val (weight, gaussian) =
+            updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights)
+          weights(i) = weight
+          gaussians(i) = gaussian
+          i = i + 1
+        }
       }
 
       llhp = llh // current becomes previous
@@ -192,6 +207,19 @@ class GaussianMixture private (
   /** Java-friendly version of [[run()]] */
   def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd)
 
+  private def updateWeightsAndGaussians(
+      mean: BDV[Double],
+      sigma: BreezeMatrix[Double],
+      weight: Double,
+      sumWeights: Double): (Double, MultivariateGaussian) = {
+    val mu = (mean /= weight)
+    BLAS.syr(-weight, Vectors.fromBreeze(mu),
+      Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])
+    val newWeight = weight / sumWeights
+    val newGaussian = new MultivariateGaussian(mu, sigma / weight)
+    (newWeight, newGaussian)
+  }
+
   /** Average of dense breeze vectors */
   private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
     val v = BDV.zeros[Double](x(0).length)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org