Posted to commits@spark.apache.org by ml...@apache.org on 2017/10/09 08:42:36 UTC
spark git commit: [SPARK-20679][ML] Support recommending for a subset of users/items in ALSModel
Repository: spark
Updated Branches:
refs/heads/master fe7b219ae -> 98057583d
[SPARK-20679][ML] Support recommending for a subset of users/items in ALSModel
This PR adds methods `recommendForUserSubset` and `recommendForItemSubset` to `ALSModel`. These allow recommending for a specified set of user/item ids rather than for every user/item (as the `recommendForAllX` methods do).
The subset methods take a `DataFrame` as input, containing ids in the column specified by the param `userCol` or `itemCol`. The model will generate recommendations for each _unique_ id in this input DataFrame.
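For illustration, a minimal usage sketch of the new API (mirroring the updated examples in the diff below; assumes a fitted `ALSModel` named `model`, the `ALS` estimator `als`, and a `ratings` DataFrame):

```scala
// Recommend top 10 items for three specific users (duplicate ids in the
// input are de-duplicated before recommending).
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)

// Recommend top 10 users for three specific items.
val items = ratings.select(als.getItemCol).distinct().limit(3)
val itemSubsetRecs = model.recommendForItemSubset(items, 10)
```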
## How was this patch tested?
New unit tests in `ALSSuite` and Python doctests in `ALS`. Ran updated examples locally.
Author: Nick Pentreath <ni...@za.ibm.com>
Closes #18748 from MLnick/als-recommend-df.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98057583
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98057583
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98057583
Branch: refs/heads/master
Commit: 98057583dd2787c0e396c2658c7dd76412f86936
Parents: fe7b219
Author: Nick Pentreath <ni...@za.ibm.com>
Authored: Mon Oct 9 10:42:33 2017 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Mon Oct 9 10:42:33 2017 +0200
----------------------------------------------------------------------
.../spark/examples/ml/JavaALSExample.java | 9 ++
examples/src/main/python/ml/als_example.py | 9 ++
.../apache/spark/examples/ml/ALSExample.scala | 9 ++
.../apache/spark/ml/recommendation/ALS.scala | 48 +++++++++
.../spark/ml/recommendation/ALSSuite.scala | 100 +++++++++++++++++--
python/pyspark/ml/recommendation.py | 38 +++++++
6 files changed, 205 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index fe4d6bc..27052be 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -118,9 +118,18 @@ public class JavaALSExample {
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);
+
+ // Generate top 10 movie recommendations for a specified set of users
+ Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
+ Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
+ // Generate top 10 user recommendations for a specified set of movies
+ Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
+ Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
// $example off$
userRecs.show();
movieRecs.show();
+ userSubsetRecs.show();
+ movieSubSetRecs.show();
spark.stop();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/python/ml/als_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/als_example.py b/examples/src/main/python/ml/als_example.py
index 1672d55..8b7ec9c 100644
--- a/examples/src/main/python/ml/als_example.py
+++ b/examples/src/main/python/ml/als_example.py
@@ -60,8 +60,17 @@ if __name__ == "__main__":
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
+
+ # Generate top 10 movie recommendations for a specified set of users
+ users = ratings.select(als.getUserCol()).distinct().limit(3)
+ userSubsetRecs = model.recommendForUserSubset(users, 10)
+ # Generate top 10 user recommendations for a specified set of movies
+ movies = ratings.select(als.getItemCol()).distinct().limit(3)
+ movieSubSetRecs = model.recommendForItemSubset(movies, 10)
# $example off$
userRecs.show()
movieRecs.show()
+ userSubsetRecs.show()
+ movieSubSetRecs.show()
spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
index 07b15df..8091838 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
@@ -80,9 +80,18 @@ object ALSExample {
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
+
+ // Generate top 10 movie recommendations for a specified set of users
+ val users = ratings.select(als.getUserCol).distinct().limit(3)
+ val userSubsetRecs = model.recommendForUserSubset(users, 10)
+ // Generate top 10 user recommendations for a specified set of movies
+ val movies = ratings.select(als.getItemCol).distinct().limit(3)
+ val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
// $example off$
userRecs.show()
movieRecs.show()
+ userSubsetRecs.show()
+ movieSubSetRecs.show()
spark.stop()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 3d5fd17..a884366 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -345,6 +345,21 @@ class ALSModel private[ml] (
}
/**
+ * Returns top `numItems` items recommended for each user id in the input data set. Note that if
+ * there are duplicate ids in the input dataset, only one set of recommendations per unique id
+ * will be returned.
+ * @param dataset a Dataset containing a column of user ids. The column name must match `userCol`.
+ * @param numItems max number of recommendations for each user.
+ * @return a DataFrame of (userCol: Int, recommendations), where recommendations are
+ * stored as an array of (itemCol: Int, rating: Float) Rows.
+ */
+ @Since("2.3.0")
+ def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
+ val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
+ recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+ }
+
+ /**
* Returns top `numUsers` users recommended for each item, for all items.
* @param numUsers max number of recommendations for each item
* @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
@@ -356,6 +371,39 @@ class ALSModel private[ml] (
}
/**
+ * Returns top `numUsers` users recommended for each item id in the input data set. Note that if
+ * there are duplicate ids in the input dataset, only one set of recommendations per unique id
+ * will be returned.
+ * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
+ * @param numUsers max number of recommendations for each item.
+ * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
+ * stored as an array of (userCol: Int, rating: Float) Rows.
+ */
+ @Since("2.3.0")
+ def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
+ val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
+ recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+ }
+
+ /**
+ * Returns a subset of a factor DataFrame limited to only those unique ids contained
+ * in the input dataset.
+ * @param dataset input Dataset containing the id column used to filter factors.
+ * @param factors factor DataFrame to filter.
+ * @param column column name containing the ids in the input dataset.
+ * @return DataFrame containing factors only for those ids present in both the input dataset and
+ * the factor DataFrame.
+ */
+ private def getSourceFactorSubset(
+ dataset: Dataset[_],
+ factors: DataFrame,
+ column: String): DataFrame = {
+ factors
+ .join(dataset.select(column), factors("id") === dataset(column), joinType = "left_semi")
+ .select(factors("id"), factors("features"))
+ }
+
+ /**
* Makes recommendations for all users (or items).
*
* Note: the previous approach used for computing top-k recommendations
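A note on `getSourceFactorSubset` above: it filters the factor DataFrame with a left semi join, which keeps each factor row whose id appears in the input dataset without ever duplicating output rows, so repeated input ids produce a single set of recommendations each. A minimal, self-contained sketch of the pattern (illustrative data; assumes a `SparkSession` named `spark`):

```scala
import spark.implicits._

val factors = Seq((0, Array(1f, 2f)), (1, Array(3f, 4f))).toDF("id", "features")
val ids = Seq(0, 0, 2).toDF("user")  // duplicate and unseen ids are tolerated

// A left semi join keeps only left-side columns, and each factors row with
// at least one matching id is kept exactly once: the repeated id 0 yields a
// single row, and id 2 (which has no factors) is simply dropped.
val subset = factors.join(ids, factors("id") === ids("user"), "left_semi")
subset.show()  // -> one row: id = 0
```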
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index ac73191..addcd21 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -723,9 +723,9 @@ class ALSSuite
val numUsers = model.userFactors.count
val numItems = model.itemFactors.count
val expected = Map(
- 0 -> Array((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
- 1 -> Array((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
- 2 -> Array((3, 51f), (5, 45f), (4, 30f), (6, 18f))
+ 0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
+ 1 -> Seq((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
+ 2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
)
Seq(2, 4, 6).foreach { k =>
@@ -743,10 +743,10 @@ class ALSSuite
val numUsers = model.userFactors.count
val numItems = model.itemFactors.count
val expected = Map(
- 3 -> Array((0, 54f), (2, 51f), (1, 39f)),
- 4 -> Array((0, 44f), (2, 30f), (1, 26f)),
- 5 -> Array((2, 45f), (0, 42f), (1, 33f)),
- 6 -> Array((0, 28f), (2, 18f), (1, 16f))
+ 3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
+ 4 -> Seq((0, 44f), (2, 30f), (1, 26f)),
+ 5 -> Seq((2, 45f), (0, 42f), (1, 33f)),
+ 6 -> Seq((0, 28f), (2, 18f), (1, 16f))
)
Seq(2, 3, 4).foreach { k =>
@@ -759,9 +759,93 @@ class ALSSuite
}
}
+ test("recommendForUserSubset with k <, = and > num_items") {
+ val spark = this.spark
+ import spark.implicits._
+ val model = getALSModel
+ val numItems = model.itemFactors.count
+ val expected = Map(
+ 0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
+ 2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
+ )
+ val userSubset = expected.keys.toSeq.toDF("user")
+ val numUsersSubset = userSubset.count
+
+ Seq(2, 4, 6).foreach { k =>
+ val n = math.min(k, numItems).toInt
+ val expectedUpToN = expected.mapValues(_.slice(0, n))
+ val topItems = model.recommendForUserSubset(userSubset, k)
+ assert(topItems.count() == numUsersSubset)
+ assert(topItems.columns.contains("user"))
+ checkRecommendations(topItems, expectedUpToN, "item")
+ }
+ }
+
+ test("recommendForItemSubset with k <, = and > num_users") {
+ val spark = this.spark
+ import spark.implicits._
+ val model = getALSModel
+ val numUsers = model.userFactors.count
+ val expected = Map(
+ 3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
+ 6 -> Seq((0, 28f), (2, 18f), (1, 16f))
+ )
+ val itemSubset = expected.keys.toSeq.toDF("item")
+ val numItemsSubset = itemSubset.count
+
+ Seq(2, 3, 4).foreach { k =>
+ val n = math.min(k, numUsers).toInt
+ val expectedUpToN = expected.mapValues(_.slice(0, n))
+ val topUsers = model.recommendForItemSubset(itemSubset, k)
+ assert(topUsers.count() == numItemsSubset)
+ assert(topUsers.columns.contains("item"))
+ checkRecommendations(topUsers, expectedUpToN, "user")
+ }
+ }
+
+ test("subset recommendations eliminate duplicate ids, returns same results as unique ids") {
+ val spark = this.spark
+ import spark.implicits._
+ val model = getALSModel
+ val k = 2
+
+ val users = Seq(0, 1).toDF("user")
+ val dupUsers = Seq(0, 1, 0, 1).toDF("user")
+ val singleUserRecs = model.recommendForUserSubset(users, k)
+ val dupUserRecs = model.recommendForUserSubset(dupUsers, k)
+ .as[(Int, Seq[(Int, Float)])].collect().toMap
+ assert(singleUserRecs.count == dupUserRecs.size)
+ checkRecommendations(singleUserRecs, dupUserRecs, "item")
+
+ val items = Seq(3, 4, 5).toDF("item")
+ val dupItems = Seq(3, 4, 5, 4, 5).toDF("item")
+ val singleItemRecs = model.recommendForItemSubset(items, k)
+ val dupItemRecs = model.recommendForItemSubset(dupItems, k)
+ .as[(Int, Seq[(Int, Float)])].collect().toMap
+ assert(singleItemRecs.count == dupItemRecs.size)
+ checkRecommendations(singleItemRecs, dupItemRecs, "user")
+ }
+
+ test("subset recommendations on full input dataset equivalent to recommendForAll") {
+ val spark = this.spark
+ import spark.implicits._
+ val model = getALSModel
+ val k = 2
+
+ val userSubset = model.userFactors.withColumnRenamed("id", "user").drop("features")
+ val userSubsetRecs = model.recommendForUserSubset(userSubset, k)
+ val allUserRecs = model.recommendForAllUsers(k).as[(Int, Seq[(Int, Float)])].collect().toMap
+ checkRecommendations(userSubsetRecs, allUserRecs, "item")
+
+ val itemSubset = model.itemFactors.withColumnRenamed("id", "item").drop("features")
+ val itemSubsetRecs = model.recommendForItemSubset(itemSubset, k)
+ val allItemRecs = model.recommendForAllItems(k).as[(Int, Seq[(Int, Float)])].collect().toMap
+ checkRecommendations(itemSubsetRecs, allItemRecs, "user")
+ }
+
private def checkRecommendations(
topK: DataFrame,
- expected: Map[Int, Array[(Int, Float)]],
+ expected: Map[Int, Seq[(Int, Float)]],
dstColName: String): Unit = {
val spark = this.spark
import spark.implicits._
http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/python/pyspark/ml/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index bcfb368..e8bcbe4 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -90,6 +90,14 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
>>> item_recs.where(item_recs.item == 2)\
.select("recommendations.user", "recommendations.rating").collect()
[Row(user=[2, 1, 0], rating=[4.901..., 3.981..., -0.138...])]
+ >>> user_subset = df.where(df.user == 2)
+ >>> user_subset_recs = model.recommendForUserSubset(user_subset, 3)
+ >>> user_subset_recs.select("recommendations.item", "recommendations.rating").first()
+ Row(item=[2, 1, 0], rating=[4.901..., 1.056..., -1.501...])
+ >>> item_subset = df.where(df.item == 0)
+ >>> item_subset_recs = model.recommendForItemSubset(item_subset, 3)
+ >>> item_subset_recs.select("recommendations.user", "recommendations.rating").first()
+ Row(user=[0, 1, 2], rating=[3.910..., 2.625..., -1.501...])
>>> als_path = temp_path + "/als"
>>> als.save(als_path)
>>> als2 = ALS.load(als_path)
@@ -414,6 +422,36 @@ class ALSModel(JavaModel, JavaMLWritable, JavaMLReadable):
"""
return self._call_java("recommendForAllItems", numUsers)
+ @since("2.3.0")
+ def recommendForUserSubset(self, dataset, numItems):
+ """
+ Returns top `numItems` items recommended for each user id in the input data set. Note that
+ if there are duplicate ids in the input dataset, only one set of recommendations per unique
+ id will be returned.
+
+ :param dataset: a Dataset containing a column of user ids. The column name must match
+ `userCol`.
+ :param numItems: max number of recommendations for each user
+ :return: a DataFrame of (userCol, recommendations), where recommendations are
+ stored as an array of (itemCol, rating) Rows.
+ """
+ return self._call_java("recommendForUserSubset", dataset, numItems)
+
+ @since("2.3.0")
+ def recommendForItemSubset(self, dataset, numUsers):
+ """
+ Returns top `numUsers` users recommended for each item id in the input data set. Note that
+ if there are duplicate ids in the input dataset, only one set of recommendations per unique
+ id will be returned.
+
+ :param dataset: a Dataset containing a column of item ids. The column name must match
+ `itemCol`.
+ :param numUsers: max number of recommendations for each item
+ :return: a DataFrame of (itemCol, recommendations), where recommendations are
+ stored as an array of (userCol, rating) Rows.
+ """
+ return self._call_java("recommendForItemSubset", dataset, numUsers)
+
if __name__ == "__main__":
import doctest
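The nested return schema described in the docstrings above (an array of (id, rating) structs per row) flattens easily for downstream use. A minimal Scala sketch, assuming `recs` is the output of `recommendForUserSubset` with default column names `user`/`item`:

```scala
import org.apache.spark.sql.functions.{col, explode}

// recs schema: (user: Int, recommendations: Array[Struct(item: Int, rating: Float)])
val flat = recs
  .select(col("user"), explode(col("recommendations")).as("rec"))
  .select(col("user"), col("rec.item"), col("rec.rating"))
flat.show()  // one row per (user, item, rating) triple
```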