You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/05/01 17:27:48 UTC

spark git commit: [SPARK-3066] [MLLIB] Support recommendAll in matrix factorization model

Repository: spark
Updated Branches:
  refs/heads/master 3052f4916 -> 3b514af8a


[SPARK-3066] [MLLIB] Support recommendAll in matrix factorization model

This is based on #3098 from debasish83.

1. BLAS' GEMM is used to compute inner products.
2. Reverted changes to MovieLensALS. SPARK-4231 should be addressed in a separate PR.
3. ~~Fixed a bug in topByKey~~

Closes #3098

debasish83 coderxiang

Author: Debasish Das <de...@one.verizon.com>
Author: Xiangrui Meng <me...@databricks.com>

Closes #5829 from mengxr/SPARK-3066 and squashes the following commits:

22e6a87 [Xiangrui Meng] topByKey was correct. update its usage
389b381 [Xiangrui Meng] fix indentation
49953de [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-3066
cb9799a [Xiangrui Meng] revert MovieLensALS
f864f5e [Xiangrui Meng] update test and fix a bug in topByKey
c5e0181 [Xiangrui Meng] use GEMM and topByKey
3a0c4eb [Debasish Das] updated with spark master
98fa424 [Debasish Das] updated with master
ee99571 [Debasish Das] addressed initial review comments;merged with master;added tests for batch predict APIs in matrix factorization
3f97c49 [Debasish Das] fixed spark coding style for imports
7163a5c [Debasish Das] Added API for batch user and product recommendation; MAP calculation for product recommendation per user using randomized split
d144f57 [Debasish Das] recommendAll API to MatrixFactorizationModel, uses topK finding using BoundedPriorityQueue similar to RDD.top
f38a1b5 [Debasish Das] use sampleByKey for per user sampling
10cbb37 [Debasish Das] provide ratio for topN product validation; generate MAP and prec@k metric for movielens dataset
9fa063e [Debasish Das] import scala.math.round
4bbae0f [Debasish Das] comments fixed as per scalastyle
cd3ab31 [Debasish Das] merged with AbstractParams serialization bug
9b3951f [Debasish Das] validate user/product on MovieLens dataset through user input and compute map measure along with rmse


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b514af8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b514af8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b514af8

Branch: refs/heads/master
Commit: 3b514af8a0c2ca496315b99a2b09bc887ac6c5e1
Parents: 3052f49
Author: Debasish Das <de...@one.verizon.com>
Authored: Fri May 1 08:27:46 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Fri May 1 08:27:46 2015 -0700

----------------------------------------------------------------------
 .../spark/mllib/rdd/MLPairRDDFunctions.scala    |   4 +-
 .../MatrixFactorizationModel.scala              | 132 ++++++++++++++++---
 .../mllib/rdd/MLPairRDDFunctionsSuite.scala     |   4 +-
 .../MatrixFactorizationModelSuite.scala         |  20 +++
 4 files changed, 138 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b514af8/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
index 9213fd3..5af55aa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
@@ -42,13 +42,11 @@ class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Se
     self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))(
       seqOp = (queue, item) => {
         queue += item
-        queue
       },
       combOp = (queue1, queue2) => {
         queue1 ++= queue2
-        queue1
       }
-    ).mapValues(_.toArray.sorted(ord.reverse))
+    ).mapValues(_.toArray.reverse)  // This is an min-heap, so we reverse the order.
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3b514af8/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 36cbf06..88c2148 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -20,14 +20,18 @@ package org.apache.spark.mllib.recommendation
 import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
+import scala.collection.mutable
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
-import com.github.fommil.netlib.BLAS.{getInstance => blas}
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
@@ -57,7 +61,7 @@ class MatrixFactorizationModel(
 
   /** Validates factors and warns users if there are performance concerns. */
   private def validateFeatures(name: String, features: RDD[(Int, Array[Double])]): Unit = {
-    require(features.first()._2.size == rank,
+    require(features.first()._2.length == rank,
       s"$name feature dimension does not match the rank $rank.")
     if (features.partitioner.isEmpty) {
       logWarning(s"$name factor does not have a partitioner. "
@@ -72,19 +76,19 @@ class MatrixFactorizationModel(
   def predict(user: Int, product: Int): Double = {
     val userVector = userFeatures.lookup(user).head
     val productVector = productFeatures.lookup(product).head
-    blas.ddot(userVector.length, userVector, 1, productVector, 1)
+    blas.ddot(rank, userVector, 1, productVector, 1)
   }
 
   /**
-    * Predict the rating of many users for many products.
-    * The output RDD has an element per each element in the input RDD (including all duplicates)
-    * unless a user or product is missing in the training set.
-    *
-    * @param usersProducts  RDD of (user, product) pairs.
-    * @return RDD of Ratings.
-    */
+   * Predict the rating of many users for many products.
+   * The output RDD has an element per each element in the input RDD (including all duplicates)
+   * unless a user or product is missing in the training set.
+   *
+   * @param usersProducts  RDD of (user, product) pairs.
+   * @return RDD of Ratings.
+   */
   def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
-    val users = userFeatures.join(usersProducts).map{
+    val users = userFeatures.join(usersProducts).map {
       case (user, (uFeatures, product)) => (product, (user, uFeatures))
     }
     users.join(productFeatures).map {
@@ -112,7 +116,7 @@ class MatrixFactorizationModel(
    *  recommended the product is.
    */
   def recommendProducts(user: Int, num: Int): Array[Rating] =
-    recommend(userFeatures.lookup(user).head, productFeatures, num)
+    MatrixFactorizationModel.recommend(userFeatures.lookup(user).head, productFeatures, num)
       .map(t => Rating(user, t._1, t._2))
 
   /**
@@ -128,7 +132,7 @@ class MatrixFactorizationModel(
    *  recommended the user is.
    */
   def recommendUsers(product: Int, num: Int): Array[Rating] =
-    recommend(productFeatures.lookup(product).head, userFeatures, num)
+    MatrixFactorizationModel.recommend(productFeatures.lookup(product).head, userFeatures, num)
       .map(t => Rating(t._1, product, t._2))
 
   protected override val formatVersion: String = "1.0"
@@ -137,20 +141,113 @@ class MatrixFactorizationModel(
     MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
   }
 
+  /**
+   * Recommends topK products for all users.
+   *
+   * @param num how many products to return for every user.
+   * @return [(Int, Array[Rating])] objects, where every tuple contains a userID and an array of
+   * rating objects which contains the same userId, recommended productID and a "score" in the
+   * rating field. Semantics of score is same as recommendProducts API
+   */
+  def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
+    MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num).map {
+      case (user, top) =>
+        val ratings = top.map { case (product, rating) => Rating(user, product, rating) }
+        (user, ratings)
+    }
+  }
+
+
+  /**
+   * Recommends topK users for all products.
+   *
+   * @param num how many users to return for every product.
+   * @return [(Int, Array[Rating])] objects, where every tuple contains a productID and an array
+   * of rating objects which contains the recommended userId, same productID and a "score" in the
+   * rating field. Semantics of score is same as recommendUsers API
+   */
+  def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = {
+    MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num).map {
+      case (product, top) =>
+        val ratings = top.map { case (user, rating) => Rating(user, product, rating) }
+        (product, ratings)
+    }
+  }
+}
+
+object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
+
+  import org.apache.spark.mllib.util.Loader._
+
+  /**
+   * Makes recommendations for a single user (or product).
+   */
   private def recommend(
       recommendToFeatures: Array[Double],
       recommendableFeatures: RDD[(Int, Array[Double])],
       num: Int): Array[(Int, Double)] = {
-    val scored = recommendableFeatures.map { case (id,features) =>
+    val scored = recommendableFeatures.map { case (id, features) =>
       (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1))
     }
     scored.top(num)(Ordering.by(_._2))
   }
-}
 
-object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
+  /**
+   * Makes recommendations for all users (or products).
+   * @param rank rank
+   * @param srcFeatures src features to receive recommendations
+   * @param dstFeatures dst features used to make recommendations
+   * @param num number of recommendations for each record
+   * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array
+   *         of (dstId, rating) pairs.
+   */
+  private def recommendForAll(
+      rank: Int,
+      srcFeatures: RDD[(Int, Array[Double])],
+      dstFeatures: RDD[(Int, Array[Double])],
+      num: Int): RDD[(Int, Array[(Int, Double)])] = {
+    val srcBlocks = blockify(rank, srcFeatures)
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
+      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        val output = new Array[(Int, (Int, Double))](m * n)
+        var k = 0
+        ratings.foreachActive { (i, j, r) =>
+          output(k) = (srcIds(i), (dstIds(j), r))
+          k += 1
+        }
+        output.toSeq
+    }
+    ratings.topByKey(num)(Ordering.by(_._2))
+  }
 
-  import org.apache.spark.mllib.util.Loader._
+  /**
+   * Blockifies features to use Level-3 BLAS.
+   */
+  private def blockify(
+      rank: Int,
+      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
+    val blockSize = 4096 // TODO: tune the block size
+    val blockStorage = rank * blockSize
+    features.mapPartitions { iter =>
+      iter.grouped(blockSize).map { grouped =>
+        val ids = mutable.ArrayBuilder.make[Int]
+        ids.sizeHint(blockSize)
+        val factors = mutable.ArrayBuilder.make[Double]
+        factors.sizeHint(blockStorage)
+        var i = 0
+        grouped.foreach { case (id, factor) =>
+          ids += id
+          factors ++= factor
+          i += 1
+        }
+        (ids.result(), new DenseMatrix(rank, i, factors.result()))
+      }
+    }
+  }
 
   override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
     val (loadedClassName, formatVersion, _) = loadMetadata(sc, path)
@@ -214,4 +311,5 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       new Path(dataPath(path), "product").toUri.toString
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b514af8/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
index 1ac7c12..cb8fe4d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
@@ -24,13 +24,13 @@ import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 
 class MLPairRDDFunctionsSuite extends FunSuite with MLlibTestSparkContext {
   test("topByKey") {
-    val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (3, 5), (5, 1), (5, 3)), 2)
+    val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (5, 1), (3, 5)), 2)
       .topByKey(2)
       .collectAsMap()
 
     assert(topMap.size === 3)
     assert(topMap(1) === Array(2, 1))
     assert(topMap(3) === Array(7, 5))
-    assert(topMap(5) === Array(3, 1))
+    assert(topMap(5) === Array(1))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b514af8/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
index 9801e87..2c92866 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
@@ -72,4 +72,24 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext
       Utils.deleteRecursively(tempDir)
     }
   }
+
+  test("batch predict API recommendProductsForUsers") {
+    val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
+    val topK = 10
+    val recommendations = model.recommendProductsForUsers(topK).collectAsMap()
+
+    assert(recommendations(0)(0).rating ~== 17.0 relTol 1e-14)
+    assert(recommendations(1)(0).rating ~== 39.0 relTol 1e-14)
+  }
+
+  test("batch predict API recommendUsersForProducts") {
+    val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
+    val topK = 10
+    val recommendations = model.recommendUsersForProducts(topK).collectAsMap()
+
+    assert(recommendations(2)(0).user == 1)
+    assert(recommendations(2)(0).rating ~== 39.0 relTol 1e-14)
+    assert(recommendations(2)(1).user == 0)
+    assert(recommendations(2)(1).rating ~== 17.0 relTol 1e-14)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org