You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2020/02/26 02:58:35 UTC

[spark] branch branch-3.0 updated: [SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 84345c7  [SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP
84345c7 is described below

commit 84345c7e67c9dfd47ec76d5a3d2ad76b62f959b6
Author: Huaxin Gao <hu...@us.ibm.com>
AuthorDate: Sun Feb 9 13:14:30 2020 +0800

    [SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP
    
    ### What changes were proposed in this pull request?
    Add ```HasBlockSize``` in shared Params in both Scala and Python.
    Make ALS/MLP extend ```HasBlockSize```.
    
    ### Why are the changes needed?
    Add ```HasBlockSize``` to ALS, so users can specify the blockSize.
    Make ```HasBlockSize``` a shared param so both ALS and MLP can use it.
    
    ### Does this PR introduce any user-facing change?
    Yes
    ```ALS.setBlockSize/getBlockSize```
    ```ALSModel.setBlockSize/getBlockSize```
    
    ### How was this patch tested?
    Manually tested. Also added doctest.
    
    Closes #27501 from huaxingao/spark_30662.
    
    Authored-by: Huaxin Gao <hu...@us.ibm.com>
    Signed-off-by: zhengruifeng <ru...@foxmail.com>
---
 .../MultilayerPerceptronClassifier.scala           | 22 +----------
 .../ml/param/shared/SharedParamsCodeGen.scala      |  6 ++-
 .../spark/ml/param/shared/sharedParams.scala       | 17 ++++++++
 .../org/apache/spark/ml/recommendation/ALS.scala   | 46 ++++++++++++++++------
 python/pyspark/ml/classification.py                | 22 ++++-------
 python/pyspark/ml/param/_shared_params_code_gen.py |  5 ++-
 python/pyspark/ml/param/shared.py                  | 17 ++++++++
 python/pyspark/ml/recommendation.py                | 29 +++++++++++---
 8 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index c7a8237..6e8f92b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
 private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
+  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
 
   import MultilayerPerceptronClassifier._
 
@@ -55,26 +55,6 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
   final def getLayers: Array[Int] = $(layers)
 
   /**
-   * Block size for stacking input data in matrices to speed up the computation.
-   * Data is stacked within partitions. If block size is more than remaining data in
-   * a partition then it is adjusted to the size of this data.
-   * Recommended size is between 10 and 1000.
-   * Default: 128
-   *
-   * @group expertParam
-   */
-  @Since("1.5.0")
-  final val blockSize: IntParam = new IntParam(this, "blockSize",
-    "Block size for stacking input data in matrices. Data is stacked within partitions." +
-      " If block size is more than remaining data in a partition then " +
-      "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
-    ParamValidators.gt(0))
-
-  /** @group expertGetParam */
-  @Since("1.5.0")
-  final def getBlockSize: Int = $(blockSize)
-
-  /**
    * The solver algorithm for optimization.
    * Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
    * Default: "l-bfgs"
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 7ac680e..6194dfa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -104,7 +104,11 @@ private[shared] object SharedParamsCodeGen {
         isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"),
       ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " +
         "each row is for training or for validation. False indicates training; true indicates " +
-        "validation.")
+        "validation."),
+      ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
+        "stacked within partitions. If block size is more than remaining data in a partition " +
+        "then it is adjusted to the size of this data.",
+        isValid = "ParamValidators.gt(0)", isExpertParam = true)
     )
 
     val code = genSharedParams(params)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 44c993e..0c0d2b5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -578,4 +578,21 @@ trait HasValidationIndicatorCol extends Params {
   /** @group getParam */
   final def getValidationIndicatorCol: String = $(validationIndicatorCol)
 }
+
+/**
+ * Trait for shared param blockSize. This trait may be changed or
+ * removed between minor versions.
+ */
+@DeveloperApi
+trait HasBlockSize extends Params {
+
+  /**
+   * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
+   * @group expertParam
+   */
+  final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0))
+
+  /** @group expertGetParam */
+  final def getBlockSize: Int = $(blockSize)
+}
 // scalastyle:on
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 2fb9a27..002146f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,7 +54,8 @@ import org.apache.spark.util.random.XORShiftRandom
 /**
  * Common params for ALS and ALSModel.
  */
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
+  with HasBlockSize {
   /**
    * Param for the column name for user ids. Ids must be integers. Other
    * numeric types are supported for this column, but will be cast to integers as long as they
@@ -125,6 +126,8 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
 
   /** @group expertGetParam */
   def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
+
+  setDefault(blockSize -> 4096)
 }
 
 /**
@@ -288,6 +291,15 @@ class ALSModel private[ml] (
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
     if (featuresA != null && featuresB != null) {
       var dotProduct = 0.0f
@@ -351,7 +363,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllUsers(numItems: Int): DataFrame = {
-    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -366,7 +378,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
-    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -377,7 +389,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllItems(numUsers: Int): DataFrame = {
-    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -392,7 +404,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
-    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -441,11 +453,12 @@ class ALSModel private[ml] (
       dstFactors: DataFrame,
       srcOutputColumn: String,
       dstOutputColumn: String,
-      num: Int): DataFrame = {
+      num: Int,
+      blockSize: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
-    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
       .flatMap { case (srcIter, dstIter) =>
@@ -483,11 +496,10 @@ class ALSModel private[ml] (
 
   /**
    * Blockifies factors to improve the efficiency of cross join
-   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
-      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+      blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
     import factors.sparkSession.implicits._
     factors.mapPartitions(_.grouped(blockSize))
   }
@@ -655,6 +667,15 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
   /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
+  /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
    *
    * @group setParam
@@ -683,7 +704,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     instr.logDataset(dataset)
     instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
       itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
-      seed, intermediateStorageLevel, finalStorageLevel)
+      seed, intermediateStorageLevel, finalStorageLevel, blockSize)
 
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -694,7 +715,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       checkpointInterval = $(checkpointInterval), seed = $(seed))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
-    val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
+    val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
+      .setParent(this)
     copyValues(model)
   }
 
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 5ab8e60..1436b78 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -2153,7 +2153,7 @@ class NaiveBayesModel(JavaProbabilisticClassificationModel, _NaiveBayesParams, J
 
 
 class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
-                                  HasTol, HasStepSize, HasSolver):
+                                  HasTol, HasStepSize, HasSolver, HasBlockSize):
     """
     Params for :py:class:`MultilayerPerceptronClassifier`.
 
@@ -2164,11 +2164,6 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
                    "E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
                    "neurons and output layer of 10 neurons.",
                    typeConverter=TypeConverters.toListInt)
-    blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
-                      "matrices. Data is stacked within partitions. If block size is more than " +
-                      "remaining data in a partition then it is adjusted to the size of this " +
-                      "data. Recommended size is between 10 and 1000, default is 128.",
-                      typeConverter=TypeConverters.toInt)
     solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
                    "options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
     initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2181,13 +2176,6 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
         """
         return self.getOrDefault(self.layers)
 
-    @since("1.6.0")
-    def getBlockSize(self):
-        """
-        Gets the value of blockSize or its default value.
-        """
-        return self.getOrDefault(self.blockSize)
-
     @since("2.0.0")
     def getInitialWeights(self):
         """
@@ -2211,11 +2199,17 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
     ...     (1.0, Vectors.dense([0.0, 1.0])),
     ...     (1.0, Vectors.dense([1.0, 0.0])),
     ...     (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
-    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
+    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
     >>> mlp.setMaxIter(100)
     MultilayerPerceptronClassifier...
     >>> mlp.getMaxIter()
     100
+    >>> mlp.getBlockSize()
+    128
+    >>> mlp.setBlockSize(1)
+    MultilayerPerceptronClassifier...
+    >>> mlp.getBlockSize()
+    1
     >>> model = mlp.fit(df)
     >>> model.setFeaturesCol("features")
     MultilayerPerceptronClassificationModel...
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index ded3ca8..2086e83 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -164,7 +164,10 @@ if __name__ == "__main__":
          "'euclidean'", "TypeConverters.toString"),
         ("validationIndicatorCol", "name of the column that indicates whether each row is for " +
          "training or for validation. False indicates training; true indicates validation.",
-         None, "TypeConverters.toString")]
+         None, "TypeConverters.toString"),
+        ("blockSize", "block size for stacking input data in matrices. Data is stacked within "
+         "partitions. If block size is more than remaining data in a partition then it is "
+         "adjusted to the size of this data.", None, "TypeConverters.toInt")]
 
     code = []
     for name, doc, defaultValueStr, typeConverter in shared:
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 8fc1156..24fb0d3 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -580,3 +580,20 @@ class HasValidationIndicatorCol(Params):
         Gets the value of validationIndicatorCol or its default value.
         """
         return self.getOrDefault(self.validationIndicatorCol)
+
+
+class HasBlockSize(Params):
+    """
+    Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
+    """
+
+    blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(HasBlockSize, self).__init__()
+
+    def getBlockSize(self):
+        """
+        Gets the value of blockSize or its default value.
+        """
+        return self.getOrDefault(self.blockSize)
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index ee27696..99d80aa 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@ __all__ = ['ALS', 'ALSModel']
 
 
 @inherit_doc
-class _ALSModelParams(HasPredictionCol):
+class _ALSModelParams(HasPredictionCol, HasBlockSize):
     """
     Params for :py:class:`ALS` and :py:class:`ALSModel`.
 
@@ -223,6 +223,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
     0.1
     >>> als.clear(als.regParam)
     >>> model = als.fit(df)
+    >>> model.getBlockSize()
+    4096
     >>> model.getUserCol()
     'user'
     >>> model.setUserCol("user")
@@ -282,13 +284,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10,
                  intermediateStorageLevel="MEMORY_AND_DISK",
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=false, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         """
         super(ALS, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@@ -296,7 +298,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                          implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
                          ratingCol="rating", nonnegative=False, checkpointInterval=10,
                          intermediateStorageLevel="MEMORY_AND_DISK",
-                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
+                         blockSize=4096)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -306,13 +309,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                   ratingCol="rating", nonnegative=False, checkpointInterval=10,
                   intermediateStorageLevel="MEMORY_AND_DISK",
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=False, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         Sets params for ALS.
         """
         kwargs = self._input_kwargs
@@ -443,6 +446,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
         """
         return self._set(seed=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
     """
@@ -479,6 +489,13 @@ class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
         """
         return self._set(predictionCol=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
     @property
     @since("1.4.0")
     def rank(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org