Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/05 01:29:46 UTC

[8/9] git commit: Aggregated all sample points to driver without any shuffle

Aggregated all sample points to driver without any shuffle
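
In other words, the previous map + foldByKey + collectAsMap pipeline, which shuffled
(label, (count, featuresSum)) pairs across the cluster, is replaced by a single
RDD.aggregate call: each partition builds a small label -> (count, featuresSum) map
locally, and only those per-partition maps travel back to the driver, where they are
merged. A minimal sketch of the pattern, assuming a hypothetical countAndSumPerLabel
helper with plain Double sums standing in for the jblas feature vectors in the real code:

    import scala.collection.mutable
    import org.apache.spark.rdd.RDD

    // Sketch only: `countAndSumPerLabel` is a hypothetical helper; a plain
    // Double sum stands in for the jblas feature-vector sum in the real code.
    def countAndSumPerLabel(points: RDD[(Int, Double)]): mutable.Map[Int, (Int, Double)] = {
      val zero = mutable.Map.empty[Int, (Int, Double)]
      points.aggregate(zero)({ (acc, point) =>
        // seqOp: runs inside each partition; no records leave the executor
        val (label, value) = point
        val (count, sum) = acc.getOrElse(label, (0, 0.0))
        acc += label -> (count + 1, sum + value)
      }, { (lhs, rhs) =>
        // combOp: merges the small per-partition maps on the driver
        for ((label, (c, s)) <- rhs) {
          val (count, sum) = lhs.getOrElse(label, (0, 0.0))
          lhs(label) = (count + c, sum + s)
        }
        lhs
      })
    }

Because aggregate ships only one such map per partition back to the driver, the
per-label statistics never go through a shuffle stage.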


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dd6033e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dd6033e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dd6033e6

Branch: refs/heads/master
Commit: dd6033e6853e32e9de2c910797c7fbc0072e7491
Parents: 6d0e2e8
Author: Lian, Cheng <rh...@gmail.com>
Authored: Thu Jan 2 01:38:24 2014 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Thu Jan 2 01:38:24 2014 +0800

----------------------------------------------------------------------
 .../spark/mllib/classification/NaiveBayes.scala | 76 +++++++-------------
 .../mllib/classification/NaiveBayesSuite.scala  |  8 +--
 2 files changed, 31 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dd6033e6/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 9fd1add..524300d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.mllib.classification
 
+import scala.collection.mutable
+
 import org.jblas.DoubleMatrix
 
 import org.apache.spark.Logging
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
 
 /**
  * Model for Naive Bayes Classifiers.
@@ -60,62 +61,39 @@ class NaiveBayes private (val lambda: Double = 1.0)
    * @param data RDD of (label, array of features) pairs.
    */
   def run(data: RDD[LabeledPoint]) = {
-    // Prepares input data; the shape of the resulting RDD is:
-    //
-    //    label: Int -> (count: Int, features: DoubleMatrix)
-    //
-    // The added count field is initialized to 1 to enable the following `foldByKey` transformation.
-    val mappedData = data.map { case LabeledPoint(label, features) =>
-      label.toInt -> (1, new DoubleMatrix(features.length, 1, features: _*))
-    }
-
-    // Gets a map from labels to their corresponding sample point counts and summed feature vectors.
-    // The shape of the resulting RDD is:
-    //
-    //    label: Int -> (count: Int, summedFeatureVector: DoubleMatrix)
+    // Aggregates all sample points on the driver side to get the sample count and summed
+    // feature vector for each label.  The shape of `zeroCombiner` and `aggregated` is:
     //
-    // Two tricky parts worth explaining:
-    //
-    // 1. Feature vectors are summed with the in-place jblas matrix addition operation, so we
-    //    chose `foldByKey` instead of `reduceByKey` to avoid modifying the original input data.
-    //
-    // 2. The zero value passed to `foldByKey` contains a `null` rather than a zero vector because
-    //    the dimension of the feature vector is unknown.  Calling `data.first.length` to get the
-    //    dimension is undesirable since it requires an expensive RDD action.
-    val countsAndSummedFeatures = mappedData.foldByKey((0, null)) { (lhs, rhs) =>
-      if (lhs._1 == 0) {
-        (rhs._1, new DoubleMatrix().copy(rhs._2))
-      } else {
-        (lhs._1 + rhs._1, lhs._2.addi(rhs._2))
+    //    label: Int -> (count: Int, featuresSum: DoubleMatrix)
+    val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
+    val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
+      point match {
+        case LabeledPoint(label, features) =>
+          val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
+          val fs = new DoubleMatrix(features.length, 1, features: _*)
+          combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
       }
-    }
-
-    val collected = countsAndSummedFeatures.mapValues { case (count, summedFeatureVector) =>
-      val p = math.log(count + lambda)
-      val logDenom = math.log(summedFeatureVector.sum + summedFeatureVector.length * lambda)
-      val t = summedFeatureVector
-      var i = 0
-      while (i < t.length) {
-        t.put(i, math.log(t.get(i) + lambda) - logDenom)
-        i += 1
+    }, { (lhs, rhs) =>
+      for ((label, (c, fs)) <- rhs) {
+        val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
+        lhs(label) = (count + c, featuresSum.addi(fs))
       }
-      (count, p, t)
-    }.collectAsMap()
-
-    // Total sample count.  Calling `data.count` to get `N` is undesirable since it triggers
-    // an expensive RDD action.
-    val N = collected.values.map(_._1).sum
+      lhs
+    })
 
-    // Kinds of label.
-    val C = collected.size
+    // Number of distinct labels
+    val C = aggregated.size
+    // Total sample count
+    val N = aggregated.values.map(_._1).sum
 
-    val logDenom = math.log(N + C * lambda)
     val pi = new Array[Double](C)
     val theta = new Array[Array[Double]](C)
+    val piLogDenom = math.log(N + C * lambda)
 
-    for ((label, (_, p, t)) <- collected) {
-      pi(label) = p - logDenom
-      theta(label) = t.toArray
+    for ((label, (count, fs)) <- aggregated) {
+      val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
+      pi(label) = math.log(count + lambda) - piLogDenom
+      theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
     }
 
     new NaiveBayesModel(pi, theta)
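
The closing loop is standard multinomial naive Bayes with additive (Laplace) smoothing,
computed in log space: pi(label) = log(count + lambda) - log(N + C * lambda), and
theta(label)(j) = log(featuresSum(j) + lambda) - log(sum(featuresSum) + D * lambda),
where D is the feature dimension. A self-contained sketch of the same step, with
made-up aggregates and plain arrays standing in for DoubleMatrix:

    // Made-up aggregates for illustration; in the real code these come from
    // the `data.aggregate(...)` call above.
    val lambda = 1.0
    val aggregated = Map(
      0 -> (3, Array(2.0, 0.0, 1.0)),  // label 0: 3 samples, summed feature vector
      1 -> (1, Array(0.0, 4.0, 0.0)))  // label 1: 1 sample

    val C = aggregated.size                  // number of distinct labels
    val N = aggregated.values.map(_._1).sum  // total sample count
    val piLogDenom = math.log(N + C * lambda)

    val pi = new Array[Double](C)
    val theta = new Array[Array[Double]](C)
    for ((label, (count, fs)) <- aggregated) {
      val thetaLogDenom = math.log(fs.sum + fs.length * lambda)
      pi(label) = math.log(count + lambda) - piLogDenom            // smoothed log prior
      theta(label) = fs.map(f => math.log(f + lambda) - thetaLogDenom)  // smoothed log likelihoods
    }

With lambda = 1.0 this is add-one smoothing; it keeps log(0) out of theta for features
that never co-occur with a label.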

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dd6033e6/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index 18575f4..b615f76 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -27,16 +27,16 @@ import org.apache.spark.SparkContext
 
 object NaiveBayesSuite {
 
-  private def calcLabel(p: Double, weightPerLabel: Array[Double]): Int = {
+  private def calcLabel(p: Double, pi: Array[Double]): Int = {
     var sum = 0.0
-    for (j <- 0 until weightPerLabel.length) {
-      sum += weightPerLabel(j)
+    for (j <- 0 until pi.length) {
+      sum += pi(j)
       if (p < sum) return j
     }
     -1
   }
 
-  // Generate input of the form Y = (weightMatrix*x).argmax()
+  // Generate input of the form Y = (theta * x).argmax()
   def generateNaiveBayesInput(
       pi: Array[Double],            // 1XC
       theta: Array[Array[Double]],  // CXD