Posted to commits@spark.apache.org by jk...@apache.org on 2017/10/27 22:19:31 UTC

spark git commit: [SPARK-21911][ML][PYSPARK] Parallel Model Evaluation for ML Tuning in PySpark

Repository: spark
Updated Branches:
  refs/heads/master b3d8fc3dc -> 20eb95e5e


[SPARK-21911][ML][PYSPARK] Parallel Model Evaluation for ML Tuning in PySpark

## What changes were proposed in this pull request?

Add parallelism support for ML tuning in PySpark: `CrossValidator` and `TrainValidationSplit` gain a `parallelism` param (via `HasParallelism`) and fit/evaluate candidate models on a thread pool instead of serially, matching the existing Scala API.
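
For reference, a minimal usage sketch of the new `parallelism` param (adapted from the doctests and tests this patch updates; the toy dataset and grid values are illustrative only):

```python
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder.getOrCreate()

# Toy dataset, as used in the new test_parallel_evaluation tests.
dataset = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.4]), 1.0),
     (Vectors.dense([0.5]), 0.0),
     (Vectors.dense([0.6]), 1.0),
     (Vectors.dense([1.0]), 1.0)] * 10,
    ["features", "label"])

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
evaluator = BinaryClassificationEvaluator()

# parallelism=2 fits and evaluates up to two candidate models at a time
# within each fold; metrics come back in param-map order either way.
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
                    evaluator=evaluator, parallelism=2)
cvModel = cv.fit(dataset)
print(cvModel.avgMetrics)
```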

## How was this patch tested?

Tests updated: a `test_parallel_evaluation` case is added for both `CrossValidatorTests` and `TrainValidationSplitTests`, checking that serial and parallel fits produce identical metrics.

Author: WeichenXu <we...@databricks.com>

Closes #19122 from WeichenXu123/par-ml-tuning-py.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20eb95e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20eb95e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20eb95e5

Branch: refs/heads/master
Commit: 20eb95e5e9c562261b44e4e47cad67a31390fa59
Parents: b3d8fc3
Author: WeichenXu <we...@databricks.com>
Authored: Fri Oct 27 15:19:27 2017 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Fri Oct 27 15:19:27 2017 -0700

----------------------------------------------------------------------
 .../spark/ml/tuning/CrossValidatorSuite.scala   |  4 +-
 .../ml/tuning/TrainValidationSplitSuite.scala   |  4 +-
 python/pyspark/ml/tests.py                      | 39 +++++++++
 python/pyspark/ml/tuning.py                     | 86 ++++++++++++--------
 4 files changed, 96 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
index a01744f..853eeb3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
@@ -137,8 +137,8 @@ class CrossValidatorSuite
     cv.setParallelism(2)
     val cvParallelModel = cv.fit(dataset)
 
-    val serialMetrics = cvSerialModel.avgMetrics.sorted
-    val parallelMetrics = cvParallelModel.avgMetrics.sorted
+    val serialMetrics = cvSerialModel.avgMetrics
+    val parallelMetrics = cvParallelModel.avgMetrics
     assert(serialMetrics === parallelMetrics)
 
     val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]

http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
index 2ed4fbb..f8d9c66 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
@@ -138,8 +138,8 @@ class TrainValidationSplitSuite
     cv.setParallelism(2)
     val cvParallelModel = cv.fit(dataset)
 
-    val serialMetrics = cvSerialModel.validationMetrics.sorted
-    val parallelMetrics = cvParallelModel.validationMetrics.sorted
+    val serialMetrics = cvSerialModel.validationMetrics
+    val parallelMetrics = cvParallelModel.validationMetrics
     assert(serialMetrics === parallelMetrics)
 
     val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]

http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 8b8bcc7..2f1f3af 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -836,6 +836,27 @@ class CrossValidatorTests(SparkSessionTestCase):
         loadedModel = CrossValidatorModel.load(cvModelPath)
         self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
 
+    def test_parallel_evaluation(self):
+        dataset = self.spark.createDataFrame(
+            [(Vectors.dense([0.0]), 0.0),
+             (Vectors.dense([0.4]), 1.0),
+             (Vectors.dense([0.5]), 0.0),
+             (Vectors.dense([0.6]), 1.0),
+             (Vectors.dense([1.0]), 1.0)] * 10,
+            ["features", "label"])
+
+        lr = LogisticRegression()
+        grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
+        evaluator = BinaryClassificationEvaluator()
+
+        # test that parallel fitting gives the same metrics as serial fitting
+        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+        cv.setParallelism(1)
+        cvSerialModel = cv.fit(dataset)
+        cv.setParallelism(2)
+        cvParallelModel = cv.fit(dataset)
+        self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)
+
     def test_save_load_nested_estimator(self):
         temp_path = tempfile.mkdtemp()
         dataset = self.spark.createDataFrame(
@@ -986,6 +1007,24 @@ class TrainValidationSplitTests(SparkSessionTestCase):
         loadedModel = TrainValidationSplitModel.load(tvsModelPath)
         self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
 
+    def test_parallel_evaluation(self):
+        dataset = self.spark.createDataFrame(
+            [(Vectors.dense([0.0]), 0.0),
+             (Vectors.dense([0.4]), 1.0),
+             (Vectors.dense([0.5]), 0.0),
+             (Vectors.dense([0.6]), 1.0),
+             (Vectors.dense([1.0]), 1.0)] * 10,
+            ["features", "label"])
+        lr = LogisticRegression()
+        grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
+        evaluator = BinaryClassificationEvaluator()
+        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+        tvs.setParallelism(1)
+        tvsSerialModel = tvs.fit(dataset)
+        tvs.setParallelism(2)
+        tvsParallelModel = tvs.fit(dataset)
+        self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)
+
     def test_save_load_nested_estimator(self):
         # This tests saving and loading the trained model only.
         # Save/load for TrainValidationSplit will be added later: SPARK-13786

http://git-wip-us.apache.org/repos/asf/spark/blob/20eb95e5/python/pyspark/ml/tuning.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 00c348a..4735113 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -14,15 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 import itertools
 import numpy as np
+from multiprocessing.pool import ThreadPool
 
 from pyspark import since, keyword_only
 from pyspark.ml import Estimator, Model
 from pyspark.ml.common import _py2java
 from pyspark.ml.param import Params, Param, TypeConverters
-from pyspark.ml.param.shared import HasSeed
+from pyspark.ml.param.shared import HasParallelism, HasSeed
 from pyspark.ml.util import *
 from pyspark.ml.wrapper import JavaParams
 from pyspark.sql.functions import rand
@@ -170,7 +170,7 @@ class ValidatorParams(HasSeed):
         return java_estimator, java_epms, java_evaluator
 
 
-class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
+class CrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable):
     """
 
     K-fold cross validation performs model selection by splitting the dataset into a set of
@@ -193,7 +193,8 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
     >>> lr = LogisticRegression()
     >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
     >>> evaluator = BinaryClassificationEvaluator()
-    >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+    >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
+    ...     parallelism=2)
     >>> cvModel = cv.fit(dataset)
     >>> cvModel.avgMetrics[0]
     0.5
@@ -208,23 +209,23 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
 
     @keyword_only
     def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
-                 seed=None):
+                 seed=None, parallelism=1):
         """
         __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
-                 seed=None)
+                 seed=None, parallelism=1)
         """
         super(CrossValidator, self).__init__()
-        self._setDefault(numFolds=3)
+        self._setDefault(numFolds=3, parallelism=1)
         kwargs = self._input_kwargs
         self._set(**kwargs)
 
     @keyword_only
     @since("1.4.0")
     def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
-                  seed=None):
+                  seed=None, parallelism=1):
         """
         setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
-                  seed=None):
+                  seed=None, parallelism=1):
         Sets params for cross validator.
         """
         kwargs = self._input_kwargs
@@ -255,18 +256,27 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
         randCol = self.uid + "_rand"
         df = dataset.select("*", rand(seed).alias(randCol))
         metrics = [0.0] * numModels
+
+        pool = ThreadPool(processes=min(self.getParallelism(), numModels))
+
         for i in range(nFolds):
             validateLB = i * h
             validateUB = (i + 1) * h
             condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
-            validation = df.filter(condition)
-            train = df.filter(~condition)
-            models = est.fit(train, epm)
-            for j in range(numModels):
-                model = models[j]
+            validation = df.filter(condition).cache()
+            train = df.filter(~condition).cache()
+
+            def singleTrain(paramMap):
+                model = est.fit(train, paramMap)
                 # TODO: duplicate evaluator to take extra params from input
-                metric = eva.evaluate(model.transform(validation, epm[j]))
-                metrics[j] += metric/nFolds
+                metric = eva.evaluate(model.transform(validation, paramMap))
+                return metric
+
+            currentFoldMetrics = pool.map(singleTrain, epm)
+            for j in range(numModels):
+                metrics[j] += (currentFoldMetrics[j] / nFolds)
+            validation.unpersist()
+            train.unpersist()
 
         if eva.isLargerBetter():
             bestIndex = np.argmax(metrics)
@@ -316,9 +326,10 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
         estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage)
         numFolds = java_stage.getNumFolds()
         seed = java_stage.getSeed()
+        parallelism = java_stage.getParallelism()
         # Create a new instance of this stage.
         py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator,
-                       numFolds=numFolds, seed=seed)
+                       numFolds=numFolds, seed=seed, parallelism=parallelism)
         py_stage._resetUid(java_stage.uid())
         return py_stage
 
@@ -337,6 +348,7 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
         _java_obj.setEstimator(estimator)
         _java_obj.setSeed(self.getSeed())
         _java_obj.setNumFolds(self.getNumFolds())
+        _java_obj.setParallelism(self.getParallelism())
 
         return _java_obj
 
@@ -427,7 +439,7 @@ class CrossValidatorModel(Model, ValidatorParams, MLReadable, MLWritable):
         return _java_obj
 
 
-class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
+class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable):
     """
     .. note:: Experimental
 
@@ -448,7 +460,8 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
     >>> lr = LogisticRegression()
     >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
     >>> evaluator = BinaryClassificationEvaluator()
-    >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+    >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
+    ...     parallelism=2)
     >>> tvsModel = tvs.fit(dataset)
     >>> evaluator.evaluate(tvsModel.transform(dataset))
     0.8333...
@@ -461,23 +474,23 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
 
     @keyword_only
     def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
-                 seed=None):
+                 parallelism=1, seed=None):
         """
         __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
-                 seed=None)
+                 parallelism=1, seed=None)
         """
         super(TrainValidationSplit, self).__init__()
-        self._setDefault(trainRatio=0.75)
+        self._setDefault(trainRatio=0.75, parallelism=1)
         kwargs = self._input_kwargs
         self._set(**kwargs)
 
     @since("2.0.0")
     @keyword_only
     def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
-                  seed=None):
+                  parallelism=1, seed=None):
         """
         setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
-                  seed=None):
+                  parallelism=1, seed=None):
         Sets params for the train validation split.
         """
         kwargs = self._input_kwargs
@@ -506,15 +519,20 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
         seed = self.getOrDefault(self.seed)
         randCol = self.uid + "_rand"
         df = dataset.select("*", rand(seed).alias(randCol))
-        metrics = [0.0] * numModels
         condition = (df[randCol] >= tRatio)
-        validation = df.filter(condition)
-        train = df.filter(~condition)
-        models = est.fit(train, epm)
-        for j in range(numModels):
-            model = models[j]
-            metric = eva.evaluate(model.transform(validation, epm[j]))
-            metrics[j] += metric
+        validation = df.filter(condition).cache()
+        train = df.filter(~condition).cache()
+
+        def singleTrain(paramMap):
+            model = est.fit(train, paramMap)
+            metric = eva.evaluate(model.transform(validation, paramMap))
+            return metric
+
+        pool = ThreadPool(processes=min(self.getParallelism(), numModels))
+        metrics = pool.map(singleTrain, epm)
+        train.unpersist()
+        validation.unpersist()
+
         if eva.isLargerBetter():
             bestIndex = np.argmax(metrics)
         else:
@@ -563,9 +581,10 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
         estimator, epms, evaluator = super(TrainValidationSplit, cls)._from_java_impl(java_stage)
         trainRatio = java_stage.getTrainRatio()
         seed = java_stage.getSeed()
+        parallelism = java_stage.getParallelism()
         # Create a new instance of this stage.
         py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator,
-                       trainRatio=trainRatio, seed=seed)
+                       trainRatio=trainRatio, seed=seed, parallelism=parallelism)
         py_stage._resetUid(java_stage.uid())
         return py_stage
 
@@ -584,6 +603,7 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
         _java_obj.setEstimator(estimator)
         _java_obj.setTrainRatio(self.getTrainRatio())
         _java_obj.setSeed(self.getSeed())
+        _java_obj.setParallelism(self.getParallelism())
 
         return _java_obj
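
The core pattern added to both `CrossValidator._fit` and `TrainValidationSplit._fit` above, distilled into a single helper (a sketch, not the verbatim patch code; `evaluate_param_maps` is a hypothetical name used here for illustration):

```python
from multiprocessing.pool import ThreadPool

def evaluate_param_maps(est, eva, epm, train, validation, parallelism):
    """Fit one model per param map and score it, up to `parallelism` at a time."""
    train.cache()       # both DataFrames are reused by every pool task
    validation.cache()

    def single_train(param_map):
        model = est.fit(train, param_map)
        return eva.evaluate(model.transform(validation, param_map))

    pool = ThreadPool(processes=min(parallelism, len(epm)))
    # ThreadPool.map preserves input order, so metrics[i] corresponds to
    # epm[i]; this is why the Scala suites above can drop the .sorted calls.
    metrics = pool.map(single_train, epm)
    train.unpersist()
    validation.unpersist()
    return metrics
```

A thread pool (rather than a process pool) suffices here because the heavy lifting happens on the JVM side: the Python threads mostly block on Py4J calls, and Spark jobs submitted from separate threads can run concurrently.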
 

