You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/06/12 03:16:40 UTC

git commit: [SPARK-1672][MLLIB] Separate user and product partitioning in ALS

Repository: spark
Updated Branches:
  refs/heads/master 9a2448daf -> d9203350b


[SPARK-1672][MLLIB] Separate user and product partitioning in ALS

Some clean up work following #593.

1. Allow to set different number user blocks and number product blocks in `ALS`.
2. Update `MovieLensALS` to reflect the change.

Author: Tor Myklebust <tm...@gmail.com>
Author: Xiangrui Meng <me...@databricks.com>

Closes #1014 from mengxr/SPARK-1672 and squashes the following commits:

0e910dd [Xiangrui Meng] change private[this] to private[recommendation]
36420c7 [Xiangrui Meng] set exclusion rules for ALS
9128b77 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-1672
294efe9 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-1672
9bab77b [Xiangrui Meng] clean up add numUserBlocks and numProductBlocks to MovieLensALS
84c8e8c [Xiangrui Meng] Merge branch 'master' into SPARK-1672
d17a8bf [Xiangrui Meng] merge master
a4925fd [Tor Myklebust] Style.
bd8a75c [Tor Myklebust] Merge branch 'master' of github.com:apache/spark into alsseppar
021f54b [Tor Myklebust] Separate user and product blocks.
dcf583a [Tor Myklebust] Remove the partitioner member variable; instead, thread that needle everywhere it needs to go.
23d6f91 [Tor Myklebust] Stop making the partitioner configurable.
495784f [Tor Myklebust] Merge branch 'master' of https://github.com/apache/spark
674933a [Tor Myklebust] Fix style.
40edc23 [Tor Myklebust] Fix missing space.
f841345 [Tor Myklebust] Fix daft bug creating 'pairs', also for -> foreach.
5ec9e6c [Tor Myklebust] Clean a couple of things up using 'map'.
36a0f43 [Tor Myklebust] Make the partitioner private.
d872b09 [Tor Myklebust] Add negative id ALS test.
df27697 [Tor Myklebust] Support custom partitioners.  Currently we use the same partitioner for users and products.
c90b6d8 [Tor Myklebust] Scramble user and product ids before bucketing.
c774d7d [Tor Myklebust] Make the partitioner a member variable and use it instead of modding directly.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9203350
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9203350
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9203350

Branch: refs/heads/master
Commit: d9203350b06a9c737421ba244162b1365402c01b
Parents: 9a2448d
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Jun 11 18:16:33 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Jun 11 18:16:33 2014 -0700

----------------------------------------------------------------------
 .../spark/examples/mllib/MovieLensALS.scala     |  12 +-
 .../apache/spark/mllib/recommendation/ALS.scala | 164 +++++++++++--------
 .../spark/mllib/recommendation/ALSSuite.scala   |  60 ++++---
 project/MimaExcludes.scala                      |   8 +
 4 files changed, 160 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9203350/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 6eb41e7..28e201d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -50,6 +50,8 @@ object MovieLensALS {
       numIterations: Int = 20,
       lambda: Double = 1.0,
       rank: Int = 10,
+      numUserBlocks: Int = -1,
+      numProductBlocks: Int = -1,
       implicitPrefs: Boolean = false)
 
   def main(args: Array[String]) {
@@ -67,8 +69,14 @@ object MovieLensALS {
         .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
         .action((x, c) => c.copy(lambda = x))
       opt[Unit]("kryo")
-        .text(s"use Kryo serialization")
+        .text("use Kryo serialization")
         .action((_, c) => c.copy(kryo = true))
+      opt[Int]("numUserBlocks")
+        .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
+        .action((x, c) => c.copy(numUserBlocks = x))
+      opt[Int]("numProductBlocks")
+        .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
+        .action((x, c) => c.copy(numProductBlocks = x))
       opt[Unit]("implicitPrefs")
         .text("use implicit preference")
         .action((_, c) => c.copy(implicitPrefs = true))
@@ -160,6 +168,8 @@ object MovieLensALS {
       .setIterations(params.numIterations)
       .setLambda(params.lambda)
       .setImplicitPrefs(params.implicitPrefs)
+      .setUserBlocks(params.numUserBlocks)
+      .setProductBlocks(params.numProductBlocks)
       .run(training)
 
     val rmse = computeRmse(model, test, params.implicitPrefs)

http://git-wip-us.apache.org/repos/asf/spark/blob/d9203350/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index d743bd7..f1ae7b8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -61,7 +61,7 @@ private[recommendation] case class InLinkBlock(
  * A more compact class to represent a rating than Tuple3[Int, Int, Double].
  */
 @Experimental
-case class Rating(val user: Int, val product: Int, val rating: Double)
+case class Rating(user: Int, product: Int, rating: Double)
 
 /**
  * Alternating Least Squares matrix factorization.
@@ -93,7 +93,8 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
  * preferences rather than explicit ratings given to items.
  */
 class ALS private (
-    private var numBlocks: Int,
+    private var numUserBlocks: Int,
+    private var numProductBlocks: Int,
     private var rank: Int,
     private var iterations: Int,
     private var lambda: Double,
@@ -106,14 +107,31 @@ class ALS private (
    * Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10,
    * lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
    */
-  def this() = this(-1, 10, 10, 0.01, false, 1.0)
+  def this() = this(-1, -1, 10, 10, 0.01, false, 1.0)
 
   /**
-   * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
-   * number of blocks. Default: -1.
+   * Set the number of blocks for both user blocks and product blocks to parallelize the computation
+   * into; pass -1 for an auto-configured number of blocks. Default: -1.
    */
   def setBlocks(numBlocks: Int): ALS = {
-    this.numBlocks = numBlocks
+    this.numUserBlocks = numBlocks
+    this.numProductBlocks = numBlocks
+    this
+  }
+
+  /**
+   * Set the number of user blocks to parallelize the computation.
+   */
+  def setUserBlocks(numUserBlocks: Int): ALS = {
+    this.numUserBlocks = numUserBlocks
+    this
+  }
+
+  /**
+   * Set the number of product blocks to parallelize the computation.
+   */
+  def setProductBlocks(numProductBlocks: Int): ALS = {
+    this.numProductBlocks = numProductBlocks
     this
   }
 
@@ -176,31 +194,32 @@ class ALS private (
   def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
     val sc = ratings.context
 
-    val numBlocks = if (this.numBlocks == -1) {
+    val numUserBlocks = if (this.numUserBlocks == -1) {
       math.max(sc.defaultParallelism, ratings.partitions.size / 2)
     } else {
-      this.numBlocks
+      this.numUserBlocks
     }
-
-    val partitioner = new Partitioner {
-      val numPartitions = numBlocks
-
-      def getPartition(x: Any): Int = {
-        Utils.nonNegativeMod(byteswap32(x.asInstanceOf[Int]), numPartitions)
-      }
+    val numProductBlocks = if (this.numProductBlocks == -1) {
+      math.max(sc.defaultParallelism, ratings.partitions.size / 2)
+    } else {
+      this.numProductBlocks
     }
 
-    val ratingsByUserBlock = ratings.map{ rating =>
-      (partitioner.getPartition(rating.user), rating)
+    val userPartitioner = new ALSPartitioner(numUserBlocks)
+    val productPartitioner = new ALSPartitioner(numProductBlocks)
+
+    val ratingsByUserBlock = ratings.map { rating =>
+      (userPartitioner.getPartition(rating.user), rating)
     }
-    val ratingsByProductBlock = ratings.map{ rating =>
-      (partitioner.getPartition(rating.product),
+    val ratingsByProductBlock = ratings.map { rating =>
+      (productPartitioner.getPartition(rating.product),
         Rating(rating.product, rating.user, rating.rating))
     }
 
-    val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
+    val (userInLinks, userOutLinks) =
+      makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, productPartitioner)
     val (productInLinks, productOutLinks) =
-        makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
+      makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, userPartitioner)
     userInLinks.setName("userInLinks")
     userOutLinks.setName("userOutLinks")
     productInLinks.setName("productInLinks")
@@ -232,27 +251,27 @@ class ALS private (
         users.setName(s"users-$iter").persist()
         val YtY = Some(sc.broadcast(computeYtY(users)))
         val previousProducts = products
-        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
-          alpha, YtY)
+        products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
+          userPartitioner, rank, lambda, alpha, YtY)
         previousProducts.unpersist()
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
         products.setName(s"products-$iter").persist()
         val XtX = Some(sc.broadcast(computeYtY(products)))
         val previousUsers = users
-        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
-          alpha, XtX)
+        users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
+          productPartitioner, rank, lambda, alpha, XtX)
         previousUsers.unpersist()
       }
     } else {
       for (iter <- 1 to iterations) {
         // perform ALS update
         logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
-        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
-          alpha, YtY = None)
+        products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
+          userPartitioner, rank, lambda, alpha, YtY = None)
         products.setName(s"products-$iter")
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
-        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
-          alpha, YtY = None)
+        users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
+          productPartitioner, rank, lambda, alpha, YtY = None)
         users.setName(s"users-$iter")
       }
     }
@@ -340,9 +359,10 @@ class ALS private (
   /**
    * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
    */
-  private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
-                     outLinks: RDD[(Int, OutLinkBlock)]) = {
-    blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
+  private def unblockFactors(
+      blockedFactors: RDD[(Int, Array[Array[Double]])],
+      outLinks: RDD[(Int, OutLinkBlock)]): RDD[(Int, Array[Double])] = {
+    blockedFactors.join(outLinks).flatMap { case (b, (factors, outLinkBlock)) =>
       for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
     }
   }
@@ -351,14 +371,14 @@ class ALS private (
    * Make the out-links table for a block of the users (or products) dataset given the list of
    * (user, product, rating) values for the users in that block (or the opposite for products).
    */
-  private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating],
-      partitioner: Partitioner): OutLinkBlock = {
+  private def makeOutLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
+      productPartitioner: Partitioner): OutLinkBlock = {
     val userIds = ratings.map(_.user).distinct.sorted
     val numUsers = userIds.length
     val userIdToPos = userIds.zipWithIndex.toMap
-    val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
+    val shouldSend = Array.fill(numUsers)(new BitSet(numProductBlocks))
     for (r <- ratings) {
-      shouldSend(userIdToPos(r.user))(partitioner.getPartition(r.product)) = true
+      shouldSend(userIdToPos(r.user))(productPartitioner.getPartition(r.product)) = true
     }
     OutLinkBlock(userIds, shouldSend)
   }
@@ -367,18 +387,17 @@ class ALS private (
    * Make the in-links table for a block of the users (or products) dataset given a list of
    * (user, product, rating) values for the users in that block (or the opposite for products).
    */
-  private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating],
-      partitioner: Partitioner): InLinkBlock = {
+  private def makeInLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
+      productPartitioner: Partitioner): InLinkBlock = {
     val userIds = ratings.map(_.user).distinct.sorted
-    val numUsers = userIds.length
     val userIdToPos = userIds.zipWithIndex.toMap
     // Split out our ratings by product block
-    val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
+    val blockRatings = Array.fill(numProductBlocks)(new ArrayBuffer[Rating])
     for (r <- ratings) {
-      blockRatings(partitioner.getPartition(r.product)) += r
+      blockRatings(productPartitioner.getPartition(r.product)) += r
     }
-    val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
-    for (productBlock <- 0 until numBlocks) {
+    val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numProductBlocks)
+    for (productBlock <- 0 until numProductBlocks) {
       // Create an array of (product, Seq(Rating)) ratings
       val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
       // Sort them by product ID
@@ -400,14 +419,16 @@ class ALS private (
    * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
    * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
    */
-  private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)], partitioner: Partitioner)
-    : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
-  {
-    val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
+  private def makeLinkRDDs(
+      numUserBlocks: Int,
+      numProductBlocks: Int,
+      ratingsByUserBlock: RDD[(Int, Rating)],
+      productPartitioner: Partitioner): (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) = {
+    val grouped = ratingsByUserBlock.partitionBy(new HashPartitioner(numUserBlocks))
     val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
-      val ratings = elements.map{_._2}.toArray
-      val inLinkBlock = makeInLinkBlock(numBlocks, ratings, partitioner)
-      val outLinkBlock = makeOutLinkBlock(numBlocks, ratings, partitioner)
+      val ratings = elements.map(_._2).toArray
+      val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner)
+      val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner)
       Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
     }, true)
     val inLinks = links.mapValues(_._1)
@@ -439,26 +460,24 @@ class ALS private (
    * It returns an RDD of new feature vectors for each user block.
    */
   private def updateFeatures(
+      numUserBlocks: Int,
       products: RDD[(Int, Array[Array[Double]])],
       productOutLinks: RDD[(Int, OutLinkBlock)],
       userInLinks: RDD[(Int, InLinkBlock)],
-      partitioner: Partitioner,
+      productPartitioner: Partitioner,
       rank: Int,
       lambda: Double,
       alpha: Double,
-      YtY: Option[Broadcast[DoubleMatrix]])
-    : RDD[(Int, Array[Array[Double]])] =
-  {
-    val numBlocks = products.partitions.size
+      YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
     productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
-        val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
-        for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
+        val toSend = Array.fill(numUserBlocks)(new ArrayBuffer[Array[Double]])
+        for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numUserBlocks) {
           if (outLinkBlock.shouldSend(p)(userBlock)) {
             toSend(userBlock) += factors(p)
           }
         }
         toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
-    }.groupByKey(partitioner)
+    }.groupByKey(productPartitioner)
      .join(userInLinks)
      .mapValues{ case (messages, inLinkBlock) =>
         updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
@@ -475,7 +494,7 @@ class ALS private (
   {
     // Sort the incoming block factor messages by block ID and make them an array
     val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
-    val numBlocks = blockFactors.length
+    val numProductBlocks = blockFactors.length
     val numUsers = inLinkBlock.elementIds.length
 
     // We'll sum up the XtXes using vectors that represent only the lower-triangular part, since
@@ -490,7 +509,7 @@ class ALS private (
 
     // Compute the XtX and Xy values for each user by adding products it rated in each product
     // block
-    for (productBlock <- 0 until numBlocks) {
+    for (productBlock <- 0 until numProductBlocks) {
       var p = 0
       while (p < blockFactors(productBlock).length) {
         val x = wrapDoubleArray(blockFactors(productBlock)(p))
@@ -579,6 +598,23 @@ class ALS private (
   }
 }
 
+/**
+ * Partitioner for ALS.
+ */
+private[recommendation] class ALSPartitioner(override val numPartitions: Int) extends Partitioner {
+  override def getPartition(key: Any): Int = {
+    Utils.nonNegativeMod(byteswap32(key.asInstanceOf[Int]), numPartitions)
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case p: ALSPartitioner =>
+        this.numPartitions == p.numPartitions
+      case _ =>
+        false
+    }
+  }
+}
 
 /**
  * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
@@ -606,7 +642,7 @@ object ALS {
       blocks: Int,
       seed: Long
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
   }
 
   /**
@@ -629,7 +665,7 @@ object ALS {
       lambda: Double,
       blocks: Int
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0).run(ratings)
   }
 
   /**
@@ -689,7 +725,7 @@ object ALS {
       alpha: Double,
       seed: Long
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
   }
 
   /**
@@ -714,7 +750,7 @@ object ALS {
       blocks: Int,
       alpha: Double
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha).run(ratings)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9203350/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 37c9b9d..81bebec 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -121,6 +121,10 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     testALS(100, 200, 2, 15, 0.7, 0.4, true, false, true)
   }
 
+  test("rank-2 matrices with different user and product blocks") {
+    testALS(100, 200, 2, 15, 0.7, 0.4, numUserBlocks = 4, numProductBlocks = 2)
+  }
+
   test("pseudorandomness") {
     val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2)
     val model11 = ALS.train(ratings, 5, 1, 1.0, 2, 1)
@@ -153,35 +157,52 @@ class ALSSuite extends FunSuite with LocalSparkContext {
   }
 
   test("NNALS, rank 2") {
-    testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false)
+    testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, -1, false)
   }
 
   /**
    * Test if we can correctly factorize R = U * P where U and P are of known rank.
    *
-   * @param users          number of users
-   * @param products       number of products
-   * @param features       number of features (rank of problem)
-   * @param iterations     number of iterations to run
-   * @param samplingRate   what fraction of the user-product pairs are known
+   * @param users number of users
+   * @param products number of products
+   * @param features number of features (rank of problem)
+   * @param iterations number of iterations to run
+   * @param samplingRate what fraction of the user-product pairs are known
    * @param matchThreshold max difference allowed to consider a predicted rating correct
-   * @param implicitPrefs  flag to test implicit feedback
-   * @param bulkPredict    flag to test bulk prediciton
+   * @param implicitPrefs flag to test implicit feedback
+   * @param bulkPredict flag to test bulk prediciton
    * @param negativeWeights whether the generated data can contain negative values
-   * @param numBlocks      number of blocks to partition users and products into
+   * @param numUserBlocks number of user blocks to partition users into
+   * @param numProductBlocks number of product blocks to partition products into
    * @param negativeFactors whether the generated user/product factors can have negative entries
    */
-  def testALS(users: Int, products: Int, features: Int, iterations: Int,
-    samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
-    bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
-    negativeFactors: Boolean = true)
-  {
+  def testALS(
+      users: Int,
+      products: Int,
+      features: Int,
+      iterations: Int,
+      samplingRate: Double,
+      matchThreshold: Double,
+      implicitPrefs: Boolean = false,
+      bulkPredict: Boolean = false,
+      negativeWeights: Boolean = false,
+      numUserBlocks: Int = -1,
+      numProductBlocks: Int = -1,
+      negativeFactors: Boolean = true) {
     val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
       features, samplingRate, implicitPrefs, negativeWeights, negativeFactors)
 
-    val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations)
-          .setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L)
-          .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings)))
+    val model = new ALS()
+      .setUserBlocks(numUserBlocks)
+      .setProductBlocks(numProductBlocks)
+      .setRank(features)
+      .setIterations(iterations)
+      .setAlpha(1.0)
+      .setImplicitPrefs(implicitPrefs)
+      .setLambda(0.01)
+      .setSeed(0L)
+      .setNonnegative(!negativeFactors)
+      .run(sc.parallelize(sampledRatings))
 
     val predictedU = new DoubleMatrix(users, features)
     for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
@@ -208,8 +229,9 @@ class ALSSuite extends FunSuite with LocalSparkContext {
         val prediction = predictedRatings.get(u, p)
         val correct = trueRatings.get(u, p)
         if (math.abs(prediction - correct) > matchThreshold) {
-          fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
-            u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP))
+          fail(("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s")
+            .format(u, p, correct, prediction, trueRatings, predictedRatings, predictedU,
+              predictedP))
         }
       }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d9203350/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dd7efce..c80ab9a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,6 +54,14 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1")
           ) ++
+          Seq( // Ignore some private methods in ALS.
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
+            ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
+              "org.apache.spark.mllib.recommendation.ALS.this"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
+          ) ++
           MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
           MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
           MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")