Posted to commits@spark.apache.org by me...@apache.org on 2015/06/19 21:23:26 UTC

spark git commit: [SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans

Repository: spark
Updated Branches:
  refs/heads/master e41e2fd6c -> 54976e55e


[SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans

Python bindings for StreamingKMeans

Will change status to MRG once docs, tests and examples are updated.

Author: MechCoder <ma...@gmail.com>

Closes #6499 from MechCoder/spark-4118 and squashes the following commits:

7722d16 [MechCoder] minor style fixes
51052d3 [MechCoder] Doc fixes
2061a76 [MechCoder] Add tests for simultaneous training and prediction Minor style fixes
81482fd [MechCoder] minor
5d9fe61 [MechCoder] predictOn should take into account the latest model
8ab9e89 [MechCoder] Fix Python3 error
a9817df [MechCoder] Better tests and minor fixes
c80e451 [MechCoder] Add ignore_unicode_prefix
ee8ce16 [MechCoder] Update tests, doc and examples
4b1481f [MechCoder] Some changes and tests
d8b066a [MechCoder] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54976e55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54976e55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54976e55

Branch: refs/heads/master
Commit: 54976e55e36465108b71b40b8a431be9d6d703ce
Parents: e41e2fd
Author: MechCoder <ma...@gmail.com>
Authored: Fri Jun 19 12:23:15 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Fri Jun 19 12:23:15 2015 -0700

----------------------------------------------------------------------
 docs/mllib-clustering.md                        |  48 ++++-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  15 ++
 python/pyspark/mllib/clustering.py              | 207 ++++++++++++++++++-
 python/pyspark/mllib/tests.py                   | 150 +++++++++++++-
 4 files changed, 411 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/54976e55/docs/mllib-clustering.md
----------------------------------------------------------------------
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 1b08896..dcaa378 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -593,6 +593,50 @@ ssc.start()
 ssc.awaitTermination()
 
 {% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+First we import the necessary classes.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.clustering import StreamingKMeans
+{% endhighlight %}
+
+Then we make an input stream of vectors for training, as well as a stream of labeled data
+points for testing. We assume a StreamingContext `ssc` has been created; see
+[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
+
+{% highlight python %}
+def parse(lp):
+    label = float(lp[lp.find('(') + 1: lp.find(',')])
+    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
+    return LabeledPoint(label, vec)
+
+trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
+testData = ssc.textFileStream("/testing/data/dir").map(parse)
+{% endhighlight %}
+
+We create a model with random initial cluster centers and specify the number of clusters to find.
+
+{% highlight python %}
+model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
+{% endhighlight %}
+
+Now register the streams for training and testing and start the job, printing
+the predicted cluster assignments on new data points as they arrive.
+
+{% highlight python %}
+model.trainOn(trainingData)
+model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()
+
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}
+</div>
+
+</div>
 
 As you add new text files with data the cluster centers will update. Each training
 point should be formatted as `[x1, x2, x3]`, and each test data point
@@ -600,7 +644,3 @@ should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or id
 (e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir`
 the model will update. Anytime a text file is placed in `/testing/data/dir`
 you will see predictions. With new data, the cluster centers will change!
-
-</div>
-
-</div>
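The two input line formats described in the guide text above, `[x1, x2, x3]` for training and `(y, [x1, x2, x3])` for testing, can be checked without a Spark installation. The following is only a minimal sketch in plain Python: `parse_training_line` and `parse_test_line` are hypothetical helper names, and they return plain lists and tuples rather than the `Vectors` and `LabeledPoint` objects the guide uses.

```python
def parse_training_line(line):
    """Parse a line such as '[x1, x2, x3]' into a list of floats."""
    inner = line[line.find('[') + 1: line.find(']')]
    return [float(value) for value in inner.split(',')]


def parse_test_line(line):
    """Parse a line such as '(y, [x1, x2, x3])' into (label, features)."""
    # The label is the text between '(' and the first ','.
    label = float(line[line.find('(') + 1: line.find(',')])
    return label, parse_training_line(line)


print(parse_training_line("[0.0, 1.5, -2.0]"))
print(parse_test_line("(1.0, [0.0, 1.5, -2.0])"))
```

The slicing mirrors the `parse` function in the guide, so a line that fails here would likely fail in the streaming job as well.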

http://git-wip-us.apache.org/repos/asf/spark/blob/54976e55/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 1812b3a..2897865 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -964,6 +964,21 @@ private[python] class PythonMLLibAPI extends Serializable {
       points.asScala.toArray)
   }
 
+  /**
+   * Java stub for the update method of StreamingKMeansModel.
+   */
+  def updateStreamingKMeansModel(
+      clusterCenters: JList[Vector],
+      clusterWeights: JList[Double],
+      data: JavaRDD[Vector],
+      decayFactor: Double,
+      timeUnit: String): JList[Object] = {
+    val model = new StreamingKMeansModel(
+      clusterCenters.asScala.toArray, clusterWeights.asScala.toArray)
+        .update(data, decayFactor, timeUnit)
+      List[AnyRef](model.clusterCenters, Vectors.dense(model.clusterWeights)).asJava
+  }
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/54976e55/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index b55583f..c382298 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -21,16 +21,20 @@ import array as pyarray
 if sys.version > '3':
     xrange = range
 
-from numpy import array
+from math import exp, log
+
+from numpy import array, random, tile
 
-from pyspark import RDD
 from pyspark import SparkContext
+from pyspark.rdd import RDD, ignore_unicode_prefix
 from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
 from pyspark.mllib.stat.distribution import MultivariateGaussian
 from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.streaming import DStream
 
-__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture']
+__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+           'StreamingKMeans', 'StreamingKMeansModel']
 
 
 @inherit_doc
@@ -98,6 +102,9 @@ class KMeansModel(Saveable, Loader):
         """Find the cluster to which x belongs in this model."""
         best = 0
         best_distance = float("inf")
+        if isinstance(x, RDD):
+            return x.map(self.predict)
+
         x = _convert_to_vector(x)
         for i in xrange(len(self.centers)):
             distance = x.squared_distance(self.centers[i])
@@ -264,6 +271,198 @@ class GaussianMixture(object):
         return GaussianMixtureModel(weight, mvg_obj)
 
 
+class StreamingKMeansModel(KMeansModel):
+    """
+    .. note:: Experimental
+
+    Clustering model which can perform an online update of the centroids.
+
+    The update formula for each centroid is given by
+
+    * c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / (n_t * a + m_t)
+    * n_t+1 = n_t * a + m_t
+
+    where
+
+    * c_t: Centroid at the t_th iteration.
+    * n_t: Number of samples (or) weights associated with the centroid
+           at the t_th iteration.
+    * x_t: Centroid of the new data closest to c_t.
+    * m_t: Number of samples (or) weights of the new data closest to c_t
+    * c_t+1: New centroid.
+    * n_t+1: New number of weights.
+    * a: Decay Factor, which gives the forgetfulness.
+
+    Note that if a is set to 1, it is the weighted mean of the previous
+    and new data. If it is set to zero, the old centroids are completely
+    forgotten.
+
+    :param clusterCenters: Initial cluster centers.
+    :param clusterWeights: List of weights assigned to each cluster.
+
+    >>> initCenters = [[0.0, 0.0], [1.0, 1.0]]
+    >>> initWeights = [1.0, 1.0]
+    >>> stkm = StreamingKMeansModel(initCenters, initWeights)
+    >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
+    ...                        [0.9, 0.9], [1.1, 1.1]])
+    >>> stkm = stkm.update(data, 1.0, u"batches")
+    >>> stkm.centers
+    array([[ 0.,  0.],
+           [ 1.,  1.]])
+    >>> stkm.predict([-0.1, -0.1])
+    0
+    >>> stkm.predict([0.9, 0.9])
+    1
+    >>> stkm.clusterWeights
+    [3.0, 3.0]
+    >>> decayFactor = 0.0
+    >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
+    >>> stkm = stkm.update(data, 0.0, u"batches")
+    >>> stkm.centers
+    array([[ 0.2,  0.2],
+           [ 1.5,  1.5]])
+    >>> stkm.clusterWeights
+    [1.0, 1.0]
+    >>> stkm.predict([0.2, 0.2])
+    0
+    >>> stkm.predict([1.5, 1.5])
+    1
+    """
+    def __init__(self, clusterCenters, clusterWeights):
+        super(StreamingKMeansModel, self).__init__(centers=clusterCenters)
+        self._clusterWeights = list(clusterWeights)
+
+    @property
+    def clusterWeights(self):
+        """Return the cluster weights."""
+        return self._clusterWeights
+
+    @ignore_unicode_prefix
+    def update(self, data, decayFactor, timeUnit):
+        """Update the centroids, according to data
+
+        :param data: Should be an RDD that represents the new data.
+        :param decayFactor: forgetfulness of the previous centroids.
+        :param timeUnit: Can be "batches" or "points". If points, then the
+                         decay factor is raised to the power of number of new
+                         points and if batches, it is used as it is.
+        """
+        if not isinstance(data, RDD):
+            raise TypeError("Data should be an RDD, got %s." % type(data))
+        data = data.map(_convert_to_vector)
+        decayFactor = float(decayFactor)
+        if timeUnit not in ["batches", "points"]:
+            raise ValueError(
+                "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
+        vectorCenters = [_convert_to_vector(center) for center in self.centers]
+        updatedModel = callMLlibFunc(
+            "updateStreamingKMeansModel", vectorCenters, self._clusterWeights,
+            data, decayFactor, timeUnit)
+        self.centers = array(updatedModel[0])
+        self._clusterWeights = list(updatedModel[1])
+        return self
+
+
+class StreamingKMeans(object):
+    """
+    .. note:: Experimental
+
+    Provides methods to set k, decayFactor, timeUnit to configure the
+    KMeans algorithm for fitting and predicting on incoming dstreams.
+    More details on how the centroids are updated are provided under the
+    docs of StreamingKMeansModel.
+
+    :param k: int, number of clusters
+    :param decayFactor: float, forgetfulness of the previous centroids.
+    :param timeUnit: can be "batches" or "points". If points, then the
+                     decay factor is raised to the power of the number of new points.
+    """
+    def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
+        self._k = k
+        self._decayFactor = decayFactor
+        if timeUnit not in ["batches", "points"]:
+            raise ValueError(
+                "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
+        self._timeUnit = timeUnit
+        self._model = None
+
+    def latestModel(self):
+        """Return the latest model"""
+        return self._model
+
+    def _validate(self, dstream):
+        if self._model is None:
+            raise ValueError(
+                "Initial centers should be set either by setInitialCenters "
+                "or setRandomCenters.")
+        if not isinstance(dstream, DStream):
+            raise TypeError(
+                "Expected dstream to be of type DStream, "
+                "got type %s" % type(dstream))
+
+    def setK(self, k):
+        """Set number of clusters."""
+        self._k = k
+        return self
+
+    def setDecayFactor(self, decayFactor):
+        """Set decay factor."""
+        self._decayFactor = decayFactor
+        return self
+
+    def setHalfLife(self, halfLife, timeUnit):
+        """
+        Set the number of batches (or points, depending on timeUnit) after
+        which the contribution of a batch to the centroids is halved.
+        """
+        self._timeUnit = timeUnit
+        self._decayFactor = exp(log(0.5) / halfLife)
+        return self
+
+    def setInitialCenters(self, centers, weights):
+        """
+        Set initial centers. Should be set before calling trainOn.
+        """
+        self._model = StreamingKMeansModel(centers, weights)
+        return self
+
+    def setRandomCenters(self, dim, weight, seed):
+        """
+        Set the initial centers to be random samples from
+        a Gaussian population with constant weights.
+        """
+        rng = random.RandomState(seed)
+        clusterCenters = rng.randn(self._k, dim)
+        clusterWeights = tile(weight, self._k)
+        self._model = StreamingKMeansModel(clusterCenters, clusterWeights)
+        return self
+
+    def trainOn(self, dstream):
+        """Train the model on the incoming dstream."""
+        self._validate(dstream)
+
+        def update(rdd):
+            self._model.update(rdd, self._decayFactor, self._timeUnit)
+
+        dstream.foreachRDD(update)
+
+    def predictOn(self, dstream):
+        """
+        Make predictions on a dstream.
+        Returns a transformed dstream object
+        """
+        self._validate(dstream)
+        return dstream.map(lambda x: self._model.predict(x))
+
+    def predictOnValues(self, dstream):
+        """
+        Make predictions on a keyed dstream.
+        Returns a transformed dstream object.
+        """
+        self._validate(dstream)
+        return dstream.mapValues(lambda x: self._model.predict(x))
+
+
 def _test():
     import doctest
     globs = globals().copy()
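The update rule in the StreamingKMeansModel docstring and the half-life conversion in setHalfLife can be traced by hand. The following is a rough plain-Python sketch, not the MLlib implementation: `half_life_to_decay`, `closest` and `update` are hypothetical names, the new weight `n_t * a + m_t` is assumed as the denominator (which is what the docstring's doctest values imply), and for "points" the decay is assumed to be raised to the total number of points in the batch. Any special handling the Scala code may apply to clusters with vanishing weight is left out.

```python
import math


def half_life_to_decay(half_life):
    """Decay factor a such that a ** half_life == 0.5."""
    return math.exp(math.log(0.5) / half_life)


def closest(centers, point):
    """Index of the center with the smallest squared distance to point."""
    dists = [sum((c - p) ** 2 for c, p in zip(center, point))
             for center in centers]
    return dists.index(min(dists))


def update(centers, weights, batch, decay, time_unit="batches"):
    """Return (new_centers, new_weights) after seeing one batch."""
    # Assumption: with "points" the decay is raised to the batch size.
    a = decay ** len(batch) if time_unit == "points" else decay
    dim = len(centers[0])
    sums = [[0.0] * dim for _ in centers]   # x_t * m_t for each cluster
    counts = [0] * len(centers)             # m_t for each cluster
    for point in batch:
        i = closest(centers, point)
        counts[i] += 1
        sums[i] = [s + p for s, p in zip(sums[i], point)]
    new_centers, new_weights = [], []
    for c, n, s, m in zip(centers, weights, sums, counts):
        n_next = n * a + m
        if m == 0 or n_next == 0:
            # No new points for this cluster: keep the old centroid.
            new_centers.append(list(c))
        else:
            new_centers.append(
                [(ci * n * a + si) / n_next for ci, si in zip(c, s)])
        new_weights.append(n_next)
    return new_centers, new_weights


# Same data as the docstring: first with a = 1.0, then with a = 0.0.
centers, weights = update(
    [[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0],
    [[-0.1, -0.1], [0.1, 0.1], [0.9, 0.9], [1.1, 1.1]], 1.0)
print(centers, weights)
centers, weights = update(centers, weights, [[1.5, 1.5], [0.2, 0.2]], 0.0)
print(centers, weights)
print(half_life_to_decay(5) ** 5)
```

With a decay of 0.0 the old centroids carry no weight, so each center moves to the mean of the points assigned to it, which is the behavior the second half of the doctest relies on.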

http://git-wip-us.apache.org/repos/asf/spark/blob/54976e55/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c482e6b..744dc11 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -23,8 +23,10 @@ import os
 import sys
 import tempfile
 import array as pyarray
+from time import time, sleep
 
-from numpy import array, array_equal, zeros, inf
+from numpy import array, array_equal, zeros, inf, all, random
+from numpy import sum as array_sum
 from py4j.protocol import Py4JJavaError
 
 if sys.version_info[:2] <= (2, 6):
@@ -38,6 +40,7 @@ else:
 
 from pyspark import SparkContext
 from pyspark.mllib.common import _to_java_object_rdd
+from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
     DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
 from pyspark.mllib.regression import LabeledPoint
@@ -48,6 +51,7 @@ from pyspark.mllib.feature import IDF
 from pyspark.mllib.feature import StandardScaler
 from pyspark.mllib.feature import ElementwiseProduct
 from pyspark.serializers import PickleSerializer
+from pyspark.streaming import StreamingContext
 from pyspark.sql import SQLContext
 
 _have_scipy = False
@@ -67,6 +71,20 @@ class MLlibTestCase(unittest.TestCase):
         self.sc = sc
 
 
+class MLLibStreamingTestCase(unittest.TestCase):
+    def setUp(self):
+        self.sc = sc
+        self.ssc = StreamingContext(self.sc, 1.0)
+
+    def tearDown(self):
+        self.ssc.stop(False)
+
+    @staticmethod
+    def _ssc_wait(start_time, end_time, sleep_time):
+        while time() - start_time < end_time:
+            sleep(sleep_time)
+
+
 def _squared_distance(a, b):
     if isinstance(a, Vector):
         return a.squared_distance(b)
@@ -863,6 +881,136 @@ class ElementwiseProductTests(MLlibTestCase):
             eprod.transform(sparsevec), SparseVector(3, [0], [3]))
 
 
+class StreamingKMeansTest(MLLibStreamingTestCase):
+    def test_model_params(self):
+        """Test that the model params are set correctly"""
+        stkm = StreamingKMeans()
+        stkm.setK(5).setDecayFactor(0.0)
+        self.assertEquals(stkm._k, 5)
+        self.assertEquals(stkm._decayFactor, 0.0)
+
+        # Model not set yet.
+        self.assertIsNone(stkm.latestModel())
+        self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])
+
+        stkm.setInitialCenters(
+            centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
+        self.assertEquals(
+            stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
+        self.assertEquals(stkm.latestModel().clusterWeights, [1.0, 1.0])
+
+    def test_accuracy_for_single_center(self):
+        """Test that parameters obtained are correct for a single center."""
+        centers, batches = self.streamingKMeansDataGenerator(
+            batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
+        stkm = StreamingKMeans(1)
+        stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
+        input_stream = self.ssc.queueStream(
+            [self.sc.parallelize(batch, 1) for batch in batches])
+        stkm.trainOn(input_stream)
+
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 10.0, 0.01)
+        self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
+        realCenters = array_sum(array(centers), axis=0)
+        for i in range(5):
+            modelCenters = stkm.latestModel().centers[0][i]
+            self.assertAlmostEqual(centers[0][i], modelCenters, 1)
+            self.assertAlmostEqual(realCenters[i], modelCenters, 1)
+
+    def streamingKMeansDataGenerator(self, batches, numPoints,
+                                     k, d, r, seed, centers=None):
+        rng = random.RandomState(seed)
+
+        # Generate centers.
+        centers = [rng.randn(d) for i in range(k)]
+
+        return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
+                          for j in range(numPoints)]
+                         for i in range(batches)]
+
+    def test_trainOn_model(self):
+        """Test the model on toy data with four clusters."""
+        stkm = StreamingKMeans()
+        initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
+        stkm.setInitialCenters(
+            centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
+
+        # Create a toy dataset by setting a tiny offset for each point.
+        offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
+        batches = []
+        for offset in offsets:
+            batches.append([[offset[0] + center[0], offset[1] + center[1]]
+                            for center in initCenters])
+
+        batches = [self.sc.parallelize(batch, 1) for batch in batches]
+        input_stream = self.ssc.queueStream(batches)
+        stkm.trainOn(input_stream)
+        t = time()
+        self.ssc.start()
+
+        # Give enough time to train the model.
+        self._ssc_wait(t, 6.0, 0.01)
+        finalModel = stkm.latestModel()
+        self.assertTrue(all(finalModel.centers == array(initCenters)))
+        self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+
+    def test_predictOn_model(self):
+        """Test that the model predicts correctly on toy data."""
+        stkm = StreamingKMeans()
+        stkm._model = StreamingKMeansModel(
+            clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
+            clusterWeights=[1.0, 1.0, 1.0, 1.0])
+
+        predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
+        predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
+        predict_stream = self.ssc.queueStream(predict_data)
+        predict_val = stkm.predictOn(predict_stream)
+
+        result = []
+
+        def update(rdd):
+            rdd_collect = rdd.collect()
+            if rdd_collect:
+                result.append(rdd_collect)
+
+        predict_val.foreachRDD(update)
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 6.0, 0.01)
+        self.assertEquals(result, [[0], [1], [2], [3]])
+
+    def test_trainOn_predictOn(self):
+        """Test that prediction happens on the updated model."""
+        stkm = StreamingKMeans(decayFactor=0.0, k=2)
+        stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
+
+        # Since the decay factor is set to zero, once the first batch
+        # is passed the clusterCenters are updated to [-0.5, 0.7],
+        # which causes 0.2 & 0.3 to be classified as 1, even though the
+        # classification based on the initial model would have been 0,
+        # proving that the model is updated.
+        batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
+        batches = [sc.parallelize(batch) for batch in batches]
+        input_stream = self.ssc.queueStream(batches)
+        predict_results = []
+
+        def collect(rdd):
+            rdd_collect = rdd.collect()
+            if rdd_collect:
+                predict_results.append(rdd_collect)
+
+        stkm.trainOn(input_stream)
+        predict_stream = stkm.predictOn(input_stream)
+        predict_stream.foreachRDD(collect)
+
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 6.0, 0.01)
+        self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+
+
 if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")
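The arithmetic behind the comment in test_trainOn_predictOn can be reproduced without a streaming context. The following is only an illustrative sketch in plain Python on 1-D data: `predict` and `retrain_without_memory` are hypothetical helpers, and the loop assumes each batch updates the model before that batch is predicted (for this data, predicting before the update would give the same labels).

```python
def predict(centers, x):
    """Index of the nearest 1-D center."""
    return min(range(len(centers)), key=lambda i: (centers[i] - x) ** 2)


def retrain_without_memory(centers, batch):
    """With a decay factor of 0, each center becomes the mean of its points."""
    assigned = [[] for _ in centers]
    for x in batch:
        assigned[predict(centers, x)].append(x)
    # A center with no assigned points is left unchanged in this sketch.
    return [sum(pts) / len(pts) if pts else c
            for c, pts in zip(centers, assigned)]


initial = [0.0, 1.0]
batches = [[-0.5, 0.6, 0.8], [0.2, -0.1, 0.3]]

centers = list(initial)
results = []
for batch in batches:
    centers = retrain_without_memory(centers, batch)
    results.append([predict(centers, x) for x in batch])

print(results)
# For comparison: labels the second batch would get from the initial model.
print([predict(initial, x) for x in batches[1]])
```

The second printed list shows that the untouched initial model would put every point of the second batch in cluster 0, so the labels asserted by the test can only come from an updated model.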

