Posted to commits@spark.apache.org by me...@apache.org on 2015/06/24 23:58:43 UTC

spark git commit: [SPARK-7633] [MLLIB] [PYSPARK] Python bindings for StreamingLogisticRegressionWithSGD

Repository: spark
Updated Branches:
  refs/heads/master f04b5672c -> fb32c3889


[SPARK-7633] [MLLIB] [PYSPARK] Python bindings for StreamingLogisticRegressionWithSGD

Add Python bindings for StreamingLogisticRegressionWithSGD.

No Java wrappers are needed, since the model is updated directly in Python by calling LogisticRegressionWithSGD.train on each incoming batch.
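
As a quick illustration of the new API (a sketch, not part of this patch; the data, timing, and app name are illustrative), a model can be trained on a queue stream of toy batches much like the tests below do:

    import time

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint

    sc = SparkContext(appName="StreamingLRSketch")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # A queue stream of toy labeled batches stands in for a live source.
    batches = [
        sc.parallelize([LabeledPoint(1.0, Vectors.dense([1.5])),
                        LabeledPoint(0.0, Vectors.dense([-1.5]))])
        for _ in range(3)]
    train_stream = ssc.queueStream(batches)

    slr = StreamingLogisticRegressionWithSGD(stepSize=0.2, numIterations=25)
    slr.setInitialWeights([0.0])   # required before trainOn/predictOn
    slr.trainOn(train_stream)      # weights update as each batch arrives

    ssc.start()
    time.sleep(5)                  # let a few batches run
    print(slr.latestModel().weights)
    ssc.stop(stopSparkContext=True)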

Author: MechCoder <ma...@gmail.com>

Closes #6849 from MechCoder/spark-3258 and squashes the following commits:

b4376a5 [MechCoder] minor
d7e5fc1 [MechCoder] Refactor into StreamingLinearAlgorithm Better docs
9c09d4e [MechCoder] [SPARK-7633] Python bindings for StreamingLogisticRegressionwithSGD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb32c388
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb32c388
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb32c388

Branch: refs/heads/master
Commit: fb32c388985ce65c1083cb435cf1f7479fecbaac
Parents: f04b567
Author: MechCoder <ma...@gmail.com>
Authored: Wed Jun 24 14:58:43 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Jun 24 14:58:43 2015 -0700

----------------------------------------------------------------------
 python/pyspark/mllib/classification.py |  96 +++++++++++++++++++-
 python/pyspark/mllib/tests.py          | 135 +++++++++++++++++++++++++++-
 2 files changed, 229 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb32c388/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 758accf..2698f10 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,6 +21,7 @@ import numpy
 from numpy import array
 
 from pyspark import RDD
+from pyspark.streaming import DStream
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
 from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
@@ -28,7 +29,8 @@ from pyspark.mllib.util import Saveable, Loader, inherit_doc
 
 
 __all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'LogisticRegressionWithLBFGS',
-           'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes']
+           'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
+           'StreamingLogisticRegressionWithSGD']
 
 
 class LinearClassificationModel(LinearModel):
@@ -583,6 +585,98 @@ class NaiveBayes(object):
         return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
 
 
+class StreamingLinearAlgorithm(object):
+    """
+    Base class for streaming linear algorithms.
+
+    Provides shared implementations of predictOn and predictOnValues.
+    """
+    def __init__(self, model):
+        self._model = model
+
+    def latestModel(self):
+        """
+        Returns the latest model.
+        """
+        return self._model
+
+    def _validate(self, dstream):
+        if not isinstance(dstream, DStream):
+            raise TypeError(
+                "dstream should be a DStream object, got %s" % type(dstream))
+        if not self._model:
+            raise ValueError(
+                "Model must be intialized using setInitialWeights")
+
+    def predictOn(self, dstream):
+        """
+        Make predictions on a dstream.
+
+        :return: Transformed dstream object.
+        """
+        self._validate(dstream)
+        return dstream.map(lambda x: self._model.predict(x))
+
+    def predictOnValues(self, dstream):
+        """
+        Make predictions on a keyed dstream.
+
+        :return: Transformed dstream object.
+        """
+        self._validate(dstream)
+        return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
+    """
+    Run LogisticRegression with SGD on a stream of data.
+
+    The weights obtained after training on each batch of data are used as
+    the initial weights for the next batch.
+
+    :param stepSize: Step size for each iteration of gradient descent.
+    :param numIterations: Number of iterations run for each batch of data.
+    :param miniBatchFraction: Fraction of data on which SGD is run for each
+                              iteration.
+    :param regParam: L2 Regularization parameter.
+    """
+    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01):
+        self.stepSize = stepSize
+        self.numIterations = numIterations
+        self.regParam = regParam
+        self.miniBatchFraction = miniBatchFraction
+        self._model = None
+        super(StreamingLogisticRegressionWithSGD, self).__init__(
+            model=self._model)
+
+    def setInitialWeights(self, initialWeights):
+        """
+        Set the initial value of weights.
+
+        This must be set before running trainOn and predictOn.
+        """
+        initialWeights = _convert_to_vector(initialWeights)
+
+        # LogisticRegressionWithSGD performs only binary classification.
+        self._model = LogisticRegressionModel(
+            initialWeights, 0, initialWeights.size, 2)
+        return self
+
+    def trainOn(self, dstream):
+        """Train the model on the incoming dstream."""
+        self._validate(dstream)
+
+        def update(rdd):
+            # LogisticRegressionWithSGD.train raises an error for an empty RDD.
+            if not rdd.isEmpty():
+                self._model = LogisticRegressionWithSGD.train(
+                    rdd, self.numIterations, self.stepSize,
+                    self.miniBatchFraction, self._model.weights)
+
+        dstream.foreachRDD(update)
+
+
 def _test():
     import doctest
     from pyspark import SparkContext
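
For completeness, here is a sketch of the keyed prediction path that the StreamingLinearAlgorithm base class above provides (continuing the setup from the earlier sketch; register these transformations before ssc.start()):

    # predictOnValues expects a DStream of (key, features) pairs; using
    # the label as the key lets predictions be compared to ground truth.
    keyed_batches = [
        b.map(lambda lp: (lp.label, lp.features)) for b in batches]
    predict_stream = ssc.queueStream(keyed_batches)

    # Each (label, features) pair becomes (label, predicted_class).
    predictions = slr.predictOnValues(predict_stream)
    predictions.pprint()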

http://git-wip-us.apache.org/repos/asf/spark/blob/fb32c388/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 509faa1..cd80c3e 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -26,7 +26,8 @@ import array as pyarray
 from time import time, sleep
 from shutil import rmtree
 
-from numpy import array, array_equal, zeros, inf, all, random
+from numpy import (
+    array, array_equal, zeros, inf, random, exp, dot, all, mean)
 from numpy import sum as array_sum
 from py4j.protocol import Py4JJavaError
 
@@ -45,6 +46,7 @@ from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
     DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
 from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
 from pyspark.mllib.random import RandomRDDs
 from pyspark.mllib.stat import Statistics
 from pyspark.mllib.feature import Word2Vec
@@ -1037,6 +1039,137 @@ class LinearDataGeneratorTests(MLlibTestCase):
             self.assertEqual(len(point.features), 2)
 
 
+class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
+
+    @staticmethod
+    def generateLogisticInput(offset, scale, nPoints, seed):
+        """
+        Generate labeled points with label probability
+        sigmoid(x) = 1 / (1 + exp(-(x * scale + offset)))
+
+        where x is drawn from a standard normal distribution and each
+        label is set to 1 when a uniform random draw falls below
+        sigmoid(x), and to 0 otherwise.
+        """
+        rng = random.RandomState(seed)
+        x = rng.randn(nPoints)
+        sigmoid = 1. / (1 + exp(-(dot(x, scale) + offset)))
+        y_p = rng.rand(nPoints)
+        cut_off = y_p <= sigmoid
+        y_p[cut_off] = 1.0
+        y_p[~cut_off] = 0.0
+        return [
+            LabeledPoint(y_p[i], Vectors.dense([x[i]]))
+            for i in range(nPoints)]
+
+    def test_parameter_accuracy(self):
+        """
+        Test that the final value of weights is close to the desired value.
+        """
+        input_batches = [
+            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
+            for i in range(20)]
+        input_stream = self.ssc.queueStream(input_batches)
+
+        slr = StreamingLogisticRegressionWithSGD(
+            stepSize=0.2, numIterations=25)
+        slr.setInitialWeights([0.0])
+        slr.trainOn(input_stream)
+
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 20.0, 0.01)
+        rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
+        self.assertAlmostEqual(rel, 0.1, 1)
+
+    def test_convergence(self):
+        """
+        Test that weights converge to the required value on toy data.
+        """
+        input_batches = [
+            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
+            for i in range(20)]
+        input_stream = self.ssc.queueStream(input_batches)
+        models = []
+
+        slr = StreamingLogisticRegressionWithSGD(
+            stepSize=0.2, numIterations=25)
+        slr.setInitialWeights([0.0])
+        slr.trainOn(input_stream)
+        input_stream.foreachRDD(
+            lambda x: models.append(slr.latestModel().weights[0]))
+
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 15.0, 0.01)
+        t_models = array(models)
+        diff = t_models[1:] - t_models[:-1]
+
+        # Test that the weights improve overall, within a small tolerance.
+        self.assertTrue(all(diff >= -0.1))
+        self.assertTrue(array_sum(diff > 0) > 1)
+
+    @staticmethod
+    def calculate_accuracy_error(true, predicted):
+        return sum(abs(array(true) - array(predicted))) / len(true)
+
+    def test_predictions(self):
+        """Test predicted values on a toy model."""
+        input_batches = []
+        for i in range(20):
+            batch = self.sc.parallelize(
+                self.generateLogisticInput(0, 1.5, 100, 42 + i))
+            input_batches.append(batch.map(lambda x: (x.label, x.features)))
+        input_stream = self.ssc.queueStream(input_batches)
+
+        slr = StreamingLogisticRegressionWithSGD(
+            stepSize=0.2, numIterations=25)
+        slr.setInitialWeights([1.5])
+        predict_stream = slr.predictOnValues(input_stream)
+        true_predicted = []
+        predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 5.0, 0.01)
+
+        # Test that the accuracy error is no more than 0.4 on each batch.
+        for batch in true_predicted:
+            true, predicted = zip(*batch)
+            self.assertTrue(
+                self.calculate_accuracy_error(true, predicted) < 0.4)
+
+    def test_training_and_prediction(self):
+        """Test that the model improves on toy data with no. of batches"""
+        input_batches = [
+            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
+            for i in range(20)]
+        predict_batches = [
+            b.map(lambda lp: (lp.label, lp.features)) for b in input_batches]
+
+        slr = StreamingLogisticRegressionWithSGD(
+            stepSize=0.01, numIterations=25)
+        slr.setInitialWeights([-0.1])
+        errors = []
+
+        def collect_errors(rdd):
+            true, predicted = zip(*rdd.collect())
+            errors.append(self.calculate_accuracy_error(true, predicted))
+
+        true_predicted = []
+        input_stream = self.ssc.queueStream(input_batches)
+        predict_stream = self.ssc.queueStream(predict_batches)
+        slr.trainOn(input_stream)
+        ps = slr.predictOnValues(predict_stream)
+        ps.foreachRDD(lambda x: collect_errors(x))
+
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 20.0, 0.01)
+
+        # Test that the error improves by at least 0.3.
+        self.assertTrue(errors[1] - errors[-1] > 0.3)
+
+
 if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")

