Posted to commits@spark.apache.org by ru...@apache.org on 2020/02/26 02:58:35 UTC
[spark] branch branch-3.0 updated: [SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 84345c7 [SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP
84345c7 is described below
commit 84345c7e67c9dfd47ec76d5a3d2ad76b62f959b6
Author: Huaxin Gao <hu...@us.ibm.com>
AuthorDate: Sun Feb 9 13:14:30 2020 +0800
[SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP
### What changes were proposed in this pull request?
Add ```HasBlockSize``` to the shared Params in both Scala and Python.
Make ALS/MLP extend ```HasBlockSize```.
### Why are the changes needed?
Add ```HasBlockSize``` to ALS so users can specify the blockSize.
Make ```HasBlockSize``` a shared param so both ALS and MLP can use it.
### Does this PR introduce any user-facing change?
Yes
```ALS.setBlockSize/getBlockSize```
```ALSModel.setBlockSize/getBlockSize```
### How was this patch tested?
Manually tested. Also added doctests.
Closes #27501 from huaxingao/spark_30662.
Authored-by: Huaxin Gao <hu...@us.ibm.com>
Signed-off-by: zhengruifeng <ru...@foxmail.com>
---
.../MultilayerPerceptronClassifier.scala | 22 +----------
.../ml/param/shared/SharedParamsCodeGen.scala | 6 ++-
.../spark/ml/param/shared/sharedParams.scala | 17 ++++++++
.../org/apache/spark/ml/recommendation/ALS.scala | 46 ++++++++++++++++------
python/pyspark/ml/classification.py | 22 ++++-------
python/pyspark/ml/param/_shared_params_code_gen.py | 5 ++-
python/pyspark/ml/param/shared.py | 17 ++++++++
python/pyspark/ml/recommendation.py | 29 +++++++++++---
8 files changed, 109 insertions(+), 55 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index c7a8237..6e8f92b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
- with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
+ with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
import MultilayerPerceptronClassifier._
@@ -55,26 +55,6 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
final def getLayers: Array[Int] = $(layers)
/**
- * Block size for stacking input data in matrices to speed up the computation.
- * Data is stacked within partitions. If block size is more than remaining data in
- * a partition then it is adjusted to the size of this data.
- * Recommended size is between 10 and 1000.
- * Default: 128
- *
- * @group expertParam
- */
- @Since("1.5.0")
- final val blockSize: IntParam = new IntParam(this, "blockSize",
- "Block size for stacking input data in matrices. Data is stacked within partitions." +
- " If block size is more than remaining data in a partition then " +
- "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
- ParamValidators.gt(0))
-
- /** @group expertGetParam */
- @Since("1.5.0")
- final def getBlockSize: Int = $(blockSize)
-
- /**
* The solver algorithm for optimization.
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
* Default: "l-bfgs"
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 7ac680e..6194dfa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -104,7 +104,11 @@ private[shared] object SharedParamsCodeGen {
isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"),
ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " +
"each row is for training or for validation. False indicates training; true indicates " +
- "validation.")
+ "validation."),
+ ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
+ "stacked within partitions. If block size is more than remaining data in a partition " +
+ "then it is adjusted to the size of this data.",
+ isValid = "ParamValidators.gt(0)", isExpertParam = true)
)
val code = genSharedParams(params)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 44c993e..0c0d2b5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -578,4 +578,21 @@ trait HasValidationIndicatorCol extends Params {
/** @group getParam */
final def getValidationIndicatorCol: String = $(validationIndicatorCol)
}
+
+/**
+ * Trait for shared param blockSize. This trait may be changed or
+ * removed between minor versions.
+ */
+@DeveloperApi
+trait HasBlockSize extends Params {
+
+ /**
+ * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
+ * @group expertParam
+ */
+ final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0))
+
+ /** @group expertGetParam */
+ final def getBlockSize: Int = $(blockSize)
+}
// scalastyle:on
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 2fb9a27..002146f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,7 +54,8 @@ import org.apache.spark.util.random.XORShiftRandom
/**
* Common params for ALS and ALSModel.
*/
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
+ with HasBlockSize {
/**
* Param for the column name for user ids. Ids must be integers. Other
* numeric types are supported for this column, but will be cast to integers as long as they
@@ -125,6 +126,8 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
/** @group expertGetParam */
def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
+
+ setDefault(blockSize -> 4096)
}
/**
@@ -288,6 +291,15 @@ class ALSModel private[ml] (
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
+ /**
+ * Set block size for stacking input data in matrices.
+ * Default is 4096.
+ *
+ * @group expertSetParam
+ */
+ @Since("3.0.0")
+ def setBlockSize(value: Int): this.type = set(blockSize, value)
+
private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
if (featuresA != null && featuresB != null) {
var dotProduct = 0.0f
@@ -351,7 +363,7 @@ class ALSModel private[ml] (
*/
@Since("2.2.0")
def recommendForAllUsers(numItems: Int): DataFrame = {
- recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
+ recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
}
/**
@@ -366,7 +378,7 @@ class ALSModel private[ml] (
@Since("2.3.0")
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
- recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+ recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
}
/**
@@ -377,7 +389,7 @@ class ALSModel private[ml] (
*/
@Since("2.2.0")
def recommendForAllItems(numUsers: Int): DataFrame = {
- recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
+ recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
}
/**
@@ -392,7 +404,7 @@ class ALSModel private[ml] (
@Since("2.3.0")
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
- recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+ recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
}
/**
@@ -441,11 +453,12 @@ class ALSModel private[ml] (
dstFactors: DataFrame,
srcOutputColumn: String,
dstOutputColumn: String,
- num: Int): DataFrame = {
+ num: Int,
+ blockSize: Int): DataFrame = {
import srcFactors.sparkSession.implicits._
- val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
- val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+ val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
+ val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
.flatMap { case (srcIter, dstIter) =>
@@ -483,11 +496,10 @@ class ALSModel private[ml] (
/**
* Blockifies factors to improve the efficiency of cross join
- * TODO: SPARK-20443 - expose blockSize as a param?
*/
private def blockify(
factors: Dataset[(Int, Array[Float])],
- blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+ blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
import factors.sparkSession.implicits._
factors.mapPartitions(_.grouped(blockSize))
}
@@ -655,6 +667,15 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
/**
+ * Set block size for stacking input data in matrices.
+ * Default is 4096.
+ *
+ * @group expertSetParam
+ */
+ @Since("3.0.0")
+ def setBlockSize(value: Int): this.type = set(blockSize, value)
+
+ /**
* Sets both numUserBlocks and numItemBlocks to the specific value.
*
* @group setParam
@@ -683,7 +704,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
instr.logDataset(dataset)
instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
- seed, intermediateStorageLevel, finalStorageLevel)
+ seed, intermediateStorageLevel, finalStorageLevel, blockSize)
val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -694,7 +715,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
checkpointInterval = $(checkpointInterval), seed = $(seed))
val userDF = userFactors.toDF("id", "features")
val itemDF = itemFactors.toDF("id", "features")
- val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
+ val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
+ .setParent(this)
copyValues(model)
}
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 5ab8e60..1436b78 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -2153,7 +2153,7 @@ class NaiveBayesModel(JavaProbabilisticClassificationModel, _NaiveBayesParams, J
class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
- HasTol, HasStepSize, HasSolver):
+ HasTol, HasStepSize, HasSolver, HasBlockSize):
"""
Params for :py:class:`MultilayerPerceptronClassifier`.
@@ -2164,11 +2164,6 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
"E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
"neurons and output layer of 10 neurons.",
typeConverter=TypeConverters.toListInt)
- blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
- "matrices. Data is stacked within partitions. If block size is more than " +
- "remaining data in a partition then it is adjusted to the size of this " +
- "data. Recommended size is between 10 and 1000, default is 128.",
- typeConverter=TypeConverters.toInt)
solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
"options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2181,13 +2176,6 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
"""
return self.getOrDefault(self.layers)
- @since("1.6.0")
- def getBlockSize(self):
- """
- Gets the value of blockSize or its default value.
- """
- return self.getOrDefault(self.blockSize)
-
@since("2.0.0")
def getInitialWeights(self):
"""
@@ -2211,11 +2199,17 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
... (1.0, Vectors.dense([0.0, 1.0])),
... (1.0, Vectors.dense([1.0, 0.0])),
... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
- >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
+ >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
>>> mlp.setMaxIter(100)
MultilayerPerceptronClassifier...
>>> mlp.getMaxIter()
100
+ >>> mlp.getBlockSize()
+ 128
+ >>> mlp.setBlockSize(1)
+ MultilayerPerceptronClassifier...
+ >>> mlp.getBlockSize()
+ 1
>>> model = mlp.fit(df)
>>> model.setFeaturesCol("features")
MultilayerPerceptronClassificationModel...
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index ded3ca8..2086e83 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -164,7 +164,10 @@ if __name__ == "__main__":
"'euclidean'", "TypeConverters.toString"),
("validationIndicatorCol", "name of the column that indicates whether each row is for " +
"training or for validation. False indicates training; true indicates validation.",
- None, "TypeConverters.toString")]
+ None, "TypeConverters.toString"),
+ ("blockSize", "block size for stacking input data in matrices. Data is stacked within "
+ "partitions. If block size is more than remaining data in a partition then it is "
+ "adjusted to the size of this data.", None, "TypeConverters.toInt")]
code = []
for name, doc, defaultValueStr, typeConverter in shared:
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 8fc1156..24fb0d3 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -580,3 +580,20 @@ class HasValidationIndicatorCol(Params):
Gets the value of validationIndicatorCol or its default value.
"""
return self.getOrDefault(self.validationIndicatorCol)
+
+
+class HasBlockSize(Params):
+ """
+ Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
+ """
+
+ blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt)
+
+ def __init__(self):
+ super(HasBlockSize, self).__init__()
+
+ def getBlockSize(self):
+ """
+ Gets the value of blockSize or its default value.
+ """
+ return self.getOrDefault(self.blockSize)
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index ee27696..99d80aa 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@ __all__ = ['ALS', 'ALSModel']
@inherit_doc
-class _ALSModelParams(HasPredictionCol):
+class _ALSModelParams(HasPredictionCol, HasBlockSize):
"""
Params for :py:class:`ALS` and :py:class:`ALSModel`.
@@ -223,6 +223,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
0.1
>>> als.clear(als.regParam)
>>> model = als.fit(df)
+ >>> model.getBlockSize()
+ 4096
>>> model.getUserCol()
'user'
>>> model.setUserCol("user")
@@ -282,13 +284,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
"""
__init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=false, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
"""
super(ALS, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@@ -296,7 +298,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
+ blockSize=4096)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@@ -306,13 +309,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
"""
setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=False, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
Sets params for ALS.
"""
kwargs = self._input_kwargs
@@ -443,6 +446,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
"""
return self._set(seed=value)
+ @since("3.0.0")
+ def setBlockSize(self, value):
+ """
+ Sets the value of :py:attr:`blockSize`.
+ """
+ return self._set(blockSize=value)
+
class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
"""
@@ -479,6 +489,13 @@ class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
"""
return self._set(predictionCol=value)
+ @since("3.0.0")
+ def setBlockSize(self, value):
+ """
+ Sets the value of :py:attr:`blockSize`.
+ """
+ return self._set(blockSize=value)
+
@property
@since("1.4.0")
def rank(self):