You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2017/10/27 22:19:31 UTC
spark git commit: [SPARK-21911][ML][PYSPARK] Parallel Model
Evaluation for ML Tuning in PySpark
Repository: spark
Updated Branches:
refs/heads/master b3d8fc3dc -> 20eb95e5e
[SPARK-21911][ML][PYSPARK] Parallel Model Evaluation for ML Tuning in PySpark
## What changes were proposed in this pull request?
Add parallelism support for ML tuning in pyspark.
## How was this patch tested?
Test updated.
Author: WeichenXu <we...@databricks.com>
Closes #19122 from WeichenXu123/par-ml-tuning-py.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20eb95e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20eb95e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20eb95e5
Branch: refs/heads/master
Commit: 20eb95e5e9c562261b44e4e47cad67a31390fa59
Parents: b3d8fc3
Author: WeichenXu <we...@databricks.com>
Authored: Fri Oct 27 15:19:27 2017 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Fri Oct 27 15:19:27 2017 -0700
----------------------------------------------------------------------
.../spark/ml/tuning/CrossValidatorSuite.scala | 4 +-
.../ml/tuning/TrainValidationSplitSuite.scala | 4 +-
python/pyspark/ml/tests.py | 39 +++++++++
python/pyspark/ml/tuning.py | 86 ++++++++++++--------
4 files changed, 96 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
index a01744f..853eeb3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
@@ -137,8 +137,8 @@ class CrossValidatorSuite
cv.setParallelism(2)
val cvParallelModel = cv.fit(dataset)
- val serialMetrics = cvSerialModel.avgMetrics.sorted
- val parallelMetrics = cvParallelModel.avgMetrics.sorted
+ val serialMetrics = cvSerialModel.avgMetrics
+ val parallelMetrics = cvParallelModel.avgMetrics
assert(serialMetrics === parallelMetrics)
val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]
http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
index 2ed4fbb..f8d9c66 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
@@ -138,8 +138,8 @@ class TrainValidationSplitSuite
cv.setParallelism(2)
val cvParallelModel = cv.fit(dataset)
- val serialMetrics = cvSerialModel.validationMetrics.sorted
- val parallelMetrics = cvParallelModel.validationMetrics.sorted
+ val serialMetrics = cvSerialModel.validationMetrics
+ val parallelMetrics = cvParallelModel.validationMetrics
assert(serialMetrics === parallelMetrics)
val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]
http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 8b8bcc7..2f1f3af 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -836,6 +836,27 @@ class CrossValidatorTests(SparkSessionTestCase):
loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
+ def test_parallel_evaluation(self):
+ dataset = self.spark.createDataFrame(
+ [(Vectors.dense([0.0]), 0.0),
+ (Vectors.dense([0.4]), 1.0),
+ (Vectors.dense([0.5]), 0.0),
+ (Vectors.dense([0.6]), 1.0),
+ (Vectors.dense([1.0]), 1.0)] * 10,
+ ["features", "label"])
+
+ lr = LogisticRegression()
+ grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
+ evaluator = BinaryClassificationEvaluator()
+
+ # compare metrics from a serial fit vs. a parallel fit of the same CrossValidator
+ cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+ cv.setParallelism(1)
+ cvSerialModel = cv.fit(dataset)
+ cv.setParallelism(2)
+ cvParallelModel = cv.fit(dataset)
+ self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)
+
def test_save_load_nested_estimator(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
@@ -986,6 +1007,24 @@ class TrainValidationSplitTests(SparkSessionTestCase):
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
+ def test_parallel_evaluation(self):
+ dataset = self.spark.createDataFrame(
+ [(Vectors.dense([0.0]), 0.0),
+ (Vectors.dense([0.4]), 1.0),
+ (Vectors.dense([0.5]), 0.0),
+ (Vectors.dense([0.6]), 1.0),
+ (Vectors.dense([1.0]), 1.0)] * 10,
+ ["features", "label"])
+ lr = LogisticRegression()
+ grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
+ evaluator = BinaryClassificationEvaluator()
+ tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+ tvs.setParallelism(1)
+ tvsSerialModel = tvs.fit(dataset)
+ tvs.setParallelism(2)
+ tvsParallelModel = tvs.fit(dataset)
+ self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)
+
def test_save_load_nested_estimator(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/python/pyspark/ml/tuning.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 00c348a..4735113 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
import itertools
import numpy as np
+from multiprocessing.pool import ThreadPool
from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.common import _py2java
from pyspark.ml.param import Params, Param, TypeConverters
-from pyspark.ml.param.shared import HasSeed
+from pyspark.ml.param.shared import HasParallelism, HasSeed
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaParams
from pyspark.sql.functions import rand
@@ -170,7 +170,7 @@ class ValidatorParams(HasSeed):
return java_estimator, java_epms, java_evaluator
-class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
+class CrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable):
"""
K-fold cross validation performs model selection by splitting the dataset into a set of
@@ -193,7 +193,8 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
>>> lr = LogisticRegression()
>>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
>>> evaluator = BinaryClassificationEvaluator()
- >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+ >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
+ ... parallelism=2)
>>> cvModel = cv.fit(dataset)
>>> cvModel.avgMetrics[0]
0.5
@@ -208,23 +209,23 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
@keyword_only
def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
- seed=None):
+ seed=None, parallelism=1):
"""
__init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
- seed=None)
+ seed=None, parallelism=1)
"""
super(CrossValidator, self).__init__()
- self._setDefault(numFolds=3)
+ self._setDefault(numFolds=3, parallelism=1)
kwargs = self._input_kwargs
self._set(**kwargs)
@keyword_only
@since("1.4.0")
def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
- seed=None):
+ seed=None, parallelism=1):
"""
setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
- seed=None):
+ seed=None, parallelism=1):
Sets params for cross validator.
"""
kwargs = self._input_kwargs
@@ -255,18 +256,27 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
randCol = self.uid + "_rand"
df = dataset.select("*", rand(seed).alias(randCol))
metrics = [0.0] * numModels
+
+ pool = ThreadPool(processes=min(self.getParallelism(), numModels))
+
for i in range(nFolds):
validateLB = i * h
validateUB = (i + 1) * h
condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
- validation = df.filter(condition)
- train = df.filter(~condition)
- models = est.fit(train, epm)
- for j in range(numModels):
- model = models[j]
+ validation = df.filter(condition).cache()
+ train = df.filter(~condition).cache()
+
+ def singleTrain(paramMap):
+ model = est.fit(train, paramMap)
# TODO: duplicate evaluator to take extra params from input
- metric = eva.evaluate(model.transform(validation, epm[j]))
- metrics[j] += metric/nFolds
+ metric = eva.evaluate(model.transform(validation, paramMap))
+ return metric
+
+ currentFoldMetrics = pool.map(singleTrain, epm)
+ for j in range(numModels):
+ metrics[j] += (currentFoldMetrics[j] / nFolds)
+ validation.unpersist()
+ train.unpersist()
if eva.isLargerBetter():
bestIndex = np.argmax(metrics)
@@ -316,9 +326,10 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage)
numFolds = java_stage.getNumFolds()
seed = java_stage.getSeed()
+ parallelism = java_stage.getParallelism()
# Create a new instance of this stage.
py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator,
- numFolds=numFolds, seed=seed)
+ numFolds=numFolds, seed=seed, parallelism=parallelism)
py_stage._resetUid(java_stage.uid())
return py_stage
@@ -337,6 +348,7 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
_java_obj.setEstimator(estimator)
_java_obj.setSeed(self.getSeed())
_java_obj.setNumFolds(self.getNumFolds())
+ _java_obj.setParallelism(self.getParallelism())
return _java_obj
@@ -427,7 +439,7 @@ class CrossValidatorModel(Model, ValidatorParams, MLReadable, MLWritable):
return _java_obj
-class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
+class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable):
"""
.. note:: Experimental
@@ -448,7 +460,8 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
>>> lr = LogisticRegression()
>>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
>>> evaluator = BinaryClassificationEvaluator()
- >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+ >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
+ ... parallelism=2)
>>> tvsModel = tvs.fit(dataset)
>>> evaluator.evaluate(tvsModel.transform(dataset))
0.8333...
@@ -461,23 +474,23 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
@keyword_only
def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
- seed=None):
+ parallelism=1, seed=None):
"""
__init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
- seed=None)
+ parallelism=1, seed=None)
"""
super(TrainValidationSplit, self).__init__()
- self._setDefault(trainRatio=0.75)
+ self._setDefault(trainRatio=0.75, parallelism=1)
kwargs = self._input_kwargs
self._set(**kwargs)
@since("2.0.0")
@keyword_only
def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
- seed=None):
+ parallelism=1, seed=None):
"""
setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
- seed=None):
+ parallelism=1, seed=None):
Sets params for the train validation split.
"""
kwargs = self._input_kwargs
@@ -506,15 +519,20 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
seed = self.getOrDefault(self.seed)
randCol = self.uid + "_rand"
df = dataset.select("*", rand(seed).alias(randCol))
- metrics = [0.0] * numModels
condition = (df[randCol] >= tRatio)
- validation = df.filter(condition)
- train = df.filter(~condition)
- models = est.fit(train, epm)
- for j in range(numModels):
- model = models[j]
- metric = eva.evaluate(model.transform(validation, epm[j]))
- metrics[j] += metric
+ validation = df.filter(condition).cache()
+ train = df.filter(~condition).cache()
+
+ def singleTrain(paramMap):
+ model = est.fit(train, paramMap)
+ metric = eva.evaluate(model.transform(validation, paramMap))
+ return metric
+
+ pool = ThreadPool(processes=min(self.getParallelism(), numModels))
+ metrics = pool.map(singleTrain, epm)
+ train.unpersist()
+ validation.unpersist()
+
if eva.isLargerBetter():
bestIndex = np.argmax(metrics)
else:
@@ -563,9 +581,10 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
estimator, epms, evaluator = super(TrainValidationSplit, cls)._from_java_impl(java_stage)
trainRatio = java_stage.getTrainRatio()
seed = java_stage.getSeed()
+ parallelism = java_stage.getParallelism()
# Create a new instance of this stage.
py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator,
- trainRatio=trainRatio, seed=seed)
+ trainRatio=trainRatio, seed=seed, parallelism=parallelism)
py_stage._resetUid(java_stage.uid())
return py_stage
@@ -584,6 +603,7 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
_java_obj.setEstimator(estimator)
_java_obj.setTrainRatio(self.getTrainRatio())
_java_obj.setSeed(self.getSeed())
+ _java_obj.setParallelism(self.getParallelism())
return _java_obj
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org