You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/11/11 07:26:20 UTC

spark git commit: [SPARK-4324] [PySpark] [MLlib] support numpy.array for all MLlib API

Repository: spark
Updated Branches:
  refs/heads/master 3c07b8f08 -> 65083e93d


[SPARK-4324] [PySpark] [MLlib] support numpy.array for all MLlib API

This PR checks all of the existing Python MLlib APIs to make sure that numpy.array is supported as a Vector (and likewise an RDD of numpy.array).

It also improves some docstrings and doctests.

cc mateiz mengxr

Author: Davies Liu <da...@databricks.com>

Closes #3189 from davies/numpy and squashes the following commits:

d5057c4 [Davies Liu] fix tests
6987611 [Davies Liu] support numpy.array for all MLlib API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65083e93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65083e93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65083e93

Branch: refs/heads/master
Commit: 65083e93ddd552b7d3e4eb09f87c091ef2ae83a2
Parents: 3c07b8f
Author: Davies Liu <da...@databricks.com>
Authored: Mon Nov 10 22:26:16 2014 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Mon Nov 10 22:26:16 2014 -0800

----------------------------------------------------------------------
 python/pyspark/mllib/classification.py | 13 +++++----
 python/pyspark/mllib/feature.py        | 31 +++++++++++++++-----
 python/pyspark/mllib/random.py         | 45 +++++++++++++++++++++++++++--
 python/pyspark/mllib/recommendation.py |  6 ++--
 python/pyspark/mllib/regression.py     | 15 ++++++----
 python/pyspark/mllib/stat.py           | 16 +++++++++-
 python/pyspark/mllib/util.py           | 11 ++-----
 7 files changed, 105 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/65083e93/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 297a2bf..5d90ddd 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -62,6 +62,7 @@ class LogisticRegressionModel(LinearModel):
     """
 
     def predict(self, x):
+        x = _convert_to_vector(x)
         margin = self.weights.dot(x) + self._intercept
         if margin > 0:
             prob = 1 / (1 + exp(-margin))
@@ -79,7 +80,7 @@ class LogisticRegressionWithSGD(object):
         """
         Train a logistic regression model on the given data.
 
-        :param data:              The training data.
+        :param data:              The training data, an RDD of LabeledPoint.
         :param iterations:        The number of iterations (default: 100).
         :param step:              The step parameter used in SGD
                                   (default: 1.0).
@@ -136,6 +137,7 @@ class SVMModel(LinearModel):
     """
 
     def predict(self, x):
+        x = _convert_to_vector(x)
         margin = self.weights.dot(x) + self.intercept
         return 1 if margin >= 0 else 0
 
@@ -148,7 +150,7 @@ class SVMWithSGD(object):
         """
         Train a support vector machine on the given data.
 
-        :param data:              The training data.
+        :param data:              The training data, an RDD of LabeledPoint.
         :param iterations:        The number of iterations (default: 100).
         :param step:              The step parameter used in SGD
                                   (default: 1.0).
@@ -233,11 +235,12 @@ class NaiveBayes(object):
         classification.  By making every vector a 0-1 vector, it can also be
         used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
 
-        :param data: RDD of NumPy vectors, one per element, where the first
-               coordinate is the label and the rest is the feature vector
-               (e.g. a count vector).
+        :param data: RDD of LabeledPoint.
         :param lambda_: The smoothing parameter
         """
+        first = data.first()
+        if not isinstance(first, LabeledPoint):
+            raise ValueError("`data` should be an RDD of LabeledPoint")
         labels, pi, theta = callMLlibFunc("trainNaiveBayes", data, lambda_)
         return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/65083e93/python/pyspark/mllib/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index 44bf6f2..9ec2807 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -25,7 +25,7 @@ from py4j.protocol import Py4JJavaError
 
 from pyspark import RDD, SparkContext
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.linalg import Vectors, _convert_to_vector
 
 __all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
            'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
@@ -81,12 +81,16 @@ class Normalizer(VectorTransformer):
         """
         Applies unit length normalization on a vector.
 
-        :param vector: vector to be normalized.
+        :param vector: vector or RDD of vector to be normalized.
         :return: normalized vector. If the norm of the input is zero, it
                 will return the input vector.
         """
         sc = SparkContext._active_spark_context
         assert sc is not None, "SparkContext should be initialized first"
+        if isinstance(vector, RDD):
+            vector = vector.map(_convert_to_vector)
+        else:
+            vector = _convert_to_vector(vector)
         return callMLlibFunc("normalizeVector", self.p, vector)
 
 
@@ -95,8 +99,12 @@ class JavaVectorTransformer(JavaModelWrapper, VectorTransformer):
     Wrapper for the model in JVM
     """
 
-    def transform(self, dataset):
-        return self.call("transform", dataset)
+    def transform(self, vector):
+        if isinstance(vector, RDD):
+            vector = vector.map(_convert_to_vector)
+        else:
+            vector = _convert_to_vector(vector)
+        return self.call("transform", vector)
 
 
 class StandardScalerModel(JavaVectorTransformer):
@@ -109,7 +117,7 @@ class StandardScalerModel(JavaVectorTransformer):
         """
         Applies standardization transformation on a vector.
 
-        :param vector: Vector to be standardized.
+        :param vector: Vector or RDD of Vector to be standardized.
         :return: Standardized vector. If the variance of a column is zero,
                 it will return default `0.0` for the column with zero variance.
         """
@@ -154,6 +162,7 @@ class StandardScaler(object):
                     the transformation model.
         :return: a StandardScalarModel
         """
+        dataset = dataset.map(_convert_to_vector)
         jmodel = callMLlibFunc("fitStandardScaler", self.withMean, self.withStd, dataset)
         return StandardScalerModel(jmodel)
 
@@ -211,6 +220,8 @@ class IDFModel(JavaVectorTransformer):
         :param dataset: an RDD of term frequency vectors
         :return: an RDD of TF-IDF vectors
         """
+        if not isinstance(dataset, RDD):
+            raise TypeError("dataset should be an RDD of term frequency vectors")
         return JavaVectorTransformer.transform(self, dataset)
 
 
@@ -255,7 +266,9 @@ class IDF(object):
 
         :param dataset: an RDD of term frequency vectors
         """
-        jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset)
+        if not isinstance(dataset, RDD):
+            raise TypeError("dataset should be an RDD of term frequency vectors")
+        jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset.map(_convert_to_vector))
         return IDFModel(jmodel)
 
 
@@ -287,6 +300,8 @@ class Word2VecModel(JavaVectorTransformer):
 
         Note: local use only
         """
+        if not isinstance(word, basestring):
+            word = _convert_to_vector(word)
         words, similarity = self.call("findSynonyms", word, num)
         return zip(words, similarity)
 
@@ -374,9 +389,11 @@ class Word2Vec(object):
         """
         Computes the vector representation of each word in vocabulary.
 
-        :param data: training data. RDD of subtype of Iterable[String]
+        :param data: training data. RDD of list of string
         :return: Word2VecModel instance
         """
+        if not isinstance(data, RDD):
+            raise TypeError("data should be an RDD of list of string")
         jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize),
                                float(self.learningRate), int(self.numPartitions),
                                int(self.numIterations), long(self.seed))

http://git-wip-us.apache.org/repos/asf/spark/blob/65083e93/python/pyspark/mllib/random.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 7eebfc6..cb4304f 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -52,6 +52,12 @@ class RandomRDDs(object):
         C{RandomRDDs.uniformRDD(sc, n, p, seed)\
           .map(lambda v: a + (b - a) * v)}
 
+        :param sc: SparkContext used to create the RDD.
+        :param size: Size of the RDD.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
+
         >>> x = RandomRDDs.uniformRDD(sc, 100).collect()
         >>> len(x)
         100
@@ -76,6 +82,12 @@ class RandomRDDs(object):
         C{RandomRDDs.normal(sc, n, p, seed)\
           .map(lambda v: mean + sigma * v)}
 
+        :param sc: SparkContext used to create the RDD.
+        :param size: Size of the RDD.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
+
         >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
         >>> stats = x.stats()
         >>> stats.count()
@@ -93,6 +105,13 @@ class RandomRDDs(object):
         Generates an RDD comprised of i.i.d. samples from the Poisson
         distribution with the input mean.
 
+        :param sc: SparkContext used to create the RDD.
+        :param mean: Mean, or lambda, for the Poisson distribution.
+        :param size: Size of the RDD.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
+
         >>> mean = 100.0
         >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
         >>> stats = x.stats()
@@ -104,7 +123,7 @@ class RandomRDDs(object):
         >>> abs(stats.stdev() - sqrt(mean)) < 0.5
         True
         """
-        return callMLlibFunc("poissonRDD", sc._jsc, mean, size, numPartitions, seed)
+        return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)
 
     @staticmethod
     @toArray
@@ -113,6 +132,13 @@ class RandomRDDs(object):
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
         from the uniform distribution U(0.0, 1.0).
 
+        :param sc: SparkContext used to create the RDD.
+        :param numRows: Number of Vectors in the RDD.
+        :param numCols: Number of elements in each Vector.
+        :param numPartitions: Number of partitions in the RDD.
+        :param seed: Seed for the RNG that generates the seed for the generator in each partition.
+        :return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.
+
         >>> import numpy as np
         >>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
         >>> mat.shape
@@ -131,6 +157,13 @@ class RandomRDDs(object):
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
         from the standard normal distribution.
 
+        :param sc: SparkContext used to create the RDD.
+        :param numRows: Number of Vectors in the RDD.
+        :param numCols: Number of elements in each Vector.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
+
         >>> import numpy as np
         >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
         >>> mat.shape
@@ -149,6 +182,14 @@ class RandomRDDs(object):
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
         from the Poisson distribution with the input mean.
 
+        :param sc: SparkContext used to create the RDD.
+        :param mean: Mean, or lambda, for the Poisson distribution.
+        :param numRows: Number of Vectors in the RDD.
+        :param numCols: Number of elements in each Vector.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).
+
         >>> import numpy as np
         >>> mean = 100.0
         >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
@@ -161,7 +202,7 @@ class RandomRDDs(object):
         >>> abs(mat.std() - sqrt(mean)) < 0.5
         True
         """
-        return callMLlibFunc("poissonVectorRDD", sc._jsc, mean, numRows, numCols,
+        return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
                              numPartitions, seed)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/65083e93/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index e26b152..41bbd9a 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -32,7 +32,7 @@ class Rating(object):
         return Rating, (self.user, self.product, self.rating)
 
     def __repr__(self):
-        return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+        return "Rating(%d, %d, %s)" % (self.user, self.product, self.rating)
 
 
 class MatrixFactorizationModel(JavaModelWrapper):
@@ -51,7 +51,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
     >>> testset = sc.parallelize([(1, 2), (1, 1)])
     >>> model = ALS.train(ratings, 1, seed=10)
     >>> model.predictAll(testset).collect()
-    [Rating(1, 1, 1), Rating(1, 2, 1)]
+    [Rating(1, 1, 1.0471...), Rating(1, 2, 1.9679...)]
 
     >>> model = ALS.train(ratings, 4, seed=10)
     >>> model.userFeatures().collect()
@@ -79,7 +79,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
     0.4473...
     """
     def predict(self, user, product):
-        return self._java_model.predict(user, product)
+        return self._java_model.predict(int(user), int(product))
 
     def predictAll(self, user_product):
         assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"

http://git-wip-us.apache.org/repos/asf/spark/blob/65083e93/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 43c1a2f..66e25a4 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -36,7 +36,7 @@ class LabeledPoint(object):
     """
 
     def __init__(self, label, features):
-        self.label = label
+        self.label = float(label)
         self.features = _convert_to_vector(features)
 
     def __reduce__(self):
@@ -46,7 +46,7 @@ class LabeledPoint(object):
         return "(" + ",".join((str(self.label), str(self.features))) + ")"
 
     def __repr__(self):
-        return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")"
+        return "LabeledPoint(%s, %s)" % (self.label, self.features)
 
 
 class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):
 
     def __init__(self, weights, intercept):
         self._coeff = _convert_to_vector(weights)
-        self._intercept = intercept
+        self._intercept = float(intercept)
 
     @property
     def weights(self):
@@ -66,7 +66,7 @@ class LinearModel(object):
         return self._intercept
 
     def __repr__(self):
-        return "(weights=%s, intercept=%s)" % (self._coeff, self._intercept)
+        return "(weights=%s, intercept=%r)" % (self._coeff, self._intercept)
 
 
 class LinearRegressionModelBase(LinearModel):
@@ -85,6 +85,7 @@ class LinearRegressionModelBase(LinearModel):
         Predict the value of the dependent variable given a vector x
         containing values for the independent variables.
         """
+        x = _convert_to_vector(x)
         return self.weights.dot(x) + self.intercept
 
 
@@ -124,6 +125,9 @@ class LinearRegressionModel(LinearRegressionModelBase):
 # return the result of a call to the appropriate JVM stub.
 # _regression_train_wrapper is responsible for setup and error checking.
 def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
+    first = data.first()
+    if not isinstance(first, LabeledPoint):
+        raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first)
     initial_weights = initial_weights or [0.0] * len(data.first().features)
     weights, intercept = train_func(_to_java_object_rdd(data, cache=True),
                                     _convert_to_vector(initial_weights))
@@ -264,7 +268,8 @@ class RidgeRegressionWithSGD(object):
 def _test():
     import doctest
     from pyspark import SparkContext
-    globs = globals().copy()
+    import pyspark.mllib.regression
+    globs = pyspark.mllib.regression.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/65083e93/python/pyspark/mllib/stat.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 0700f8a..1980f5b 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@ Python package for statistical functions in MLlib.
 from pyspark import RDD
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
 from pyspark.mllib.linalg import Matrix, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
 
 
 __all__ = ['MultivariateStatisticalSummary', 'ChiSqTestResult', 'Statistics']
@@ -107,6 +108,11 @@ class Statistics(object):
         """
         Computes column-wise summary statistics for the input RDD[Vector].
 
+        :param rdd: an RDD[Vector] for which column-wise summary statistics
+                    are to be computed.
+        :return: :class:`MultivariateStatisticalSummary` object containing
+                 column-wise summary statistics.
+
         >>> from pyspark.mllib.linalg import Vectors
         >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
         ...                       Vectors.dense([4, 5, 0,  3]),
@@ -140,6 +146,13 @@ class Statistics(object):
         to specify the method to be used for single RDD inout.
         If two RDDs of floats are passed in, a single float is returned.
 
+        :param x: an RDD of vector for which the correlation matrix is to be computed,
+                  or an RDD of float of the same cardinality as y when y is specified.
+        :param y: an RDD of float of the same cardinality as x.
+        :param method: String specifying the method to use for computing correlation.
+                       Supported: `pearson` (default), `spearman`
+        :return: Correlation matrix comparing columns in x.
+
         >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
         >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
         >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
@@ -242,7 +255,6 @@ class Statistics(object):
         >>> print round(chi.statistic, 4)
         21.9958
 
-        >>> from pyspark.mllib.regression import LabeledPoint
         >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
         ...         LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
         ...         LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
@@ -257,6 +269,8 @@ class Statistics(object):
         1.5
         """
         if isinstance(observed, RDD):
+            if not isinstance(observed.first(), LabeledPoint):
+                raise ValueError("observed should be an RDD of LabeledPoint")
             jmodels = callMLlibFunc("chiSqTest", observed)
             return [ChiSqTestResult(m) for m in jmodels]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/65083e93/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 96aef8f..4ed978b 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -161,15 +161,8 @@ class MLUtils(object):
         >>> tempFile = NamedTemporaryFile(delete=True)
         >>> tempFile.close()
         >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
-        >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
-        >>> type(loaded[0]) == LabeledPoint
-        True
-        >>> print examples[0]
-        (1.1,(3,[0,2],[-1.23,4.56e-07]))
-        >>> type(examples[1]) == LabeledPoint
-        True
-        >>> print examples[1]
-        (0.0,[1.01,2.02,3.03])
+        >>> MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
+        [LabeledPoint(1.1, (3,[0,2],[-1.23,4.56e-07])), LabeledPoint(0.0, [1.01,2.02,3.03])]
         """
         minPartitions = minPartitions or min(sc.defaultParallelism, 2)
         return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org