You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/05/09 02:24:35 UTC

spark git commit: [SPARK-7488] [ML] Feature Parity in PySpark for ml.recommendation

Repository: spark
Updated Branches:
  refs/heads/master 54e6fa056 -> 84bf931f3


[SPARK-7488] [ML] Feature Parity in PySpark for ml.recommendation

Adds a Python API for `ALS` under `ml.recommendation` in PySpark. Also adds `seed` as a settable parameter in the Scala implementation of ALS.

Author: Burak Yavuz <br...@gmail.com>

Closes #6015 from brkyvz/ml-rec and squashes the following commits:

be6e931 [Burak Yavuz] addressed comments
eaed879 [Burak Yavuz] readd numFeatures
0bd66b1 [Burak Yavuz] fixed seed
7f6d964 [Burak Yavuz] merged master
52e2bda [Burak Yavuz] added ALS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84bf931f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84bf931f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84bf931f

Branch: refs/heads/master
Commit: 84bf931f36edf1f319c9116f7f326959a6118991
Parents: 54e6fa0
Author: Burak Yavuz <br...@gmail.com>
Authored: Fri May 8 17:24:32 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Fri May 8 17:24:32 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/ml/recommendation/ALS.scala    |  12 +-
 .../pyspark/ml/param/_shared_params_code_gen.py |   2 +
 python/pyspark/ml/param/shared.py               |  29 ++
 python/pyspark/ml/recommendation.py             | 279 +++++++++++++++++++
 4 files changed, 318 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/84bf931f/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 6cf4b40..d7cbffc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util.random.XORShiftRandom
  * Common params for ALS.
  */
 private[recommendation] trait ALSParams extends Params with HasMaxIter with HasRegParam
-  with HasPredictionCol with HasCheckpointInterval {
+  with HasPredictionCol with HasCheckpointInterval with HasSeed {
 
   /**
    * Param for rank of the matrix factorization (>= 1).
@@ -147,7 +147,7 @@ private[recommendation] trait ALSParams extends Params with HasMaxIter with HasR
 
   setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10, numItemBlocks -> 10,
     implicitPrefs -> false, alpha -> 1.0, userCol -> "user", itemCol -> "item",
-    ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10)
+    ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10, seed -> 0L)
 
   /**
    * Validates and transforms the input schema.
@@ -278,6 +278,9 @@ class ALS extends Estimator[ALSModel] with ALSParams {
   /** @group setParam */
   def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
 
+  /** @group setParam */
+  def setSeed(value: Long): this.type = set(seed, value)
+
   /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
    * @group setParam
@@ -290,7 +293,8 @@ class ALS extends Estimator[ALSModel] with ALSParams {
 
   override def fit(dataset: DataFrame): ALSModel = {
     val ratings = dataset
-      .select(col($(userCol)), col($(itemCol)), col($(ratingCol)).cast(FloatType))
+      .select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType),
+        col($(ratingCol)).cast(FloatType))
       .map { row =>
         Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
       }
@@ -298,7 +302,7 @@ class ALS extends Estimator[ALSModel] with ALSParams {
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
       maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
       alpha = $(alpha), nonnegative = $(nonnegative),
-      checkpointInterval = $(checkpointInterval))
+      checkpointInterval = $(checkpointInterval), seed = $(seed))
     copyValues(new ALSModel(this, $(rank), userFactors, itemFactors))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/84bf931f/python/pyspark/ml/param/_shared_params_code_gen.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index ee901f2..ed3171b 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -97,6 +97,8 @@ if __name__ == "__main__":
         ("inputCol", "input column name", None),
         ("inputCols", "input column names", None),
         ("outputCol", "output column name", None),
+        ("numFeatures", "number of features", None),
+        ("checkpointInterval", "checkpoint interval (>= 1)", None),
         ("seed", "random seed", None),
         ("tol", "the convergence tolerance for iterative algorithms", None),
         ("stepSize", "Step size to be used for each iteration of optimization.", None)]

http://git-wip-us.apache.org/repos/asf/spark/blob/84bf931f/python/pyspark/ml/param/shared.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 5e7529c..d0bcade 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -310,6 +310,35 @@ class HasNumFeatures(Params):
         return self.getOrDefault(self.numFeatures)
 
 
+class HasCheckpointInterval(Params):
+    """
+    Mixin for param checkpointInterval: checkpoint interval (>= 1).
+    """
+
+    # a placeholder to make it appear in the generated doc
+    checkpointInterval = Param(Params._dummy(), "checkpointInterval", "checkpoint interval (>= 1)")
+
+    def __init__(self):
+        super(HasCheckpointInterval, self).__init__()
+        #: param for checkpoint interval (>= 1)
+        self.checkpointInterval = Param(self, "checkpointInterval", "checkpoint interval (>= 1)")
+        if None is not None:
+            self._setDefault(checkpointInterval=None)
+
+    def setCheckpointInterval(self, value):
+        """
+        Sets the value of :py:attr:`checkpointInterval`.
+        """
+        self.paramMap[self.checkpointInterval] = value
+        return self
+
+    def getCheckpointInterval(self):
+        """
+        Gets the value of checkpointInterval or its default value.
+        """
+        return self.getOrDefault(self.checkpointInterval)
+
+
 class HasSeed(Params):
     """
     Mixin for param seed: random seed.

http://git-wip-us.apache.org/repos/asf/spark/blob/84bf931f/python/pyspark/ml/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
new file mode 100644
index 0000000..4846b90
--- /dev/null
+++ b/python/pyspark/ml/recommendation.py
@@ -0,0 +1,279 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.ml.util import keyword_only
+from pyspark.ml.wrapper import JavaEstimator, JavaModel
+from pyspark.ml.param.shared import *
+from pyspark.mllib.common import inherit_doc
+
+
+__all__ = ['ALS', 'ALSModel']
+
+
+@inherit_doc
+class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed):
+    """
+    Alternating Least Squares (ALS) matrix factorization.
+
+    ALS attempts to estimate the ratings matrix `R` as the product of
+    two lower-rank matrices, `X` and `Y`, i.e. `X * Yt = R`. Typically
+    these approximations are called 'factor' matrices. The general
+    approach is iterative. During each iteration, one of the factor
+    matrices is held constant, while the other is solved for using least
+    squares. The newly-solved factor matrix is then held constant while
+    solving for the other factor matrix.
+
+    This is a blocked implementation of the ALS factorization algorithm
+    that groups the two sets of factors (referred to as "users" and
+    "products") into blocks and reduces communication by only sending
+    one copy of each user vector to each product block on each
+    iteration, and only for the product blocks that need that user's
+    feature vector. This is achieved by pre-computing some information
+    about the ratings matrix to determine the "out-links" of each user
+    (which blocks of products it will contribute to) and "in-link"
+    information for each product (which of the feature vectors it
+    receives from each user block it will depend on). This allows us to
+    send only an array of feature vectors between each user block and
+    product block, and have the product block find the users' ratings
+    and update the products based on these messages.
+
+    For implicit preference data, the algorithm used is based on
+    "Collaborative Filtering for Implicit Feedback Datasets", available
+    at `http://dx.doi.org/10.1109/ICDM.2008.22`, adapted for the blocked
+    approach used here.
+
+    Essentially instead of finding the low-rank approximations to the
+    rating matrix `R`, this finds the approximations for a preference
+    matrix `P` where the elements of `P` are 1 if r > 0 and 0 if r <= 0.
+    The ratings then act as 'confidence' values related to strength of
+    indicated user preferences rather than explicit ratings given to
+    items.
+
+    >>> als = ALS(rank=10, maxIter=5)
+    >>> model = als.fit(df)
+    >>> test = sqlContext.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"])
+    >>> predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
+    >>> predictions[0]
+    Row(user=0, item=2, prediction=0.39...)
+    >>> predictions[1]
+    Row(user=1, item=0, prediction=3.19...)
+    >>> predictions[2]
+    Row(user=2, item=0, prediction=-1.15...)
+    """
+    _java_class = "org.apache.spark.ml.recommendation.ALS"
+    # a placeholder to make it appear in the generated doc
+    rank = Param(Params._dummy(), "rank", "rank of the factorization")
+    numUserBlocks = Param(Params._dummy(), "numUserBlocks", "number of user blocks")
+    numItemBlocks = Param(Params._dummy(), "numItemBlocks", "number of item blocks")
+    implicitPrefs = Param(Params._dummy(), "implicitPrefs", "whether to use implicit preference")
+    alpha = Param(Params._dummy(), "alpha", "alpha for implicit preference")
+    userCol = Param(Params._dummy(), "userCol", "column name for user ids")
+    itemCol = Param(Params._dummy(), "itemCol", "column name for item ids")
+    ratingCol = Param(Params._dummy(), "ratingCol", "column name for ratings")
+    nonnegative = Param(Params._dummy(), "nonnegative",
+                        "whether to use nonnegative constraint for least squares")
+
+    @keyword_only
+    def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+                 implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+                 ratingCol="rating", nonnegative=False, checkpointInterval=10):
+        """
+        __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+                 implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+                 ratingCol="rating", nonnegative=False, checkpointInterval=10)
+        """
+        super(ALS, self).__init__()
+        self.rank = Param(self, "rank", "rank of the factorization")
+        self.numUserBlocks = Param(self, "numUserBlocks", "number of user blocks")
+        self.numItemBlocks = Param(self, "numItemBlocks", "number of item blocks")
+        self.implicitPrefs = Param(self, "implicitPrefs", "whether to use implicit preference")
+        self.alpha = Param(self, "alpha", "alpha for implicit preference")
+        self.userCol = Param(self, "userCol", "column name for user ids")
+        self.itemCol = Param(self, "itemCol", "column name for item ids")
+        self.ratingCol = Param(self, "ratingCol", "column name for ratings")
+        self.nonnegative = Param(self, "nonnegative",
+                                 "whether to use nonnegative constraint for least squares")
+        self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+                         implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+                         ratingCol="rating", nonnegative=False, checkpointInterval=10)
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+                  ratingCol="rating", nonnegative=False, checkpointInterval=10):
+        """
+        setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+                 implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+                 ratingCol="rating", nonnegative=False, checkpointInterval=10)
+        Sets params for ALS.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    def _create_model(self, java_model):
+        return ALSModel(java_model)
+
+    def setRank(self, value):
+        """
+        Sets the value of :py:attr:`rank`.
+        """
+        self.paramMap[self.rank] = value
+        return self
+
+    def getRank(self):
+        """
+        Gets the value of rank or its default value.
+        """
+        return self.getOrDefault(self.rank)
+
+    def setNumUserBlocks(self, value):
+        """
+        Sets the value of :py:attr:`numUserBlocks`.
+        """
+        self.paramMap[self.numUserBlocks] = value
+        return self
+
+    def getNumUserBlocks(self):
+        """
+        Gets the value of numUserBlocks or its default value.
+        """
+        return self.getOrDefault(self.numUserBlocks)
+
+    def setNumItemBlocks(self, value):
+        """
+        Sets the value of :py:attr:`numItemBlocks`.
+        """
+        self.paramMap[self.numItemBlocks] = value
+        return self
+
+    def getNumItemBlocks(self):
+        """
+        Gets the value of numItemBlocks or its default value.
+        """
+        return self.getOrDefault(self.numItemBlocks)
+
+    def setNumBlocks(self, value):
+        """
+        Sets both :py:attr:`numUserBlocks` and :py:attr:`numItemBlocks` to `value`; returns self.
+        """
+        self.paramMap[self.numUserBlocks] = self.paramMap[self.numItemBlocks] = value
+        return self
+
+    def setImplicitPrefs(self, value):
+        """
+        Sets the value of :py:attr:`implicitPrefs`.
+        """
+        self.paramMap[self.implicitPrefs] = value
+        return self
+
+    def getImplicitPrefs(self):
+        """
+        Gets the value of implicitPrefs or its default value.
+        """
+        return self.getOrDefault(self.implicitPrefs)
+
+    def setAlpha(self, value):
+        """
+        Sets the value of :py:attr:`alpha`.
+        """
+        self.paramMap[self.alpha] = value
+        return self
+
+    def getAlpha(self):
+        """
+        Gets the value of alpha or its default value.
+        """
+        return self.getOrDefault(self.alpha)
+
+    def setUserCol(self, value):
+        """
+        Sets the value of :py:attr:`userCol`.
+        """
+        self.paramMap[self.userCol] = value
+        return self
+
+    def getUserCol(self):
+        """
+        Gets the value of userCol or its default value.
+        """
+        return self.getOrDefault(self.userCol)
+
+    def setItemCol(self, value):
+        """
+        Sets the value of :py:attr:`itemCol`.
+        """
+        self.paramMap[self.itemCol] = value
+        return self
+
+    def getItemCol(self):
+        """
+        Gets the value of itemCol or its default value.
+        """
+        return self.getOrDefault(self.itemCol)
+
+    def setRatingCol(self, value):
+        """
+        Sets the value of :py:attr:`ratingCol`.
+        """
+        self.paramMap[self.ratingCol] = value
+        return self
+
+    def getRatingCol(self):
+        """
+        Gets the value of ratingCol or its default value.
+        """
+        return self.getOrDefault(self.ratingCol)
+
+    def setNonnegative(self, value):
+        """
+        Sets the value of :py:attr:`nonnegative`.
+        """
+        self.paramMap[self.nonnegative] = value
+        return self
+
+    def getNonnegative(self):
+        """
+        Gets the value of nonnegative or its default value.
+        """
+        return self.getOrDefault(self.nonnegative)
+
+
+class ALSModel(JavaModel):
+    """
+    Model fitted by ALS.
+    """
+
+
+if __name__ == "__main__":
+    import doctest
+    from pyspark.context import SparkContext
+    from pyspark.sql import SQLContext
+    globs = globals().copy()
+    # Run the doctests against a local two-thread SparkContext; `df` below is
+    # the small ratings DataFrame that the ALS example in the docstring fits on:
+    sc = SparkContext("local[2]", "ml.recommendation tests")
+    sqlContext = SQLContext(sc)
+    globs['sc'] = sc
+    globs['sqlContext'] = sqlContext
+    globs['df'] = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0),
+                                              (2, 1, 1.0), (2, 2, 5.0)], ["user", "item", "rating"])
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    sc.stop()
+    if failure_count:
+        exit(-1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org