You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/16 05:33:50 UTC

[1/2] [WIP] SPARK-1430: Support sparse data in Python MLlib

Repository: spark
Updated Branches:
  refs/heads/master 8517911ef -> 63ca581d9


http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
new file mode 100644
index 0000000..d4771d7
--- /dev/null
+++ b/python/pyspark/mllib/tests.py
@@ -0,0 +1,302 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fuller unit tests for Python MLlib.
+"""
+
+from numpy import array, array_equal
+import unittest
+
+from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
+        _deserialize_double_vector, _dot, _squared_distance
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.tests import PySparkTestCase
+
+
+_have_scipy = False
+try:
+    import scipy.sparse
+    _have_scipy = True
+except:
+    # No SciPy, but that's okay, we'll skip those tests
+    pass
+
+
+class VectorTests(unittest.TestCase):
+    def test_serialize(self):
+        sv = SparseVector(4, {1: 1, 3: 2})
+        dv = array([1., 2., 3., 4.])
+        lst = [1, 2, 3, 4]
+        self.assertTrue(sv is _convert_vector(sv))
+        self.assertTrue(dv is _convert_vector(dv))
+        self.assertTrue(array_equal(dv, _convert_vector(lst)))
+        self.assertEquals(sv,
+                _deserialize_double_vector(_serialize_double_vector(sv)))
+        self.assertTrue(array_equal(dv,
+                _deserialize_double_vector(_serialize_double_vector(dv))))
+        self.assertTrue(array_equal(dv,
+                _deserialize_double_vector(_serialize_double_vector(lst))))
+
+    def test_dot(self):
+        sv = SparseVector(4, {1: 1, 3: 2})
+        dv = array([1., 2., 3., 4.])
+        lst = [1, 2, 3, 4]
+        mat = array([[1., 2., 3., 4.],
+                     [1., 2., 3., 4.],
+                     [1., 2., 3., 4.],
+                     [1., 2., 3., 4.]])
+        self.assertEquals(10.0, _dot(sv, dv))
+        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
+        self.assertEquals(30.0, _dot(dv, dv))
+        self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
+        self.assertEquals(30.0, _dot(lst, dv))
+        self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
+
+    def test_squared_distance(self):
+        sv = SparseVector(4, {1: 1, 3: 2})
+        dv = array([1., 2., 3., 4.])
+        lst = [4, 3, 2, 1]
+        self.assertEquals(15.0, _squared_distance(sv, dv))
+        self.assertEquals(25.0, _squared_distance(sv, lst))
+        self.assertEquals(20.0, _squared_distance(dv, lst))
+        self.assertEquals(15.0, _squared_distance(dv, sv))
+        self.assertEquals(25.0, _squared_distance(lst, sv))
+        self.assertEquals(20.0, _squared_distance(lst, dv))
+        self.assertEquals(0.0, _squared_distance(sv, sv))
+        self.assertEquals(0.0, _squared_distance(dv, dv))
+        self.assertEquals(0.0, _squared_distance(lst, lst))
+
+
+class ListTests(PySparkTestCase):
+    """
+    Test MLlib algorithms on plain lists, to make sure they're passed through
+    as NumPy arrays.
+    """
+
+    def test_clustering(self):
+        from pyspark.mllib.clustering import KMeans
+        data = [
+            [0, 1.1],
+            [0, 1.2],
+            [1.1, 0],
+            [1.2, 0],
+        ]
+        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+        self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+        self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+    def test_classification(self):
+        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+        data = [
+            LabeledPoint(0.0, [1, 0]),
+            LabeledPoint(1.0, [0, 1]),
+            LabeledPoint(0.0, [2, 0]),
+            LabeledPoint(1.0, [0, 2])
+        ]
+        rdd = self.sc.parallelize(data)
+        features = [p.features.tolist() for p in data]
+
+        lr_model = LogisticRegressionWithSGD.train(rdd)
+        self.assertTrue(lr_model.predict(features[0]) <= 0)
+        self.assertTrue(lr_model.predict(features[1]) > 0)
+        self.assertTrue(lr_model.predict(features[2]) <= 0)
+        self.assertTrue(lr_model.predict(features[3]) > 0)
+
+        svm_model = SVMWithSGD.train(rdd)
+        self.assertTrue(svm_model.predict(features[0]) <= 0)
+        self.assertTrue(svm_model.predict(features[1]) > 0)
+        self.assertTrue(svm_model.predict(features[2]) <= 0)
+        self.assertTrue(svm_model.predict(features[3]) > 0)
+
+        nb_model = NaiveBayes.train(rdd)
+        self.assertTrue(nb_model.predict(features[0]) <= 0)
+        self.assertTrue(nb_model.predict(features[1]) > 0)
+        self.assertTrue(nb_model.predict(features[2]) <= 0)
+        self.assertTrue(nb_model.predict(features[3]) > 0)
+
+    def test_regression(self):
+        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+                RidgeRegressionWithSGD
+        data = [
+            LabeledPoint(-1.0, [0, -1]),
+            LabeledPoint(1.0, [0, 1]),
+            LabeledPoint(-1.0, [0, -2]),
+            LabeledPoint(1.0, [0, 2])
+        ]
+        rdd = self.sc.parallelize(data)
+        features = [p.features.tolist() for p in data]
+
+        lr_model = LinearRegressionWithSGD.train(rdd)
+        self.assertTrue(lr_model.predict(features[0]) <= 0)
+        self.assertTrue(lr_model.predict(features[1]) > 0)
+        self.assertTrue(lr_model.predict(features[2]) <= 0)
+        self.assertTrue(lr_model.predict(features[3]) > 0)
+
+        lasso_model = LassoWithSGD.train(rdd)
+        self.assertTrue(lasso_model.predict(features[0]) <= 0)
+        self.assertTrue(lasso_model.predict(features[1]) > 0)
+        self.assertTrue(lasso_model.predict(features[2]) <= 0)
+        self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+        rr_model = RidgeRegressionWithSGD.train(rdd)
+        self.assertTrue(rr_model.predict(features[0]) <= 0)
+        self.assertTrue(rr_model.predict(features[1]) > 0)
+        self.assertTrue(rr_model.predict(features[2]) <= 0)
+        self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+@unittest.skipIf(not _have_scipy, "SciPy not installed")
+class SciPyTests(PySparkTestCase):
+    """
+    Test both vector operations and MLlib algorithms with SciPy sparse matrices,
+    if SciPy is available.
+    """
+
+    def test_serialize(self):
+        from scipy.sparse import lil_matrix
+        lil = lil_matrix((4, 1))
+        lil[1, 0] = 1
+        lil[3, 0] = 2
+        sv = SparseVector(4, {1: 1, 3: 2})
+        self.assertEquals(sv, _convert_vector(lil))
+        self.assertEquals(sv, _convert_vector(lil.tocsc()))
+        self.assertEquals(sv, _convert_vector(lil.tocoo()))
+        self.assertEquals(sv, _convert_vector(lil.tocsr()))
+        self.assertEquals(sv, _convert_vector(lil.todok()))
+        self.assertEquals(sv,
+                _deserialize_double_vector(_serialize_double_vector(lil)))
+        self.assertEquals(sv,
+                _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
+        self.assertEquals(sv,
+                _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
+        self.assertEquals(sv,
+                _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+
+    def test_dot(self):
+        from scipy.sparse import lil_matrix
+        lil = lil_matrix((4, 1))
+        lil[1, 0] = 1
+        lil[3, 0] = 2
+        dv = array([1., 2., 3., 4.])
+        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+        mat = array([[1., 2., 3., 4.],
+                     [1., 2., 3., 4.],
+                     [1., 2., 3., 4.],
+                     [1., 2., 3., 4.]])
+        self.assertEquals(10.0, _dot(lil, dv))
+        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
+
+    def test_squared_distance(self):
+        from scipy.sparse import lil_matrix
+        lil = lil_matrix((4, 1))
+        lil[1, 0] = 3
+        lil[3, 0] = 2
+        dv = array([1., 2., 3., 4.])
+        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+        self.assertEquals(15.0, _squared_distance(lil, dv))
+        self.assertEquals(15.0, _squared_distance(lil, sv))
+        self.assertEquals(15.0, _squared_distance(dv, lil))
+        self.assertEquals(15.0, _squared_distance(sv, lil))
+
+    def scipy_matrix(self, size, values):
+        """Create a column SciPy matrix from a dictionary of values"""
+        from scipy.sparse import lil_matrix
+        lil = lil_matrix((size, 1))
+        for key, value in values.items():
+            lil[key, 0] = value
+        return lil
+
+    def test_clustering(self):
+        from pyspark.mllib.clustering import KMeans
+        data = [
+            self.scipy_matrix(3, {1: 1.0}),
+            self.scipy_matrix(3, {1: 1.1}),
+            self.scipy_matrix(3, {2: 1.0}),
+            self.scipy_matrix(3, {2: 1.1})
+        ]
+        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+        self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+        self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+    def test_classification(self):
+        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+        data = [
+            LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+            LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+        ]
+        rdd = self.sc.parallelize(data)
+        features = [p.features for p in data]
+
+        lr_model = LogisticRegressionWithSGD.train(rdd)
+        self.assertTrue(lr_model.predict(features[0]) <= 0)
+        self.assertTrue(lr_model.predict(features[1]) > 0)
+        self.assertTrue(lr_model.predict(features[2]) <= 0)
+        self.assertTrue(lr_model.predict(features[3]) > 0)
+
+        svm_model = SVMWithSGD.train(rdd)
+        self.assertTrue(svm_model.predict(features[0]) <= 0)
+        self.assertTrue(svm_model.predict(features[1]) > 0)
+        self.assertTrue(svm_model.predict(features[2]) <= 0)
+        self.assertTrue(svm_model.predict(features[3]) > 0)
+
+        nb_model = NaiveBayes.train(rdd)
+        self.assertTrue(nb_model.predict(features[0]) <= 0)
+        self.assertTrue(nb_model.predict(features[1]) > 0)
+        self.assertTrue(nb_model.predict(features[2]) <= 0)
+        self.assertTrue(nb_model.predict(features[3]) > 0)
+
+    def test_regression(self):
+        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+                RidgeRegressionWithSGD
+        data = [
+            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+        ]
+        rdd = self.sc.parallelize(data)
+        features = [p.features for p in data]
+
+        lr_model = LinearRegressionWithSGD.train(rdd)
+        self.assertTrue(lr_model.predict(features[0]) <= 0)
+        self.assertTrue(lr_model.predict(features[1]) > 0)
+        self.assertTrue(lr_model.predict(features[2]) <= 0)
+        self.assertTrue(lr_model.predict(features[3]) > 0)
+
+        lasso_model = LassoWithSGD.train(rdd)
+        self.assertTrue(lasso_model.predict(features[0]) <= 0)
+        self.assertTrue(lasso_model.predict(features[1]) > 0)
+        self.assertTrue(lasso_model.predict(features[2]) <= 0)
+        self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+        rr_model = RidgeRegressionWithSGD.train(rdd)
+        self.assertTrue(rr_model.predict(features[0]) <= 0)
+        self.assertTrue(rr_model.predict(features[1]) > 0)
+        self.assertTrue(rr_model.predict(features[2]) <= 0)
+        self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+if __name__ == "__main__":
+    if not _have_scipy:
+        print "NOTE: Skipping SciPy tests as it does not seem to be installed"
+    unittest.main()
+    if not _have_scipy:
+        print "NOTE: SciPy tests were skipped as it does not seem to be installed"

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/run-tests
----------------------------------------------------------------------
diff --git a/python/run-tests b/python/run-tests
index dabb714..7bbf10d 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -34,7 +34,7 @@ rm -rf metastore warehouse
 function run_test() {
     SPARK_TESTING=0 $FWDIR/bin/pyspark $1 2>&1 | tee -a > unit-tests.log
     FAILED=$((PIPESTATUS[0]||$FAILED))
-    
+
     # Fail and exit on the first test failure.
     if [[ $FAILED != 0 ]]; then
         cat unit-tests.log | grep -v "^[0-9][0-9]*" # filter all lines starting with a number.
@@ -57,8 +57,10 @@ run_test "pyspark/tests.py"
 run_test "pyspark/mllib/_common.py"
 run_test "pyspark/mllib/classification.py"
 run_test "pyspark/mllib/clustering.py"
+run_test "pyspark/mllib/linalg.py"
 run_test "pyspark/mllib/recommendation.py"
 run_test "pyspark/mllib/regression.py"
+run_test "pyspark/mllib/tests.py"
 
 if [[ $FAILED == 0 ]]; then
     echo -en "\033[32m"  # Green


[2/2] git commit: [WIP] SPARK-1430: Support sparse data in Python MLlib

Posted by pw...@apache.org.
[WIP] SPARK-1430: Support sparse data in Python MLlib

This PR adds a SparseVector class in PySpark and updates all the regression, classification and clustering algorithms and models to support sparse data, similar to MLlib. I chose to add this class because SciPy is quite difficult to install in many environments (more so than NumPy), but I plan to add support for SciPy sparse vectors later too, and make the methods work transparently on objects of either type.

On the Scala side, we keep Python sparse vectors sparse and pass them to MLlib. We always return dense vectors from our models.

Some to-do items left:
- [x] Support SciPy's scipy.sparse matrix objects when SciPy is available. We can easily add a function to convert these to our own SparseVector.
- [x] MLlib currently uses a vector with one extra column on the left to represent what we call LabeledPoint in Scala. Do we really want this? It may get annoying once you deal with sparse data since you must add/subtract 1 to each feature index when training. We can remove this API in 1.0 and use tuples for labeling.
- [x] Explain how to use these in the Python MLlib docs.

CC @mengxr, @joshrosen

Author: Matei Zaharia <ma...@databricks.com>

Closes #341 from mateiz/py-ml-update and squashes the following commits:

d52e763 [Matei Zaharia] Remove no-longer-needed slice code and handle review comments
ea5a25a [Matei Zaharia] Fix remaining uses of copyto() after merge
b9f97a3 [Matei Zaharia] Fix test
1e1bd0f [Matei Zaharia] Add MLlib logistic regression example in Python
88bc01f [Matei Zaharia] Clean up inheritance of LinearModel in Python, and expose its parameters
37ab747 [Matei Zaharia] Fix some examples and docs due to changes in MLlib API
da0f27e [Matei Zaharia] Added a MLlib K-means example and updated docs to discuss sparse data
c48e85a [Matei Zaharia] Added some tests for passing lists as input, and added mllib/tests.py to run-tests script.
a07ba10 [Matei Zaharia] Fix some typos and calculation of initial weights
74eefe7 [Matei Zaharia] Added LabeledPoint class in Python
889dde8 [Matei Zaharia] Support scipy.sparse matrices in all our algorithms and models
ab244d1 [Matei Zaharia] Allow SparseVectors to be initialized using a dict
a5d6426 [Matei Zaharia] Add linalg.py to run-tests script
0e7a3d8 [Matei Zaharia] Keep vectors sparse in Java when reading LabeledPoints
eaee759 [Matei Zaharia] Update regression, classification and clustering models for sparse data
2abbb44 [Matei Zaharia] Further work to get linear models working with sparse data
154f45d [Matei Zaharia] Update docs, name some magic values
881fef7 [Matei Zaharia] Added a sparse vector in Python and made Java-Python format more compact


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63ca581d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63ca581d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63ca581d

Branch: refs/heads/master
Commit: 63ca581d9c84176549b1ea0a1d8d7c0cca982acc
Parents: 8517911
Author: Matei Zaharia <ma...@databricks.com>
Authored: Tue Apr 15 20:33:24 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Apr 15 20:33:24 2014 -0700

----------------------------------------------------------------------
 docs/mllib-classification-regression.md         |  45 ++-
 docs/mllib-clustering.md                        |  11 +-
 docs/mllib-guide.md                             |  27 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 156 +++++---
 .../org/apache/spark/mllib/linalg/Vectors.scala |   8 +-
 .../spark/mllib/linalg/VectorsSuite.scala       |  18 +
 python/epydoc.conf                              |   3 +-
 python/examples/kmeans.py                       |  11 +-
 python/examples/logistic_regression.py          |   8 +-
 python/examples/mllib/kmeans.py                 |  44 +++
 python/examples/mllib/logistic_regression.py    |  50 +++
 python/pyspark/mllib/_common.py                 | 396 ++++++++++++++-----
 python/pyspark/mllib/classification.py          |  75 +++-
 python/pyspark/mllib/clustering.py              |  51 ++-
 python/pyspark/mllib/linalg.py                  | 245 ++++++++++++
 python/pyspark/mllib/regression.py              | 128 +++++-
 python/pyspark/mllib/tests.py                   | 302 ++++++++++++++
 python/run-tests                                |   4 +-
 18 files changed, 1368 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/docs/mllib-classification-regression.md
----------------------------------------------------------------------
diff --git a/docs/mllib-classification-regression.md b/docs/mllib-classification-regression.md
index cc8acf1..2c42f60 100644
--- a/docs/mllib-classification-regression.md
+++ b/docs/mllib-classification-regression.md
@@ -356,16 +356,17 @@ error.
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.classification.SVMWithSGD
 import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
 
 // Load and parse the data file
 val data = sc.textFile("mllib/data/sample_svm_data.txt")
 val parsedData = data.map { line =>
-  val parts = line.split(' ')
-  LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
+  val parts = line.split(' ').map(_.toDouble)
+  LabeledPoint(parts(0), Vectors.dense(parts.tail))
 }
 
 // Run training algorithm to build the model
-val numIterations = 20
+val numIterations = 100
 val model = SVMWithSGD.train(parsedData, numIterations)
 
 // Evaluate model on training examples and compute training error
@@ -401,21 +402,22 @@ val modelL1 = svmAlg.run(parsedData)
 The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint.
 The example then uses LinearRegressionWithSGD to build a simple linear model to predict label 
 values. We compute the Mean Squared Error at the end to evaluate
-[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)
+[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
 
 {% highlight scala %}
 import org.apache.spark.mllib.regression.LinearRegressionWithSGD
 import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
 
 // Load and parse the data
 val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
 val parsedData = data.map { line =>
   val parts = line.split(',')
-  LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
+  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
 }
 
 // Building the model
-val numIterations = 20
+val numIterations = 100
 val model = LinearRegressionWithSGD.train(parsedData, numIterations)
 
 // Evaluate model on training examples and compute training error
@@ -423,7 +425,7 @@ val valuesAndPreds = parsedData.map { point =>
   val prediction = model.predict(point.features)
   (point.label, prediction)
 }
-val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count
+val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.reduce(_ + _) / valuesAndPreds.count
 println("training Mean Squared Error = " + MSE)
 {% endhighlight %}
 
@@ -518,18 +520,22 @@ and make predictions with the resulting model to compute the training error.
 
 {% highlight python %}
 from pyspark.mllib.classification import LogisticRegressionWithSGD
+from pyspark.mllib.regression import LabeledPoint
 from numpy import array
 
 # Load and parse the data
+def parsePoint(line):
+    values = [float(x) for x in line.split(' ')]
+    return LabeledPoint(values[0], values[1:])
+
 data = sc.textFile("mllib/data/sample_svm_data.txt")
-parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
-model = LogisticRegressionWithSGD.train(parsedData)
+parsedData = data.map(parsePoint)
 
 # Build the model
-labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
-        model.predict(point.take(range(1, point.size)))))
+model = LogisticRegressionWithSGD.train(parsedData)
 
 # Evaluating the model on training data
+labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
 trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
 print("Training Error = " + str(trainErr))
 {% endhighlight %}
@@ -538,22 +544,25 @@ print("Training Error = " + str(trainErr))
 The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint.
 The example then uses LinearRegressionWithSGD to build a simple linear model to predict label 
 values. We compute the Mean Squared Error at the end to evaluate
-[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)
+[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
 
 {% highlight python %}
-from pyspark.mllib.regression import LinearRegressionWithSGD
+from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
 from numpy import array
 
 # Load and parse the data
+def parsePoint(line):
+    values = [float(x) for x in line.replace(',', ' ').split(' ')]
+    return LabeledPoint(values[0], values[1:])
+
 data = sc.textFile("mllib/data/ridge-data/lpsa.data")
-parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
+parsedData = data.map(parsePoint)
 
 # Build the model
 model = LinearRegressionWithSGD.train(parsedData)
 
 # Evaluate the model on training data
-valuesAndPreds = parsedData.map(lambda point: (point.item(0),
-        model.predict(point.take(range(1, point.size)))))
-MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count()
+valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
+MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
 print("Mean Squared Error = " + str(MSE))
-{% endhighlight %}
\ No newline at end of file
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/docs/mllib-clustering.md
----------------------------------------------------------------------
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 65ed75b..50a8671 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -48,14 +48,15 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
 
 {% highlight scala %}
 import org.apache.spark.mllib.clustering.KMeans
+import org.apache.spark.mllib.linalg.Vectors
 
 // Load and parse the data
-val data = sc.textFile("kmeans_data.txt")
-val parsedData = data.map( _.split(' ').map(_.toDouble))
+val data = sc.textFile("data/kmeans_data.txt")
+val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
 
 // Cluster the data into two classes using KMeans
-val numIterations = 20
 val numClusters = 2
+val numIterations = 20
 val clusters = KMeans.train(parsedData, numClusters, numIterations)
 
 // Evaluate clustering by computing Within Set Sum of Squared Errors
@@ -85,12 +86,12 @@ from numpy import array
 from math import sqrt
 
 # Load and parse the data
-data = sc.textFile("kmeans_data.txt")
+data = sc.textFile("data/kmeans_data.txt")
 parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
 
 # Build the model (cluster the data)
 clusters = KMeans.train(parsedData, 2, maxIterations=10,
-        runs=30, initialization_mode="random")
+        runs=10, initialization_mode="random")
 
 # Evaluate clustering by computing Within Set Sum of Squared Errors
 def error(point):

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/docs/mllib-guide.md
----------------------------------------------------------------------
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 1ac5cc1..4236b0c 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -7,8 +7,9 @@ title: Machine Learning Library (MLlib)
 MLlib is a Spark implementation of some common machine learning (ML)
 functionality, as well associated tests and data generators.  MLlib
 currently supports four common types of machine learning problem settings,
-namely, binary classification, regression, clustering and collaborative
-filtering, as well as an underlying gradient descent optimization primitive.
+namely classification, regression, clustering and collaborative filtering,
+as well as an underlying gradient descent optimization primitive and several
+linear algebra methods.
 
 # Available Methods
 The following links provide a detailed explanation of the methods and usage examples for each of them:
@@ -32,6 +33,28 @@ The following links provide a detailed explanation of the methods and usage exam
   * Singular Value Decomposition
   * Principal Component Analysis
 
+# Data Types
+
+Most MLlib algorithms operate on RDDs containing vectors. In Java and Scala, the
+[Vector](api/mllib/index.html#org.apache.spark.mllib.linalg.Vector) class is used to
+represent vectors. You can create either dense or sparse vectors using the
+[Vectors](api/mllib/index.html#org.apache.spark.mllib.linalg.Vectors$) factory.
+
+In Python, MLlib can take the following vector types:
+
+* [NumPy](http://www.numpy.org) arrays
+* Standard Python lists (e.g. `[1, 2, 3]`)
+* The MLlib [SparseVector](api/pyspark/pyspark.mllib.linalg.SparseVector-class.html) class
+* [SciPy sparse matrices](http://docs.scipy.org/doc/scipy/reference/sparse.html)
+
+For efficiency, we recommend using NumPy arrays over lists, and using the
+[CSC format](http://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csc_matrix.html#scipy.sparse.csc_matrix)
+for SciPy matrices, or MLlib's own SparseVector class.
+
+Several other simple data types are used throughout the library, e.g. the LabeledPoint
+class ([Java/Scala](api/mllib/index.html#org.apache.spark.mllib.regression.LabeledPoint),
+[Python](api/pyspark/pyspark.mllib.regression.LabeledPoint-class.html)) for labeled data.
+
 # Dependencies
 MLlib uses the [jblas](https://github.com/mikiobraun/jblas) linear algebra library, which itself
 depends on native Fortran routines. You may need to install the

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index a6c049e..7c65b0d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
 import org.apache.spark.rdd.RDD
@@ -31,56 +31,112 @@ import org.apache.spark.rdd.RDD
 /**
  * :: DeveloperApi ::
  * The Java stubs necessary for the Python mllib bindings.
+ *
+ * See python/pyspark/mllib/_common.py for the mutually agreed upon data format.
  */
 @DeveloperApi
 class PythonMLLibAPI extends Serializable {
-  private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
-    val packetLength = bytes.length
-    if (packetLength < 16) {
-      throw new IllegalArgumentException("Byte array too short.")
-    }
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    val magic = bb.getLong()
-    if (magic != 1) {
+  private val DENSE_VECTOR_MAGIC: Byte = 1
+  private val SPARSE_VECTOR_MAGIC: Byte = 2
+  private val DENSE_MATRIX_MAGIC: Byte = 3
+  private val LABELED_POINT_MAGIC: Byte = 4
+
+  private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+    require(bytes.length - offset >= 5, "Byte array too short")
+    val magic = bytes(offset)
+    if (magic == DENSE_VECTOR_MAGIC) {
+      deserializeDenseVector(bytes, offset)
+    } else if (magic == SPARSE_VECTOR_MAGIC) {
+      deserializeSparseVector(bytes, offset)
+    } else {
       throw new IllegalArgumentException("Magic " + magic + " is wrong.")
     }
-    val length = bb.getLong()
-    if (packetLength != 16 + 8 * length) {
-      throw new IllegalArgumentException("Length " + length + " is wrong.")
-    }
+  }
+
+  private def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+    val packetLength = bytes.length - offset
+    require(packetLength >= 5, "Byte array too short")
+    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.get()
+    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
+    val length = bb.getInt()
+    require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
     val db = bb.asDoubleBuffer()
     val ans = new Array[Double](length.toInt)
     db.get(ans)
-    ans
+    Vectors.dense(ans)
   }
 
-  private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+  private def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+    val packetLength = bytes.length - offset
+    require(packetLength >= 9, "Byte array too short")
+    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.get()
+    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
+    val size = bb.getInt()
+    val nonZeros = bb.getInt()
+    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
+    val ib = bb.asIntBuffer()
+    val indices = new Array[Int](nonZeros)
+    ib.get(indices)
+    bb.position(bb.position() + 4 * nonZeros)
+    val db = bb.asDoubleBuffer()
+    val values = new Array[Double](nonZeros)
+    db.get(values)
+    Vectors.sparse(size, indices, values)
+  }
+
+  private def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
     val len = doubles.length
-    val bytes = new Array[Byte](16 + 8 * len)
+    val bytes = new Array[Byte](5 + 8 * len)
     val bb = ByteBuffer.wrap(bytes)
     bb.order(ByteOrder.nativeOrder())
-    bb.putLong(1)
-    bb.putLong(len)
+    bb.put(DENSE_VECTOR_MAGIC)
+    bb.putInt(len)
     val db = bb.asDoubleBuffer()
     db.put(doubles)
     bytes
   }
 
+  private def serializeSparseVector(vector: SparseVector): Array[Byte] = {
+    val nonZeros = vector.indices.length
+    val bytes = new Array[Byte](9 + 12 * nonZeros)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.put(SPARSE_VECTOR_MAGIC)
+    bb.putInt(vector.size)
+    bb.putInt(nonZeros)
+    val ib = bb.asIntBuffer()
+    ib.put(vector.indices)
+    bb.position(bb.position() + 4 * nonZeros)
+    val db = bb.asDoubleBuffer()
+    db.put(vector.values)
+    bytes
+  }
+
+  private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
+    case s: SparseVector =>
+      serializeSparseVector(s)
+    case _ =>
+      serializeDenseVector(vector.toArray)
+  }
+
   private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
     val packetLength = bytes.length
-    if (packetLength < 24) {
+    if (packetLength < 9) {
       throw new IllegalArgumentException("Byte array too short.")
     }
     val bb = ByteBuffer.wrap(bytes)
     bb.order(ByteOrder.nativeOrder())
-    val magic = bb.getLong()
-    if (magic != 2) {
+    val magic = bb.get()
+    if (magic != DENSE_MATRIX_MAGIC) {
       throw new IllegalArgumentException("Magic " + magic + " is wrong.")
     }
-    val rows = bb.getLong()
-    val cols = bb.getLong()
-    if (packetLength != 24 + 8 * rows * cols) {
+    val rows = bb.getInt()
+    val cols = bb.getInt()
+    if (packetLength != 9 + 8 * rows * cols) {
       throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
     }
     val db = bb.asDoubleBuffer()
@@ -98,12 +154,12 @@ class PythonMLLibAPI extends Serializable {
     if (rows > 0) {
       cols = doubles(0).length
     }
-    val bytes = new Array[Byte](24 + 8 * rows * cols)
+    val bytes = new Array[Byte](9 + 8 * rows * cols)
     val bb = ByteBuffer.wrap(bytes)
     bb.order(ByteOrder.nativeOrder())
-    bb.putLong(2)
-    bb.putLong(rows)
-    bb.putLong(cols)
+    bb.put(DENSE_MATRIX_MAGIC)
+    bb.putInt(rows)
+    bb.putInt(cols)
     val db = bb.asDoubleBuffer()
     for (i <- 0 until rows) {
       db.put(doubles(i))
@@ -111,18 +167,27 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
+  private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
+    require(bytes.length >= 9, "Byte array too short")
+    val magic = bytes(0)
+    if (magic != LABELED_POINT_MAGIC) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
+    labelBytes.order(ByteOrder.nativeOrder())
+    val label = labelBytes.asDoubleBuffer().get(0)
+    LabeledPoint(label, deserializeDoubleVector(bytes, 9))
+  }
+
   private def trainRegressionModel(
-      trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+      trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]],
       initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => {
-        val x = deserializeDoubleVector(xBytes)
-        LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
-    })
+    val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
     val initialWeights = deserializeDoubleVector(initialWeightsBA)
     val model = trainFunc(data, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.weights.toArray))
+    ret.add(serializeDoubleVector(model.weights))
     ret.add(model.intercept: java.lang.Double)
     ret
   }
@@ -143,7 +208,7 @@ class PythonMLLibAPI extends Serializable {
           numIterations,
           stepSize,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -166,7 +231,7 @@ class PythonMLLibAPI extends Serializable {
           stepSize,
           regParam,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -189,7 +254,7 @@ class PythonMLLibAPI extends Serializable {
           stepSize,
           regParam,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -212,7 +277,7 @@ class PythonMLLibAPI extends Serializable {
           stepSize,
           regParam,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -233,7 +298,7 @@ class PythonMLLibAPI extends Serializable {
           numIterations,
           stepSize,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -244,14 +309,11 @@ class PythonMLLibAPI extends Serializable {
   def trainNaiveBayes(
       dataBytesJRDD: JavaRDD[Array[Byte]],
       lambda: Double): java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => {
-      val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
-    })
+    val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
     val model = NaiveBayes.train(data, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.labels))
-    ret.add(serializeDoubleVector(model.pi))
+    ret.add(serializeDoubleVector(Vectors.dense(model.labels)))
+    ret.add(serializeDoubleVector(Vectors.dense(model.pi)))
     ret.add(serializeDoubleMatrix(model.theta))
     ret
   }
@@ -265,7 +327,7 @@ class PythonMLLibAPI extends Serializable {
       maxIterations: Int,
       runs: Int,
       initializationMode: String): java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
+    val data = dataBytesJRDD.rdd.map(bytes => deserializeDoubleVector(bytes))
     val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
     val ret = new java.util.LinkedList[java.lang.Object]()
     ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 99a849f..7cdf6bd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -130,9 +130,11 @@ object Vectors {
   private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = {
     breezeVector match {
       case v: BDV[Double] =>
-        require(v.offset == 0, s"Do not support non-zero offset ${v.offset}.")
-        require(v.stride == 1, s"Do not support stride other than 1, but got ${v.stride}.")
-        new DenseVector(v.data)
+        if (v.offset == 0 && v.stride == 1) {
+          new DenseVector(v.data)
+        } else {
+          new DenseVector(v.toArray)  // Can't use underlying array directly, so make a new one
+        }
       case v: BSV[Double] =>
         new SparseVector(v.length, v.index, v.data)
       case v: BV[_] =>

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 8a20031..cfe8a27 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -82,4 +82,22 @@ class VectorsSuite extends FunSuite {
       assert(v.## != another.##)
     }
   }
+
+  test("indexing dense vectors") {
+    val vec = Vectors.dense(1.0, 2.0, 3.0, 4.0)
+    assert(vec(0) === 1.0)
+    assert(vec(3) === 4.0)
+  }
+
+  test("indexing sparse vectors") {
+    val vec = Vectors.sparse(7, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0))
+    assert(vec(0) === 1.0)
+    assert(vec(1) === 0.0)
+    assert(vec(2) === 2.0)
+    assert(vec(3) === 0.0)
+    assert(vec(6) === 4.0)
+    val vec2 = Vectors.sparse(8, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0))
+    assert(vec2(6) === 4.0)
+    assert(vec2(7) === 0.0)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/epydoc.conf
----------------------------------------------------------------------
diff --git a/python/epydoc.conf b/python/epydoc.conf
index 95a6af0..081ed21 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -33,5 +33,6 @@ target: docs/
 private: no
 
 exclude: pyspark.cloudpickle pyspark.worker pyspark.join
-         pyspark.java_gateway pyspark.examples pyspark.shell pyspark.test
+         pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
          pyspark.rddsampler pyspark.daemon pyspark.mllib._common
+         pyspark.mllib.tests

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/examples/kmeans.py
----------------------------------------------------------------------
diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py
index ba31af9..d8387b0 100755
--- a/python/examples/kmeans.py
+++ b/python/examples/kmeans.py
@@ -16,8 +16,13 @@
 #
 
 """
-This example requires numpy (http://www.numpy.org/)
+The K-means algorithm written from scratch against PySpark. In practice,
+one may prefer to use the KMeans algorithm in MLlib, as shown in
+python/examples/mllib/kmeans.py.
+
+This example requires NumPy (http://www.numpy.org/).
 """
+
 import sys
 
 import numpy as np
@@ -49,9 +54,7 @@ if __name__ == "__main__":
     K = int(sys.argv[3])
     convergeDist = float(sys.argv[4])
 
-    # TODO: change this after we port takeSample()
-    #kPoints = data.takeSample(False, K, 34)
-    kPoints = data.take(K)
+    kPoints = data.takeSample(False, K, 1)
     tempDist = 1.0
 
     while tempDist > convergeDist:

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/examples/logistic_regression.py
----------------------------------------------------------------------
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
index 1117dea..28d52e6 100755
--- a/python/examples/logistic_regression.py
+++ b/python/examples/logistic_regression.py
@@ -16,9 +16,13 @@
 #
 
 """
-A logistic regression implementation that uses NumPy (http://www.numpy.org) to act on batches
-of input data using efficient matrix operations.
+A logistic regression implementation that uses NumPy (http://www.numpy.org)
+to act on batches of input data using efficient matrix operations.
+
+In practice, one may prefer to use the LogisticRegression algorithm in
+MLlib, as shown in python/examples/mllib/logistic_regression.py.
 """
+
 from collections import namedtuple
 from math import exp
 from os.path import realpath

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/examples/mllib/kmeans.py
----------------------------------------------------------------------
diff --git a/python/examples/mllib/kmeans.py b/python/examples/mllib/kmeans.py
new file mode 100755
index 0000000..dec82ff
--- /dev/null
+++ b/python/examples/mllib/kmeans.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A K-means clustering program using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
+"""
+
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+from pyspark.mllib.clustering import KMeans
+
+
+def parseVector(line):
+    return np.array([float(x) for x in line.split(' ')])
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 4:
+        print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "KMeans")
+    lines = sc.textFile(sys.argv[2])
+    data = lines.map(parseVector)
+    k = int(sys.argv[3])
+    model = KMeans.train(data, k)
+    print "Final centers: " + str(model.clusterCenters)

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/examples/mllib/logistic_regression.py
----------------------------------------------------------------------
diff --git a/python/examples/mllib/logistic_regression.py b/python/examples/mllib/logistic_regression.py
new file mode 100755
index 0000000..8631051
--- /dev/null
+++ b/python/examples/mllib/logistic_regression.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Logistic regression using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
+"""
+
+from math import exp
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.classification import LogisticRegressionWithSGD
+
+
+# Parse a line of text into an MLlib LabeledPoint object
+def parsePoint(line):
+    values = [float(s) for s in line.split(' ')]
+    if values[0] == -1:   # Convert -1 labels to 0 for MLlib
+        values[0] = 0
+    return LabeledPoint(values[0], values[1:])
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 4:
+        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonLR")
+    points = sc.textFile(sys.argv[2]).map(parsePoint)
+    iterations = int(sys.argv[3])
+    model = LogisticRegressionWithSGD.train(points, iterations)
+    print "Final weights: " + str(model.weights)
+    print "Final intercept: " + str(model.intercept)

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e19f5d2..e6f0953 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -15,38 +15,86 @@
 # limitations under the License.
 #
 
-from numpy import ndarray, float64, int64, int32, ones, array_equal, array, dot, shape, complex, issubdtype
+import struct
+import numpy
+from numpy import ndarray, float64, int64, int32, array_equal, array
 from pyspark import SparkContext, RDD
-import numpy as np
-
+from pyspark.mllib.linalg import SparseVector
 from pyspark.serializers import Serializer
-import struct
 
-# Double vector format:
+"""
+Common utilities shared throughout MLlib, primarily for dealing with
+different data types. These include:
+- Serialization utilities to / from byte arrays that Java can handle
+- Serializers for other data types, like ALS Rating objects
+- Common methods for linear models
+- Methods to deal with the different vector types we support, such as
+  SparseVector and scipy.sparse matrices.
+"""
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
+
+_have_scipy = False
+_scipy_issparse = None
+try:
+    import scipy.sparse
+    _have_scipy = True
+    _scipy_issparse = scipy.sparse.issparse
+except:
+    # No SciPy in environment, but that's okay
+    pass
+
+
+# Serialization functions to and from Scala. These use the following formats, understood
+# by the PythonMLLibAPI class in Scala:
+#
+# Dense double vector format:
+#
+# [1-byte 1] [4-byte length] [length*8 bytes of data]
 #
-# [8-byte 1] [8-byte length] [length*8 bytes of data]
+# Sparse double vector format:
+#
+# [1-byte 2] [4-byte size] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
 #
 # Double matrix format:
 #
-# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
+#
+# LabeledPoint format:
+#
+# [1-byte 4] [8-byte label] [dense or sparse vector]
 #
 # This is all in machine-endian.  That means that the Java interpreter and the
 # Python interpreter must agree on what endian the machine is.
 
-def _deserialize_byte_array(shape, ba, offset):
-    """Wrapper around ndarray aliasing hack.
+
+DENSE_VECTOR_MAGIC = 1
+SPARSE_VECTOR_MAGIC = 2
+DENSE_MATRIX_MAGIC = 3
+LABELED_POINT_MAGIC = 4
+
+
+def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
+    """
+    Deserialize a numpy array of the given type from an offset in
+    bytearray ba, assigning it the given shape.
 
     >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
-    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+    >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
     True
     >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
-    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+    >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
+    True
+    >>> x = array([1, 2, 3], dtype=int32)
+    >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
     True
     """
-    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
-            order='C')
+    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
     return ar.copy()
 
+
 def _serialize_double_vector(v):
     """Serialize a double vector into a mutually understood format.
 
@@ -55,160 +103,231 @@ def _serialize_double_vector(v):
     >>> array_equal(y, array([1.0, 2.0, 3.0]))
     True
     """
-    if type(v) != ndarray:
-        raise TypeError("_serialize_double_vector called on a %s; "
-                "wanted ndarray" % type(v))
-    """complex is only datatype that can't be converted to float64"""
-    if issubdtype(v.dtype, complex):
+    v = _convert_vector(v)
+    if type(v) == ndarray:
+        return _serialize_dense_vector(v)
+    elif type(v) == SparseVector:
+        return _serialize_sparse_vector(v)
+    else:
         raise TypeError("_serialize_double_vector called on a %s; "
-                "wanted ndarray" % type(v))
-    if v.dtype != float64:
-        v = v.astype(float64)
+                "wanted ndarray or SparseVector" % type(v))
+
+
+def _serialize_dense_vector(v):
+    """Serialize a dense vector given as a NumPy array."""
     if v.ndim != 1:
         raise TypeError("_serialize_double_vector called on a %ddarray; "
                 "wanted a 1darray" % v.ndim)
+    if v.dtype != float64:
+        if numpy.issubdtype(v.dtype, numpy.complex):
+            raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+                    "wanted ndarray of float64" % v.dtype)
+        v = v.astype(float64)
     length = v.shape[0]
-    ba = bytearray(16 + 8*length)
-    header = ndarray(shape=[2], buffer=ba, dtype="int64")
-    header[0] = 1
-    header[1] = length
-    arr_mid = ndarray(shape=[length], buffer=ba, offset=16, dtype="float64")
-    arr_mid[...] = v
+    ba = bytearray(5 + 8 * length)
+    ba[0] = DENSE_VECTOR_MAGIC
+    length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
+    length_bytes[0] = length
+    _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
+    return ba
+
+
+def _serialize_sparse_vector(v):
+    """Serialize a pyspark.mllib.linalg.SparseVector."""
+    nonzeros = len(v.indices)
+    ba = bytearray(9 + 12 * nonzeros)
+    ba[0] = SPARSE_VECTOR_MAGIC
+    header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+    header[0] = v.size
+    header[1] = nonzeros
+    _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
+    values_offset = 9 + 4 * nonzeros
+    _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
     return ba
 
+
 def _deserialize_double_vector(ba):
     """Deserialize a double vector from a mutually understood format.
 
     >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
     >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
     True
+    >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
+    >>> s == _deserialize_double_vector(_serialize_double_vector(s))
+    True
     """
     if type(ba) != bytearray:
         raise TypeError("_deserialize_double_vector called on a %s; "
                 "wanted bytearray" % type(ba))
-    if len(ba) < 16:
+    if len(ba) < 5:
         raise TypeError("_deserialize_double_vector called on a %d-byte array, "
                 "which is too short" % len(ba))
-    if (len(ba) & 7) != 0:
-        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
-                "which is not a multiple of 8" % len(ba))
-    header = ndarray(shape=[2], buffer=ba, dtype="int64")
-    if header[0] != 1:
+    if ba[0] == DENSE_VECTOR_MAGIC:
+        return _deserialize_dense_vector(ba)
+    elif ba[0] == SPARSE_VECTOR_MAGIC:
+        return _deserialize_sparse_vector(ba)
+    else:
         raise TypeError("_deserialize_double_vector called on bytearray "
                         "with wrong magic")
-    length = header[1]
-    if len(ba) != 8*length + 16:
-        raise TypeError("_deserialize_double_vector called on bytearray "
+
+
+def _deserialize_dense_vector(ba):
+    """Deserialize a dense vector into a numpy array."""
+    if len(ba) < 5:
+        raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
+                "which is too short" % len(ba))
+    length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
+    if len(ba) != 8 * length + 5:
+        raise TypeError("_deserialize_dense_vector called on bytearray "
+                        "with wrong length")
+    return _deserialize_numpy_array([length], ba, 5)
+
+
+def _deserialize_sparse_vector(ba):
+    """Deserialize a sparse vector into a MLlib SparseVector object."""
+    if len(ba) < 9:
+        raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
+                "which is too short" % len(ba))
+    header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+    size = header[0]
+    nonzeros = header[1]
+    if len(ba) != 9 + 12 * nonzeros:
+        raise TypeError("_deserialize_sparse_vector called on bytearray "
                         "with wrong length")
-    return _deserialize_byte_array([length], ba, 16)
+    indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
+    values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
+    return SparseVector(int(size), indices, values)
+
 
 def _serialize_double_matrix(m):
     """Serialize a double matrix into a mutually understood format."""
-    if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+    if (type(m) == ndarray and m.ndim == 2):
+        if m.dtype != float64:
+            if numpy.issubdtype(m.dtype, numpy.complex):
+                raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
+                        "wanted ndarray of float64" % m.dtype)
+            m = m.astype(float64)
         rows = m.shape[0]
         cols = m.shape[1]
-        ba = bytearray(24 + 8 * rows * cols)
-        header = ndarray(shape=[3], buffer=ba, dtype="int64")
-        header[0] = 2
-        header[1] = rows
-        header[2] = cols
-        arr_mid = ndarray(shape=[rows, cols], buffer=ba, offset=24,
-                      dtype="float64", order='C')
-        arr_mid[...] = m
+        ba = bytearray(9 + 8 * rows * cols)
+        ba[0] = DENSE_MATRIX_MAGIC
+        lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)  # header after magic: rows, cols
+        lengths[0] = rows
+        lengths[1] = cols
+        _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
         return ba
     else:
         raise TypeError("_serialize_double_matrix called on a "
                         "non-double-matrix")
 
+
 def _deserialize_double_matrix(ba):
     """Deserialize a double matrix from a mutually understood format."""
     if type(ba) != bytearray:
         raise TypeError("_deserialize_double_matrix called on a %s; "
                 "wanted bytearray" % type(ba))
-    if len(ba) < 24:
+    if len(ba) < 9:
         raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
                 "which is too short" % len(ba))
-    if (len(ba) & 7) != 0:
-        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
-                "which is not a multiple of 8" % len(ba))
-    header = ndarray(shape=[3], buffer=ba, dtype="int64")
-    if (header[0] != 2):
+    if ba[0] != DENSE_MATRIX_MAGIC:
         raise TypeError("_deserialize_double_matrix called on bytearray "
                         "with wrong magic")
-    rows = header[1]
-    cols = header[2]
-    if (len(ba) != 8*rows*cols + 24):
+    lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+    rows = lengths[0]
+    cols = lengths[1]
+    if (len(ba) != 8 * rows * cols + 9):
         raise TypeError("_deserialize_double_matrix called on bytearray "
                         "with wrong length")
-    return _deserialize_byte_array([rows, cols], ba, 24)
+    return _deserialize_numpy_array([rows, cols], ba, 9)
+
+
+def _serialize_labeled_point(p):
+    """Serialize a LabeledPoint with a features vector of any type."""
+    from pyspark.mllib.regression import LabeledPoint
+    serialized_features = _serialize_double_vector(p.features)
+    header = bytearray(9)
+    header[0] = LABELED_POINT_MAGIC
+    header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
+    header_float[0] = p.label
+    return header + serialized_features
+
+
+def _copyto(array, buffer, offset, shape, dtype):
+    """
+    Copy the contents of a vector to a destination bytearray at the
+    given offset.
+
+    TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
+    we should benchmark that to see whether it provides a benefit.
+    """
+    temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
+    temp_array[...] = array
 
-def _linear_predictor_typecheck(x, coeffs):
-    """Check that x is a one-dimensional vector of the right shape.
-    This is a temporary hackaround until I actually implement bulk predict."""
-    if type(x) == ndarray:
-        if x.ndim == 1:
-            if x.shape == coeffs.shape:
-                pass
-            else:
-                raise RuntimeError("Got array of %d elements; wanted %d"
-                        % (shape(x)[0], shape(coeffs)[0]))
-        else:
-            raise RuntimeError("Bulk predict not yet supported.")
-    elif (type(x) == RDD):
-        raise RuntimeError("Bulk predict not yet supported.")
-    else:
-        raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
 
 def _get_unmangled_rdd(data, serializer):
     dataBytes = data.map(serializer)
     dataBytes._bypass_serializer = True
-    dataBytes.cache()
+    dataBytes.cache() # TODO: users should unpersist() this later!
     return dataBytes
 
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+
+# Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
 # _serialized_double_vectors
 def _get_unmangled_double_vector_rdd(data):
     return _get_unmangled_rdd(data, _serialize_double_vector)
 
-class LinearModel(object):
-    """Something that has a vector of coefficients and an intercept."""
-    def __init__(self, coeff, intercept):
-        self._coeff = coeff
-        self._intercept = intercept
 
-class LinearRegressionModelBase(LinearModel):
-    """A linear regression model.
+# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points
+def _get_unmangled_labeled_point_rdd(data):
+    return _get_unmangled_rdd(data, _serialize_labeled_point)
 
-    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
-    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
-    True
+
+# Common functions for dealing with and training linear models
+
+def _linear_predictor_typecheck(x, coeffs):
     """
-    def predict(self, x):
-        """Predict the value of the dependent variable given a vector x"""
-        """containing values for the independent variables."""
-        _linear_predictor_typecheck(x, self._coeff)
-        return dot(self._coeff, x) + self._intercept
+    Check that x is a one-dimensional vector of the right shape.
+    This is a temporary hackaround until we actually implement bulk predict.
+    """
+    x = _convert_vector(x)
+    if type(x) == ndarray:
+        if x.ndim == 1:
+            if x.shape != coeffs.shape:
+                raise RuntimeError("Got array of %d elements; wanted %d"
+                        % (numpy.shape(x)[0], coeffs.shape[0]))
+        else:
+            raise RuntimeError("Bulk predict not yet supported.")
+    elif type(x) == SparseVector:
+        if x.size != coeffs.shape[0]:
+           raise RuntimeError("Got sparse vector of size %d; wanted %d"
+                   % (x.size, coeffs.shape[0]))
+    elif (type(x) == RDD):
+        raise RuntimeError("Bulk predict not yet supported.")
+    else:
+        raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
+
 
 # If we weren't given initial weights, take a zero vector of the appropriate
 # length.
 def _get_initial_weights(initial_weights, data):
     if initial_weights is None:
-        initial_weights = data.first()
-        if type(initial_weights) != ndarray:
-            raise TypeError("At least one data element has type "
-                    + type(initial_weights).__name__ + " which is not ndarray")
-        if initial_weights.ndim != 1:
-            raise TypeError("At least one data element has "
-                    + initial_weights.ndim + " dimensions, which is not 1")
-        initial_weights = ones([initial_weights.shape[0] - 1])
+        initial_weights = _convert_vector(data.first().features)
+        if type(initial_weights) == ndarray:
+            if initial_weights.ndim != 1:
+                raise TypeError("At least one data element has "
+                        + initial_weights.ndim + " dimensions, which is not 1")
+            initial_weights = numpy.zeros([initial_weights.shape[0]])
+        elif type(initial_weights) == SparseVector:
+            initial_weights = numpy.zeros([initial_weights.size])
     return initial_weights
 
+
 # train_func should take two parameters, namely data and initial_weights, and
 # return the result of a call to the appropriate JVM stub.
 # _regression_train_wrapper is responsible for setup and error checking.
 def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
     initial_weights = _get_initial_weights(initial_weights, data)
-    dataBytes = _get_unmangled_double_vector_rdd(data)
+    dataBytes = _get_unmangled_labeled_point_rdd(data)
     ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
     if len(ans) != 2:
         raise RuntimeError("JVM call result had unexpected length")
@@ -220,6 +339,9 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
                 + type(ans[0]).__name__ + " which is not float")
     return klass(_deserialize_double_vector(ans[0]), ans[1])
 
+
+# Functions for serializing ALS Rating objects and tuples
+
 def _serialize_rating(r):
     ba = bytearray(16)
     intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
@@ -227,11 +349,12 @@ def _serialize_rating(r):
     intpart[0], intpart[1], doublepart[0] = r
     return ba
 
+
 class RatingDeserializer(Serializer):
     def loads(self, stream):
         length = struct.unpack("!i", stream.read(4))[0]
         ba = stream.read(length)
-        res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4)
+        res = ndarray(shape=(3, ), buffer=ba, dtype=float64, offset=4)
         return int(res[0]), int(res[1]), res[2]
 
     def load_stream(self, stream):
@@ -243,12 +366,86 @@ class RatingDeserializer(Serializer):
             except EOFError:
                 return
 
+
 def _serialize_tuple(t):
     ba = bytearray(8)
     intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
     intpart[0], intpart[1] = t
     return ba
 
+
+# Vector math functions that support all of our vector types
+
+def _convert_vector(vec):
+    """
+    Convert a vector to a format we support internally. This does
+    the following:
+
+    * For dense NumPy vectors (ndarray), returns them as is
+    * For our SparseVector class, returns that as is
+    * For Python lists, converts them to NumPy vectors
+    * For scipy.sparse.*_matrix column vectors, converts them to
+      our own SparseVector type.
+
+    This should be called before passing any data to our algorithms
+    or attempting to serialize it to Java.
+    """
+    if type(vec) == ndarray or type(vec) == SparseVector:
+        return vec
+    elif type(vec) == list:
+        return array(vec, dtype=float64)
+    elif _have_scipy:
+        if _scipy_issparse(vec):
+            assert vec.shape[1] == 1, "Expected column vector"
+            csc = vec.tocsc()
+            return SparseVector(vec.shape[0], csc.indices, csc.data)
+    raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
+
+
+def _squared_distance(v1, v2):
+    """
+    Squared distance of two NumPy or sparse vectors.
+
+    >>> dense1 = array([1., 2.])
+    >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
+    >>> dense2 = array([2., 1.])
+    >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
+    >>> _squared_distance(dense1, dense2)
+    2.0
+    >>> _squared_distance(dense1, sparse2)
+    2.0
+    >>> _squared_distance(sparse1, dense2)
+    2.0
+    >>> _squared_distance(sparse1, sparse2)
+    2.0
+    """
+    v1 = _convert_vector(v1)
+    v2 = _convert_vector(v2)
+    if type(v1) == ndarray and type(v2) == ndarray:
+        diff = v1 - v2
+        return diff.dot(diff)
+    elif type(v1) == ndarray:
+        return v2.squared_distance(v1)
+    else:
+        return v1.squared_distance(v2)
+
+
+def _dot(vec, target):
+    """
+    Compute the dot product of a vector of the types we support
+    (NumPy array, list, SparseVector, or SciPy sparse) and a target
+    NumPy array that is either 1- or 2-dimensional. Equivalent to
+    calling numpy.dot of the two vectors, but for SciPy ones, we
+    have to transpose them because they're column vectors.
+    """
+    if type(vec) == ndarray or type(vec) == SparseVector:
+        return vec.dot(target)
+    elif type(vec) == list:
+        return _convert_vector(vec).dot(target)
+    else:
+        return vec.transpose().dot(target)[0]
+
+
 def _test():
     import doctest
     globs = globals().copy()
@@ -259,5 +456,6 @@ def _test():
     if failure_count:
         exit(-1)
 
+
 if __name__ == "__main__":
     _test()

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2f9cdb..3a23e08 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -17,30 +17,55 @@
 
 import numpy
 
-from numpy import array, dot, shape
+from numpy import array, shape
 from pyspark import SparkContext
 from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
     _serialize_double_matrix, _deserialize_double_matrix, \
     _serialize_double_vector, _deserialize_double_vector, \
     _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
-    LinearModel, _linear_predictor_typecheck
+    _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint, LinearModel
 from math import exp, log
 
 class LogisticRegressionModel(LinearModel):
     """A linear binary classification model derived from logistic regression.
 
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+    >>> data = [
+    ...     LabeledPoint(0.0, [0.0]),
+    ...     LabeledPoint(1.0, [1.0]),
+    ...     LabeledPoint(1.0, [2.0]),
+    ...     LabeledPoint(1.0, [3.0])
+    ... ]
     >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
     >>> lrm.predict(array([1.0])) > 0
     True
+    >>> lrm.predict(array([0.0])) <= 0
+    True
+    >>> sparse_data = [
+    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+    ... ]
+    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+    >>> lrm.predict(array([0.0, 1.0])) > 0
+    True
+    >>> lrm.predict(array([0.0, 0.0])) <= 0
+    True
+    >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
+    True
+    >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
+    True
     """
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
-        margin = dot(x, self._coeff) + self._intercept
+        margin = _dot(x, self._coeff) + self._intercept
         prob = 1/(1 + exp(-margin))
         return 1 if prob > 0.5 else 0
 
+
 class LogisticRegressionWithSGD(object):
     @classmethod
     def train(cls, data, iterations=100, step=1.0,
@@ -55,14 +80,30 @@ class LogisticRegressionWithSGD(object):
 class SVMModel(LinearModel):
     """A support vector machine.
 
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+    >>> data = [
+    ...     LabeledPoint(0.0, [0.0]),
+    ...     LabeledPoint(1.0, [1.0]),
+    ...     LabeledPoint(1.0, [2.0]),
+    ...     LabeledPoint(1.0, [3.0])
+    ... ]
     >>> svm = SVMWithSGD.train(sc.parallelize(data))
     >>> svm.predict(array([1.0])) > 0
     True
+    >>> sparse_data = [
+    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+    ... ]
+    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+    >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
+    True
+    >>> svm.predict(SparseVector(2, {1: 0.0})) <= 0
+    True
     """
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
-        margin = dot(x, self._coeff) + self._intercept
+        margin = _dot(x, self._coeff) + self._intercept
         return 1 if margin >= 0 else 0
 
 class SVMWithSGD(object):
@@ -84,12 +125,26 @@ class NaiveBayesModel(object):
     - pi: vector of logs of class priors (dimension C)
     - theta: matrix of logs of class conditional probabilities (CxD)
 
-    >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+    >>> data = [
+    ...     LabeledPoint(0.0, [0.0, 0.0]),
+    ...     LabeledPoint(0.0, [0.0, 1.0]),
+    ...     LabeledPoint(1.0, [1.0, 0.0]),
+    ... ]
     >>> model = NaiveBayes.train(sc.parallelize(data))
     >>> model.predict(array([0.0, 1.0]))
     0.0
     >>> model.predict(array([1.0, 0.0]))
     1.0
+    >>> sparse_data = [
+    ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
+    ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
+    ...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
+    ... ]
+    >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
+    >>> model.predict(SparseVector(2, {1: 1.0}))
+    0.0
+    >>> model.predict(SparseVector(2, {0: 1.0}))
+    1.0
     """
 
     def __init__(self, labels, pi, theta):
@@ -99,7 +154,7 @@ class NaiveBayesModel(object):
 
     def predict(self, x):
         """Return the most likely class for a data vector x"""
-        return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
+        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
 
 class NaiveBayes(object):
     @classmethod
@@ -119,7 +174,7 @@ class NaiveBayes(object):
         @param lambda_: The smoothing parameter
         """
         sc = data.context
-        dataBytes = _get_unmangled_double_vector_rdd(data)
+        dataBytes = _get_unmangled_labeled_point_rdd(data)
         ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
         return NaiveBayesModel(
             _deserialize_double_vector(ans[0]),

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 3086291..f65088c 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -19,37 +19,61 @@ from numpy import array, dot
 from math import sqrt
 from pyspark import SparkContext
 from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
     _serialize_double_matrix, _deserialize_double_matrix, \
     _serialize_double_vector, _deserialize_double_vector, \
     _get_initial_weights, _serialize_rating, _regression_train_wrapper
+from pyspark.mllib.linalg import SparseVector
+
 
 class KMeansModel(object):
     """A clustering model derived from the k-means method.
 
     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
-    >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
-    >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+    >>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
+    >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
+    True
+    >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
+    True
+    >>> model = KMeans.train(sc.parallelize(data), 2)
+    >>> sparse_data = [
+    ...     SparseVector(3, {1: 1.0}),
+    ...     SparseVector(3, {1: 1.1}),
+    ...     SparseVector(3, {2: 1.0}),
+    ...     SparseVector(3, {2: 1.1})
+    ... ]
+    >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
+    >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
+    True
+    >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
+    True
+    >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
     True
-    >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+    >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
     True
-    >>> clusters = KMeans.train(sc.parallelize(data), 2)
+    >>> type(model.clusterCenters)
+    <type 'list'>
     """
-    def __init__(self, centers_):
-        self.centers = centers_
+    def __init__(self, centers):
+        self.centers = centers
+
+    @property
+    def clusterCenters(self):
+        """Get the cluster centers, represented as a list of NumPy arrays."""
+        return self.centers
 
     def predict(self, x):
         """Find the cluster to which x belongs in this model."""
         best = 0
-        best_distance = 1e75
-        for i in range(0, self.centers.shape[0]):
-            diff = x - self.centers[i]
-            distance = sqrt(dot(diff, diff))
+        best_distance = float("inf")
+        for i in range(0, len(self.centers)):
+            distance = _squared_distance(x, self.centers[i])
             if distance < best_distance:
                 best = i
                 best_distance = distance
         return best
 
+
 class KMeans(object):
     @classmethod
     def train(cls, data, k, maxIterations=100, runs=1,
@@ -64,7 +88,9 @@ class KMeans(object):
         elif type(ans[0]) != bytearray:
             raise RuntimeError("JVM call result had first element of type "
                     + type(ans[0]) + " which is not bytearray")
-        return KMeansModel(_deserialize_double_matrix(ans[0]))
+        matrix = _deserialize_double_matrix(ans[0])
+        return KMeansModel([row for row in matrix])
+
 
 def _test():
     import doctest
@@ -76,5 +102,6 @@ def _test():
     if failure_count:
         exit(-1)
 
+
 if __name__ == "__main__":
     _test()

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
new file mode 100644
index 0000000..0aa3a51
--- /dev/null
+++ b/python/pyspark/mllib/linalg.py
@@ -0,0 +1,245 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+MLlib utilities for linear algebra. For dense vectors, MLlib
+uses the NumPy C{array} type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a L{SparseVector}
+object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+SciPy is available in their environment.
+"""
+
+from numpy import array, array_equal, ndarray, float64, int32
+
+
+class SparseVector(object):
+    """
+    A simple sparse vector class for passing data to MLlib. Users may
+    alternatively pass SciPy's C{scipy.sparse} data types.
+    """
+
+    def __init__(self, size, *args):
+        """
+        Create a sparse vector, using either a dictionary, a list of
+        (index, value) pairs, or two separate arrays of indices and
+        values (sorted by index).
+
+        @param size: Size of the vector.
+        @param args: Non-zero entries, as a dictionary, list of tuples,
+               or two sorted lists containing indices and values.
+
+        >>> print SparseVector(4, {1: 1.0, 3: 5.5})
+        [1: 1.0, 3: 5.5]
+        >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
+        [1: 1.0, 3: 5.5]
+        >>> print SparseVector(4, [1, 3], [1.0, 5.5])
+        [1: 1.0, 3: 5.5]
+        """
+        assert type(size) == int, "first argument must be an int"
+        self.size = size
+        assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
+        if len(args) == 1:
+            pairs = args[0]
+            if type(pairs) == dict:
+               pairs = pairs.items()
+            pairs = sorted(pairs)
+            self.indices = array([p[0] for p in pairs], dtype=int32)
+            self.values = array([p[1] for p in pairs], dtype=float64)
+        else:
+            assert len(args[0]) == len(args[1]), "index and value arrays not same length"
+            self.indices = array(args[0], dtype=int32)
+            self.values = array(args[1], dtype=float64)
+            for i in xrange(len(self.indices) - 1):
+                if self.indices[i] >= self.indices[i + 1]:
+                    raise TypeError("indices array must be sorted")
+
+    def dot(self, other):
+        """
+        Dot product with a SparseVector or 1- or 2-dimensional NumPy array.
+
+        >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+        >>> a.dot(a)
+        25.0
+        >>> a.dot(array([1., 2., 3., 4.]))
+        22.0
+        >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
+        >>> a.dot(b)
+        0.0
+        >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+        array([ 22.,  22.])
+        """
+        if type(other) == ndarray:
+            if other.ndim == 1:
+                result = 0.0
+                for i in xrange(len(self.indices)):
+                    result += self.values[i] * other[self.indices[i]]
+                return result
+            elif other.ndim == 2:
+                results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
+                return array(results)
+            else:
+                raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
+        else:
+            result = 0.0
+            i, j = 0, 0
+            while i < len(self.indices) and j < len(other.indices):
+                if self.indices[i] == other.indices[j]:
+                    result += self.values[i] * other.values[j]
+                    i += 1
+                    j += 1
+                elif self.indices[i] < other.indices[j]:
+                    i += 1
+                else:
+                    j += 1
+            return result
+
+    def squared_distance(self, other):
+        """
+        Squared distance from a SparseVector or 1-dimensional NumPy array.
+
+        >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+        >>> a.squared_distance(a)
+        0.0
+        >>> a.squared_distance(array([1., 2., 3., 4.]))
+        11.0
+        >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
+        >>> a.squared_distance(b)
+        30.0
+        >>> b.squared_distance(a)
+        30.0
+        """
+        if type(other) == ndarray:
+            if other.ndim == 1:
+                result = 0.0
+                j = 0   # index into our own array
+                for i in xrange(other.shape[0]):
+                    if j < len(self.indices) and self.indices[j] == i:
+                        diff = self.values[j] - other[i]
+                        result += diff * diff
+                        j += 1
+                    else:
+                        result += other[i] * other[i]
+                return result
+            else:
+                raise Exception("Cannot call squared_distance with %d-dimensional array" %
+                        other.ndim)
+        else:
+            result = 0.0
+            i, j = 0, 0
+            while i < len(self.indices) and j < len(other.indices):
+                if self.indices[i] == other.indices[j]:
+                    diff = self.values[i] - other.values[j]
+                    result += diff * diff
+                    i += 1
+                    j += 1
+                elif self.indices[i] < other.indices[j]:
+                    result += self.values[i] * self.values[i]
+                    i += 1
+                else:
+                    result += other.values[j] * other.values[j]
+                    j += 1
+            while i < len(self.indices):
+                result += self.values[i] * self.values[i]
+                i += 1
+            while j < len(other.indices):
+                result += other.values[j] * other.values[j]
+                j += 1
+            return result
+
+    def __str__(self):
+        inds = self.indices
+        vals = self.values
+        entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+        return "[" + entries + "]"
+
+    def __repr__(self):
+        inds = self.indices
+        vals = self.values
+        entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+        return "SparseVector({0}, {{{1}}})".format(self.size, entries)
+
+    def __eq__(self, other):
+        """
+        Test SparseVectors for equality.
+
+        >>> v1 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+        >>> v2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+        >>> v1 == v2
+        True
+        >>> v1 != v2
+        False
+        """
+
+        return (isinstance(other, self.__class__)
+            and other.size == self.size
+            and array_equal(other.indices, self.indices)
+            and array_equal(other.values, self.values))
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
+
+class Vectors(object):
+    """
+    Factory methods for working with vectors. Note that dense vectors
+    are simply represented as NumPy array objects, so there is no need
+    to convert them for use in MLlib. For sparse vectors, the factory
+    methods in this class create an MLlib-compatible type, or users
+    can pass in SciPy's C{scipy.sparse} column vectors.
+    """
+
+    @staticmethod
+    def sparse(size, *args):
+        """
+        Create a sparse vector, using either a dictionary, a list of
+        (index, value) pairs, or two separate arrays of indices and
+        values (sorted by index).
+
+        @param size: Size of the vector.
+        @param args: Non-zero entries, as a dictionary, list of tuples,
+                     or two sorted lists containing indices and values.
+
+        >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
+        [1: 1.0, 3: 5.5]
+        >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+        [1: 1.0, 3: 5.5]
+        >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
+        [1: 1.0, 3: 5.5]
+        """
+        return SparseVector(size, *args)
+
+    @staticmethod
+    def dense(elements):
+        """
+        Create a dense vector of 64-bit floats from a Python list. Always
+        returns a NumPy array.
+
+        >>> Vectors.dense([1, 2, 3])
+        array([ 1.,  2.,  3.])
+        """
+        return array(elements, dtype=float64)
+
+
+def _test():
+    import doctest
+    (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()

http://git-wip-us.apache.org/repos/asf/spark/blob/63ca581d/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 7656db0..266b31d 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,41 +15,98 @@
 # limitations under the License.
 #
 
-from numpy import array, dot
+from numpy import array, ndarray
 from pyspark import SparkContext
 from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
     _serialize_double_matrix, _deserialize_double_matrix, \
     _serialize_double_vector, _deserialize_double_vector, \
     _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
-    _linear_predictor_typecheck
+    _linear_predictor_typecheck, _have_scipy, _scipy_issparse
+from pyspark.mllib.linalg import SparseVector
+
+
+class LabeledPoint(object):
+    """
+    The features and labels of a data point.
+
+    @param label: Label for this data point.
+    @param features: Vector of features for this point (NumPy array, list,
+        pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
+    """
+    def __init__(self, label, features):
+        self.label = label
+        if (type(features) == ndarray or type(features) == SparseVector
+                or (_have_scipy and _scipy_issparse(features))):
+            self.features = features
+        elif type(features) == list:
+            self.features = array(features)
+        else:
+            raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+
 
 class LinearModel(object):
-    """Something that has a vector of coefficients and an intercept."""
-    def __init__(self, coeff, intercept):
-        self._coeff = coeff
+    """A linear model that has a vector of coefficients and an intercept."""
+    def __init__(self, weights, intercept):
+        self._coeff = weights
         self._intercept = intercept
 
+    @property
+    def weights(self):
+        return self._coeff
+
+    @property
+    def intercept(self):
+        return self._intercept
+
+
 class LinearRegressionModelBase(LinearModel):
     """A linear regression model.
 
     >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
     >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
     True
+    >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
+    True
     """
     def predict(self, x):
         """Predict the value of the dependent variable given a vector x"""
         """containing values for the independent variables."""
         _linear_predictor_typecheck(x, self._coeff)
-        return dot(self._coeff, x) + self._intercept
+        return _dot(x, self._coeff) + self._intercept
+
 
 class LinearRegressionModel(LinearRegressionModelBase):
     """A linear regression model derived from a least-squares fit.
 
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> from pyspark.mllib.regression import LabeledPoint
+    >>> data = [
+    ...     LabeledPoint(0.0, [0.0]),
+    ...     LabeledPoint(1.0, [1.0]),
+    ...     LabeledPoint(3.0, [2.0]),
+    ...     LabeledPoint(2.0, [3.0])
+    ... ]
     >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+    True
+    >>> data = [
+    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+    ... ]
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+    True
     """
 
+
 class LinearRegressionWithSGD(object):
     @classmethod
     def train(cls, data, iterations=100, step=1.0,
@@ -61,14 +118,39 @@ class LinearRegressionWithSGD(object):
                         d._jrdd, iterations, step, miniBatchFraction, i),
                 LinearRegressionModel, data, initialWeights)
 
+
 class LassoModel(LinearRegressionModelBase):
     """A linear regression model derived from a least-squares fit with an
     l_1 penalty term.
 
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> from pyspark.mllib.regression import LabeledPoint
+    >>> data = [
+    ...     LabeledPoint(0.0, [0.0]),
+    ...     LabeledPoint(1.0, [1.0]),
+    ...     LabeledPoint(3.0, [2.0]),
+    ...     LabeledPoint(2.0, [3.0])
+    ... ]
     >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+    True
+    >>> data = [
+    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+    ... ]
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+    True
     """
 
+
 class LassoWithSGD(object):
     @classmethod
     def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -80,14 +162,39 @@ class LassoWithSGD(object):
                         iterations, step, regParam, miniBatchFraction, i),
                 LassoModel, data, initialWeights)
 
+
 class RidgeRegressionModel(LinearRegressionModelBase):
     """A linear regression model derived from a least-squares fit with an
     l_2 penalty term.
 
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> from pyspark.mllib.regression import LabeledPoint
+    >>> data = [
+    ...     LabeledPoint(0.0, [0.0]),
+    ...     LabeledPoint(1.0, [1.0]),
+    ...     LabeledPoint(3.0, [2.0]),
+    ...     LabeledPoint(2.0, [3.0])
+    ... ]
     >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+    True
+    >>> data = [
+    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+    ... ]
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+    True
     """
 
+
 class RidgeRegressionWithSGD(object):
     @classmethod
     def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -99,6 +206,7 @@ class RidgeRegressionWithSGD(object):
                         iterations, step, regParam, miniBatchFraction, i),
                 RidgeRegressionModel, data, initialWeights)
 
+
 def _test():
     import doctest
     globs = globals().copy()