Posted to commits@spark.apache.org by ml...@apache.org on 2017/10/09 08:42:36 UTC

spark git commit: [SPARK-20679][ML] Support recommending for a subset of users/items in ALSModel

Repository: spark
Updated Branches:
  refs/heads/master fe7b219ae -> 98057583d


[SPARK-20679][ML] Support recommending for a subset of users/items in ALSModel

This PR adds methods `recommendForUserSubset` and `recommendForItemSubset` to `ALSModel`. These allow recommending for a specified set of user/item ids rather than for every user/item (as in the `recommendForAllX` methods).

The subset methods take a `DataFrame` as input, containing ids in the column specified by the param `userCol` or `itemCol`. The model will generate recommendations for each _unique_ id in this input DataFrame.
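
For illustration, a minimal Scala usage sketch, mirroring the updated `ALSExample` in the diff below and assuming a fitted `ALS` estimator `als`, its `ALSModel` `model`, and the `ratings` DataFrame from that example:

```scala
// Top 10 movie recommendations for an explicit subset of users.
// Duplicate ids in `users` collapse to unique ids before recommending.
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)

// Top 10 user recommendations for an explicit subset of movies.
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubsetRecs = model.recommendForItemSubset(movies, 10)
```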

## How was this patch tested?
New unit tests in `ALSSuite` and Python doctests in `ALS`. Ran updated examples locally.

Author: Nick Pentreath <ni...@za.ibm.com>

Closes #18748 from MLnick/als-recommend-df.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98057583
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98057583
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98057583

Branch: refs/heads/master
Commit: 98057583dd2787c0e396c2658c7dd76412f86936
Parents: fe7b219
Author: Nick Pentreath <ni...@za.ibm.com>
Authored: Mon Oct 9 10:42:33 2017 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Mon Oct 9 10:42:33 2017 +0200

----------------------------------------------------------------------
 .../spark/examples/ml/JavaALSExample.java       |   9 ++
 examples/src/main/python/ml/als_example.py      |   9 ++
 .../apache/spark/examples/ml/ALSExample.scala   |   9 ++
 .../apache/spark/ml/recommendation/ALS.scala    |  48 +++++++++
 .../spark/ml/recommendation/ALSSuite.scala      | 100 +++++++++++++++++--
 python/pyspark/ml/recommendation.py             |  38 +++++++
 6 files changed, 205 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index fe4d6bc..27052be 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -118,9 +118,18 @@ public class JavaALSExample {
     Dataset<Row> userRecs = model.recommendForAllUsers(10);
     // Generate top 10 user recommendations for each movie
     Dataset<Row> movieRecs = model.recommendForAllItems(10);
+
+    // Generate top 10 movie recommendations for a specified set of users
+    Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
+    Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
+    // Generate top 10 user recommendations for a specified set of movies
+    Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
+    Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
     // $example off$
     userRecs.show();
     movieRecs.show();
+    userSubsetRecs.show();
+    movieSubSetRecs.show();
 
     spark.stop();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/python/ml/als_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/als_example.py b/examples/src/main/python/ml/als_example.py
index 1672d55..8b7ec9c 100644
--- a/examples/src/main/python/ml/als_example.py
+++ b/examples/src/main/python/ml/als_example.py
@@ -60,8 +60,17 @@ if __name__ == "__main__":
     userRecs = model.recommendForAllUsers(10)
     # Generate top 10 user recommendations for each movie
     movieRecs = model.recommendForAllItems(10)
+
+    # Generate top 10 movie recommendations for a specified set of users
+    users = ratings.select(als.getUserCol()).distinct().limit(3)
+    userSubsetRecs = model.recommendForUserSubset(users, 10)
+    # Generate top 10 user recommendations for a specified set of movies
+    movies = ratings.select(als.getItemCol()).distinct().limit(3)
+    movieSubSetRecs = model.recommendForItemSubset(movies, 10)
     # $example off$
     userRecs.show()
     movieRecs.show()
+    userSubsetRecs.show()
+    movieSubSetRecs.show()
 
     spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
index 07b15df..8091838 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
@@ -80,9 +80,18 @@ object ALSExample {
     val userRecs = model.recommendForAllUsers(10)
     // Generate top 10 user recommendations for each movie
     val movieRecs = model.recommendForAllItems(10)
+
+    // Generate top 10 movie recommendations for a specified set of users
+    val users = ratings.select(als.getUserCol).distinct().limit(3)
+    val userSubsetRecs = model.recommendForUserSubset(users, 10)
+    // Generate top 10 user recommendations for a specified set of movies
+    val movies = ratings.select(als.getItemCol).distinct().limit(3)
+    val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
     // $example off$
     userRecs.show()
     movieRecs.show()
+    userSubsetRecs.show()
+    movieSubSetRecs.show()
 
     spark.stop()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 3d5fd17..a884366 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -345,6 +345,21 @@ class ALSModel private[ml] (
   }
 
   /**
+   * Returns top `numItems` items recommended for each user id in the input data set. Note that if
+   * there are duplicate ids in the input dataset, only one set of recommendations per unique id
+   * will be returned.
+   * @param dataset a Dataset containing a column of user ids. The column name must match `userCol`.
+   * @param numItems max number of recommendations for each user.
+   * @return a DataFrame of (userCol: Int, recommendations), where recommendations are
+   *         stored as an array of (itemCol: Int, rating: Float) Rows.
+   */
+  @Since("2.3.0")
+  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
+    val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+  }
+
+  /**
    * Returns top `numUsers` users recommended for each item, for all items.
    * @param numUsers max number of recommendations for each item
    * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
@@ -356,6 +371,39 @@ class ALSModel private[ml] (
   }
 
   /**
+   * Returns top `numUsers` users recommended for each item id in the input data set. Note that if
+   * there are duplicate ids in the input dataset, only one set of recommendations per unique id
+   * will be returned.
+   * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
+   * @param numUsers max number of recommendations for each item.
+   * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
+   *         stored as an array of (userCol: Int, rating: Float) Rows.
+   */
+  @Since("2.3.0")
+  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
+    val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+  }
+
+  /**
+   * Returns a subset of a factor DataFrame limited to only those unique ids contained
+   * in the input dataset.
+   * @param dataset input Dataset containing the id column used to filter the factors.
+   * @param factors factor DataFrame to filter.
+   * @param column column name containing the ids in the input dataset.
+   * @return DataFrame containing factors only for those ids present in both the input dataset and
+   *         the factor DataFrame.
+   */
+  private def getSourceFactorSubset(
+      dataset: Dataset[_],
+      factors: DataFrame,
+      column: String): DataFrame = {
+    factors
+      .join(dataset.select(column), factors("id") === dataset(column), joinType = "left_semi")
+      .select(factors("id"), factors("features"))
+  }
+
+  /**
    * Makes recommendations for all users (or items).
    *
    * Note: the previous approach used for computing top-k recommendations

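A note on the implementation above: `getSourceFactorSubset` filters the factor table with a `left_semi` join, which emits each matching left-hand (factor) row at most once, so duplicate ids in the caller's input collapse to unique ids without an explicit `distinct()`. A minimal standalone sketch of that join semantics, using hypothetical data rather than anything from this patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("LeftSemiSketch").master("local[*]").getOrCreate()
import spark.implicits._

// Stand-in for ALSModel.userFactors: one (id, features) row per user.
val factors = Seq(
  (0, Array(0.1f, 0.2f)),
  (1, Array(0.3f, 0.4f)),
  (2, Array(0.5f, 0.6f))).toDF("id", "features")

// Requested ids, including a duplicate of id 2.
val requested = Seq(0, 2, 2).toDF("user")

// left_semi keeps only factor rows whose id appears in `requested`,
// and returns each such row at most once, so id 2 shows up a single time.
val subset = factors
  .join(requested, factors("id") === requested("user"), "left_semi")
  .select(factors("id"), factors("features"))
subset.show()
```
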
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index ac73191..addcd21 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -723,9 +723,9 @@ class ALSSuite
     val numUsers = model.userFactors.count
     val numItems = model.itemFactors.count
     val expected = Map(
-      0 -> Array((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
-      1 -> Array((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
-      2 -> Array((3, 51f), (5, 45f), (4, 30f), (6, 18f))
+      0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
+      1 -> Seq((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
+      2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
     )
 
     Seq(2, 4, 6).foreach { k =>
@@ -743,10 +743,10 @@ class ALSSuite
     val numUsers = model.userFactors.count
     val numItems = model.itemFactors.count
     val expected = Map(
-      3 -> Array((0, 54f), (2, 51f), (1, 39f)),
-      4 -> Array((0, 44f), (2, 30f), (1, 26f)),
-      5 -> Array((2, 45f), (0, 42f), (1, 33f)),
-      6 -> Array((0, 28f), (2, 18f), (1, 16f))
+      3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
+      4 -> Seq((0, 44f), (2, 30f), (1, 26f)),
+      5 -> Seq((2, 45f), (0, 42f), (1, 33f)),
+      6 -> Seq((0, 28f), (2, 18f), (1, 16f))
     )
 
     Seq(2, 3, 4).foreach { k =>
@@ -759,9 +759,93 @@ class ALSSuite
     }
   }
 
+  test("recommendForUserSubset with k <, = and > num_items") {
+    val spark = this.spark
+    import spark.implicits._
+    val model = getALSModel
+    val numItems = model.itemFactors.count
+    val expected = Map(
+      0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
+      2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
+    )
+    val userSubset = expected.keys.toSeq.toDF("user")
+    val numUsersSubset = userSubset.count
+
+    Seq(2, 4, 6).foreach { k =>
+      val n = math.min(k, numItems).toInt
+      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val topItems = model.recommendForUserSubset(userSubset, k)
+      assert(topItems.count() == numUsersSubset)
+      assert(topItems.columns.contains("user"))
+      checkRecommendations(topItems, expectedUpToN, "item")
+    }
+  }
+
+  test("recommendForItemSubset with k <, = and > num_users") {
+    val spark = this.spark
+    import spark.implicits._
+    val model = getALSModel
+    val numUsers = model.userFactors.count
+    val expected = Map(
+      3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
+      6 -> Seq((0, 28f), (2, 18f), (1, 16f))
+    )
+    val itemSubset = expected.keys.toSeq.toDF("item")
+    val numItemsSubset = itemSubset.count
+
+    Seq(2, 3, 4).foreach { k =>
+      val n = math.min(k, numUsers).toInt
+      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val topUsers = model.recommendForItemSubset(itemSubset, k)
+      assert(topUsers.count() == numItemsSubset)
+      assert(topUsers.columns.contains("item"))
+      checkRecommendations(topUsers, expectedUpToN, "user")
+    }
+  }
+
+  test("subset recommendations eliminate duplicate ids, returns same results as unique ids") {
+    val spark = this.spark
+    import spark.implicits._
+    val model = getALSModel
+    val k = 2
+
+    val users = Seq(0, 1).toDF("user")
+    val dupUsers = Seq(0, 1, 0, 1).toDF("user")
+    val singleUserRecs = model.recommendForUserSubset(users, k)
+    val dupUserRecs = model.recommendForUserSubset(dupUsers, k)
+      .as[(Int, Seq[(Int, Float)])].collect().toMap
+    assert(singleUserRecs.count == dupUserRecs.size)
+    checkRecommendations(singleUserRecs, dupUserRecs, "item")
+
+    val items = Seq(3, 4, 5).toDF("item")
+    val dupItems = Seq(3, 4, 5, 4, 5).toDF("item")
+    val singleItemRecs = model.recommendForItemSubset(items, k)
+    val dupItemRecs = model.recommendForItemSubset(dupItems, k)
+      .as[(Int, Seq[(Int, Float)])].collect().toMap
+    assert(singleItemRecs.count == dupItemRecs.size)
+    checkRecommendations(singleItemRecs, dupItemRecs, "user")
+  }
+
+  test("subset recommendations on full input dataset equivalent to recommendForAll") {
+    val spark = this.spark
+    import spark.implicits._
+    val model = getALSModel
+    val k = 2
+
+    val userSubset = model.userFactors.withColumnRenamed("id", "user").drop("features")
+    val userSubsetRecs = model.recommendForUserSubset(userSubset, k)
+    val allUserRecs = model.recommendForAllUsers(k).as[(Int, Seq[(Int, Float)])].collect().toMap
+    checkRecommendations(userSubsetRecs, allUserRecs, "item")
+
+    val itemSubset = model.itemFactors.withColumnRenamed("id", "item").drop("features")
+    val itemSubsetRecs = model.recommendForItemSubset(itemSubset, k)
+    val allItemRecs = model.recommendForAllItems(k).as[(Int, Seq[(Int, Float)])].collect().toMap
+    checkRecommendations(itemSubsetRecs, allItemRecs, "user")
+  }
+
   private def checkRecommendations(
       topK: DataFrame,
-      expected: Map[Int, Array[(Int, Float)]],
+      expected: Map[Int, Seq[(Int, Float)]],
       dstColName: String): Unit = {
     val spark = this.spark
     import spark.implicits._

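The tests above also pin down the output schema: each result row is (id, recommendations), where `recommendations` is an array of (id, rating) structs whose field names come from `userCol`/`itemCol`. A minimal sketch of flattening that nested result, assuming a `userSubsetRecs` DataFrame produced with `userCol = "user"` and `itemCol = "item"` as in these tests:

```scala
import org.apache.spark.sql.functions.{col, explode}

// One flat row per (user, item, rating) triple instead of a nested array.
val flat = userSubsetRecs
  .select(col("user"), explode(col("recommendations")).as("rec"))
  .select(col("user"), col("rec.item"), col("rec.rating"))
flat.show()
```
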
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/python/pyspark/ml/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index bcfb368..e8bcbe4 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -90,6 +90,14 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
     >>> item_recs.where(item_recs.item == 2)\
         .select("recommendations.user", "recommendations.rating").collect()
     [Row(user=[2, 1, 0], rating=[4.901..., 3.981..., -0.138...])]
+    >>> user_subset = df.where(df.user == 2)
+    >>> user_subset_recs = model.recommendForUserSubset(user_subset, 3)
+    >>> user_subset_recs.select("recommendations.item", "recommendations.rating").first()
+    Row(item=[2, 1, 0], rating=[4.901..., 1.056..., -1.501...])
+    >>> item_subset = df.where(df.item == 0)
+    >>> item_subset_recs = model.recommendForItemSubset(item_subset, 3)
+    >>> item_subset_recs.select("recommendations.user", "recommendations.rating").first()
+    Row(user=[0, 1, 2], rating=[3.910..., 2.625..., -1.501...])
     >>> als_path = temp_path + "/als"
     >>> als.save(als_path)
     >>> als2 = ALS.load(als_path)
@@ -414,6 +422,36 @@ class ALSModel(JavaModel, JavaMLWritable, JavaMLReadable):
         """
         return self._call_java("recommendForAllItems", numUsers)
 
+    @since("2.3.0")
+    def recommendForUserSubset(self, dataset, numItems):
+        """
+        Returns top `numItems` items recommended for each user id in the input data set. Note that
+        if there are duplicate ids in the input dataset, only one set of recommendations per unique
+        id will be returned.
+
+        :param dataset: a Dataset containing a column of user ids. The column name must match
+                        `userCol`.
+        :param numItems: max number of recommendations for each user
+        :return: a DataFrame of (userCol, recommendations), where recommendations are
+                 stored as an array of (itemCol, rating) Rows.
+        """
+        return self._call_java("recommendForUserSubset", dataset, numItems)
+
+    @since("2.3.0")
+    def recommendForItemSubset(self, dataset, numUsers):
+        """
+        Returns top `numUsers` users recommended for each item id in the input data set. Note that
+        if there are duplicate ids in the input dataset, only one set of recommendations per unique
+        id will be returned.
+
+        :param dataset: a Dataset containing a column of item ids. The column name must match
+                        `itemCol`.
+        :param numUsers: max number of recommendations for each item
+        :return: a DataFrame of (itemCol, recommendations), where recommendations are
+                 stored as an array of (userCol, rating) Rows.
+        """
+        return self._call_java("recommendForItemSubset", dataset, numUsers)
+
 
 if __name__ == "__main__":
     import doctest

