You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/07/02 19:18:26 UTC

spark git commit: [SPARK-8708] [MLLIB] Paritition ALS ratings based on both users and products

Repository: spark
Updated Branches:
  refs/heads/master 52302a803 -> 0e553a3e9


[SPARK-8708] [MLLIB] Paritition ALS ratings based on both users and products

JIRA: https://issues.apache.org/jira/browse/SPARK-8708

Previously the partitions of ratings are only based on the given products. So if the `usersProducts` given for prediction contains only few products or even one product, the generated ratings will be pushed into few or single partition and can't use high parallelism.

The following codes are the example reported in the JIRA. Because it asks the predictions for users on product 2. There is only one partition in the result.

    >>> r1 = (1, 1, 1.0)
    >>> r2 = (1, 2, 2.0)
    >>> r3 = (2, 1, 2.0)
    >>> r4 = (2, 2, 2.0)
    >>> r5 = (3, 1, 1.0)
    >>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5)
    >>> users = ratings.map(itemgetter(0)).distinct()
    >>> model = ALS.trainImplicit(ratings, 1, seed=10)
    >>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2)))
    >>> predictions_for_2.glom().map(len).collect()
    [0, 0, 3, 0, 0]

This PR uses user and product instead of only product to partition the ratings.

Author: Liang-Chi Hsieh <vi...@gmail.com>
Author: Liang-Chi Hsieh <vi...@appier.com>

Closes #7121 from viirya/mfm_fix_partition and squashes the following commits:

779946d [Liang-Chi Hsieh] Calculate approximate numbers of users and products in one pass.
4336dc2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into mfm_fix_partition
83e56c1 [Liang-Chi Hsieh] Instead of additional join, use the numbers of users and products to decide how to perform join.
b534dc8 [Liang-Chi Hsieh] Paritition ratings based on both users and products.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e553a3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e553a3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e553a3e

Branch: refs/heads/master
Commit: 0e553a3e9360a736920e2214d634373fef0dbcf7
Parents: 52302a8
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Jul 2 10:18:23 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Jul 2 10:18:23 2015 -0700

----------------------------------------------------------------------
 .../MatrixFactorizationModel.scala              | 55 +++++++++++++++++---
 1 file changed, 49 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e553a3e/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 93aa41e..43d219a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -22,6 +22,7 @@ import java.lang.{Integer => JavaInteger}
 
 import scala.collection.mutable
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
 import org.json4s._
@@ -80,6 +81,30 @@ class MatrixFactorizationModel(
   }
 
   /**
+   * Return approximate numbers of users and products in the given usersProducts tuples.
+   * This method is based on `countApproxDistinct` in class `RDD`.
+   *
+   * @param usersProducts  RDD of (user, product) pairs.
+   * @return approximate numbers of users and products.
+   */
+  private[this] def countApproxDistinctUserProduct(usersProducts: RDD[(Int, Int)]): (Long, Long) = {
+    val zeroCounterUser = new HyperLogLogPlus(4, 0)
+    val zeroCounterProduct = new HyperLogLogPlus(4, 0)
+    val aggregated = usersProducts.aggregate((zeroCounterUser, zeroCounterProduct))(
+      (hllTuple: (HyperLogLogPlus, HyperLogLogPlus), v: (Int, Int)) => {
+        hllTuple._1.offer(v._1)
+        hllTuple._2.offer(v._2)
+        hllTuple
+      },
+      (h1: (HyperLogLogPlus, HyperLogLogPlus), h2: (HyperLogLogPlus, HyperLogLogPlus)) => {
+        h1._1.addAll(h2._1)
+        h1._2.addAll(h2._2)
+        h1
+      })
+    (aggregated._1.cardinality(), aggregated._2.cardinality())
+  }
+
+  /**
    * Predict the rating of many users for many products.
    * The output RDD has an element per each element in the input RDD (including all duplicates)
    * unless a user or product is missing in the training set.
@@ -88,12 +113,30 @@ class MatrixFactorizationModel(
    * @return RDD of Ratings.
    */
   def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
-    val users = userFeatures.join(usersProducts).map {
-      case (user, (uFeatures, product)) => (product, (user, uFeatures))
-    }
-    users.join(productFeatures).map {
-      case (product, ((user, uFeatures), pFeatures)) =>
-        Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
+    // Previously the partitions of ratings are only based on the given products.
+    // So if the usersProducts given for prediction contains only few products or
+    // even one product, the generated ratings will be pushed into few or single partition
+    // and can't use high parallelism.
+    // Here we calculate approximate numbers of users and products. Then we decide the
+    // partitions should be based on users or products.
+    val (usersCount, productsCount) = countApproxDistinctUserProduct(usersProducts)
+
+    if (usersCount < productsCount) {
+      val users = userFeatures.join(usersProducts).map {
+        case (user, (uFeatures, product)) => (product, (user, uFeatures))
+      }
+      users.join(productFeatures).map {
+        case (product, ((user, uFeatures), pFeatures)) =>
+          Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
+      }
+    } else {
+      val products = productFeatures.join(usersProducts.map(_.swap)).map {
+        case (product, (pFeatures, user)) => (user, (product, pFeatures))
+      }
+      products.join(userFeatures).map {
+        case (user, ((product, pFeatures), uFeatures)) =>
+          Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org