You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2016/03/03 18:50:07 UTC

spark git commit: [SPARK-12877][ML] Add train-validation-split to pyspark

Repository: spark
Updated Branches:
  refs/heads/master 9a48c656e -> 511d4929c


[SPARK-12877][ML] Add train-validation-split to pyspark

## What changes were proposed in this pull request?
The changes proposed were to add train-validation-split to pyspark.ml.tuning.

## How was this patch tested?
This patch was tested through unit tests located in pyspark/ml/tests.py.

This is my original work and I license it to Spark.

Author: JeremyNixon <jn...@gmail.com>

Closes #11335 from JeremyNixon/tvs_pyspark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/511d4929
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/511d4929
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/511d4929

Branch: refs/heads/master
Commit: 511d4929c87ebf364b96bd46890628f736eaa026
Parents: 9a48c65
Author: JeremyNixon <jn...@gmail.com>
Authored: Thu Mar 3 09:50:05 2016 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Mar 3 09:50:05 2016 -0800

----------------------------------------------------------------------
 python/pyspark/ml/tests.py  |  53 ++++++++++-
 python/pyspark/ml/tuning.py | 193 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 244 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/511d4929/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 5fcfa9e..8182fcf 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -45,7 +45,7 @@ from pyspark.ml.feature import *
 from pyspark.ml.param import Param, Params
 from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
 from pyspark.ml.regression import LinearRegression
-from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
+from pyspark.ml.tuning import *
 from pyspark.ml.util import keyword_only
 from pyspark.mllib.linalg import DenseVector
 from pyspark.sql import DataFrame, SQLContext, Row
@@ -423,6 +423,57 @@ class CrossValidatorTests(PySparkTestCase):
         self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
 
 
+class TrainValidationSplitTests(PySparkTestCase):
+
+    def test_fit_minimize_metric(self):
+        """
+        With RMSE (a metric to minimize), the zero-induced-error candidate should be selected.
+        """
+        rows = [
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10
+        dataset = SQLContext(self.sc).createDataFrame(rows, ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="rmse")
+        grid = ParamGridBuilder().addGrid(iee.inducedError, [100.0, 0.0, 10000.0]).build()
+
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        best = tvs.fit(dataset).bestModel
+        # Evaluate the selected model on the full dataset.
+        bestMetric = evaluator.evaluate(best.transform(dataset))
+
+        self.assertEqual(0.0, best.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(0.0, bestMetric, "Best model has RMSE of 0")
+
+    def test_fit_maximize_metric(self):
+        """
+        With R-squared (a metric to maximize), the zero-induced-error candidate should be selected.
+        """
+        rows = [
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10
+        dataset = SQLContext(self.sc).createDataFrame(rows, ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="r2")
+        grid = ParamGridBuilder().addGrid(iee.inducedError, [100.0, 0.0, 10000.0]).build()
+
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        best = tvs.fit(dataset).bestModel
+        # Evaluate the selected model on the full dataset.
+        bestMetric = evaluator.evaluate(best.transform(dataset))
+
+        self.assertEqual(0.0, best.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(1.0, bestMetric, "Best model has R-squared of 1")
+
+
 class PersistenceTest(PySparkTestCase):
 
     def test_linear_regression(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/511d4929/python/pyspark/ml/tuning.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 0cbe97f..77af009 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -25,7 +25,8 @@ from pyspark.ml.param.shared import HasSeed
 from pyspark.ml.util import keyword_only
 from pyspark.sql.functions import rand
 
-__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel']
+__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit',
+           'TrainValidationSplitModel']
 
 
 class ParamGridBuilder(object):
@@ -288,6 +289,196 @@ class CrossValidatorModel(Model):
         return CrossValidatorModel(self.bestModel.copy(extra))
 
 
+class TrainValidationSplit(Estimator, HasSeed):
+    """
+    Train-Validation-Split: tunes params on a single random train/validation split of the data.
+
+    >>> from pyspark.ml.classification import LogisticRegression
+    >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
+    >>> from pyspark.mllib.linalg import Vectors
+    >>> dataset = sqlContext.createDataFrame(
+    ...     [(Vectors.dense([0.0]), 0.0),
+    ...      (Vectors.dense([0.4]), 1.0),
+    ...      (Vectors.dense([0.5]), 0.0),
+    ...      (Vectors.dense([0.6]), 1.0),
+    ...      (Vectors.dense([1.0]), 1.0)] * 10,
+    ...     ["features", "label"])
+    >>> lr = LogisticRegression()
+    >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
+    >>> evaluator = BinaryClassificationEvaluator()
+    >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+    >>> tvsModel = tvs.fit(dataset)
+    >>> evaluator.evaluate(tvsModel.transform(dataset))
+    0.8333...
+
+    .. versionadded:: 2.0.0
+    """
+
+    estimator = Param(Params._dummy(), "estimator", "estimator to be tested")
+    estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps")
+    evaluator = Param(
+        Params._dummy(), "evaluator",
+        "evaluator used to select hyper-parameters that maximize the validated metric")
+    trainRatio = Param(Params._dummy(), "trainRatio", "Param for ratio between train and "
+                       "validation data. Must be between 0 and 1.")
+
+    @keyword_only
+    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
+                 seed=None):
+        """
+        __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
+                 seed=None)
+        """
+        super(TrainValidationSplit, self).__init__()
+        self._setDefault(trainRatio=0.75)
+        kwargs = self.__init__._input_kwargs
+        self._set(**kwargs)
+
+    @since("2.0.0")
+    @keyword_only
+    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
+                  seed=None):
+        """
+        setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
+                  seed=None)
+        Sets params for the train validation split.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.0.0")
+    def setEstimator(self, value):
+        """
+        Sets the value of :py:attr:`estimator`.
+        """
+        self._paramMap[self.estimator] = value
+        return self
+
+    @since("2.0.0")
+    def getEstimator(self):
+        """
+        Gets the value of estimator or its default value.
+        """
+        return self.getOrDefault(self.estimator)
+
+    @since("2.0.0")
+    def setEstimatorParamMaps(self, value):
+        """
+        Sets the value of :py:attr:`estimatorParamMaps`.
+        """
+        self._paramMap[self.estimatorParamMaps] = value
+        return self
+
+    @since("2.0.0")
+    def getEstimatorParamMaps(self):
+        """
+        Gets the value of estimatorParamMaps or its default value.
+        """
+        return self.getOrDefault(self.estimatorParamMaps)
+
+    @since("2.0.0")
+    def setEvaluator(self, value):
+        """
+        Sets the value of :py:attr:`evaluator`.
+        """
+        self._paramMap[self.evaluator] = value
+        return self
+
+    @since("2.0.0")
+    def getEvaluator(self):
+        """
+        Gets the value of evaluator or its default value.
+        """
+        return self.getOrDefault(self.evaluator)
+
+    @since("2.0.0")
+    def setTrainRatio(self, value):
+        """
+        Sets the value of :py:attr:`trainRatio`.
+        """
+        self._paramMap[self.trainRatio] = value
+        return self
+
+    @since("2.0.0")
+    def getTrainRatio(self):
+        """
+        Gets the value of trainRatio or its default value.
+        """
+        return self.getOrDefault(self.trainRatio)
+
+    def _fit(self, dataset):
+        # Fit every param map on the train split, score it on the validation split, then
+        # refit the best-scoring param map on the full dataset.
+        est = self.getOrDefault(self.estimator)
+        epm = self.getOrDefault(self.estimatorParamMaps)
+        numModels = len(epm)
+        eva = self.getOrDefault(self.evaluator)
+        tRatio = self.getOrDefault(self.trainRatio)
+        seed = self.getOrDefault(self.seed)
+        randCol = self.uid + "_rand"
+        df = dataset.select("*", rand(seed).alias(randCol))
+        metrics = np.zeros(numModels)
+        # Rows whose random draw is >= trainRatio are held out for validation.
+        condition = (df[randCol] >= tRatio)
+        validation = df.filter(condition)
+        train = df.filter(~condition)
+        for j in range(numModels):
+            model = est.fit(train, epm[j])
+            # Each param map is scored exactly once, so assign rather than accumulate.
+            metrics[j] = eva.evaluate(model.transform(validation, epm[j]))
+        bestIndex = np.argmax(metrics) if eva.isLargerBetter() else np.argmin(metrics)
+        bestModel = est.fit(dataset, epm[bestIndex])
+        return TrainValidationSplitModel(bestModel)
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance via :py:meth:`Params.copy`, applying the extra
+        params. The estimator and evaluator, when set, are replaced in the copy by their
+        own copies made with the same extra params; estimatorParamMaps are left as-is.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        newTVS = Params.copy(self, extra)
+        if self.isSet(self.estimator):
+            newTVS.setEstimator(self.getEstimator().copy(extra))
+        # estimatorParamMaps remain the same
+        if self.isSet(self.evaluator):
+            newTVS.setEvaluator(self.getEvaluator().copy(extra))
+        return newTVS
+
+
+class TrainValidationSplitModel(Model):
+    """
+    Model from train validation split; transforming delegates to the selected best model.
+    """
+
+    def __init__(self, bestModel):
+        super(TrainValidationSplitModel, self).__init__()
+        #: best model from train validation split
+        self.bestModel = bestModel
+
+    def _transform(self, dataset):
+        chosen = self.bestModel
+        return chosen.transform(dataset)
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a new TrainValidationSplitModel that wraps a copy of the
+        underlying bestModel; the extra params are handed to that copy.
+
+        :param extra: Extra parameters to pass to the bestModel copy;
+                      an empty dict is used when omitted
+        :return: Copy of this instance
+        """
+        extraParams = dict() if extra is None else extra
+        wrapped = self.bestModel.copy(extraParams)
+        return TrainValidationSplitModel(wrapped)
+
 if __name__ == "__main__":
     import doctest
     from pyspark.context import SparkContext


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org