You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ml...@apache.org on 2016/04/27 19:48:14 UTC

spark git commit: [SPARK-9656][MLLIB][PYTHON] Add missing methods to PySpark's Distributed Linear Algebra Classes

Repository: spark
Updated Branches:
  refs/heads/master a234cc614 -> 607f50341


[SPARK-9656][MLLIB][PYTHON] Add missing methods to PySpark's Distributed Linear Algebra Classes

This PR adds the remaining group of methods to PySpark's distributed linear algebra classes as follows:

* `RowMatrix` <sup>**[1]**</sup>
  1. `computeGramianMatrix`
  2. `computeCovariance`
  3. `computeColumnSummaryStatistics`
  4. `columnSimilarities`
  5. `tallSkinnyQR` <sup>**[2]**</sup>
* `IndexedRowMatrix` <sup>**[3]**</sup>
  1. `computeGramianMatrix`
* `CoordinateMatrix`
  1. `transpose`
* `BlockMatrix`
  1. `validate`
  2. `cache`
  3. `persist`
  4. `transpose`

**[1]**: Note: `multiply`, `computeSVD`, and `computePrincipalComponents` are already part of PR #7963 for SPARK-6227.
**[2]**: Implementing `tallSkinnyQR` uncovered a bug with our PySpark `RowMatrix` constructor.  As discussed on the dev list [here](http://apache-spark-developers-list.1001551.n3.nabble.com/K-Means-And-Class-Tags-td10038.html), there appears to be an issue with type erasure with RDDs coming from Java, and by extension from PySpark.  Although we are attempting to construct a `RowMatrix` from an `RDD[Vector]` in [PythonMLlibAPI](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1115), the `Vector` type is erased, resulting in an `RDD[Object]`.  Thus, when calling Scala's `tallSkinnyQR` from PySpark, we get a Java `ClassCastException` in which an `Object` cannot be cast to a Spark `Vector`.  As noted in the aforementioned dev list thread, this issue was also encountered with `DecisionTrees`, and the fix involved an explicit `retag` of the RDD with a `Vector` type.  Thus, this PR currently contains that fix applied to
  the `createRowMatrix` helper function in `PythonMLlibAPI`.  `IndexedRowMatrix` and `CoordinateMatrix` do not appear to have this issue likely due to their related helper functions in `PythonMLlibAPI` creating the RDDs explicitly from DataFrames with pattern matching, thus preserving the types.  However, this fix may be out of scope for this single PR, and it may be better suited in a separate JIRA/PR.  Therefore, I have marked this PR as WIP and am open to discussion.
**[3]**: Note: `multiply` and `computeSVD` are already part of PR #7963 for SPARK-6227.

Author: Mike Dusenberry <mw...@us.ibm.com>

Closes #9441 from dusenberrymw/SPARK-9656_Add_Missing_Methods_to_PySpark_Distributed_Linear_Algebra.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/607f5034
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/607f5034
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/607f5034

Branch: refs/heads/master
Commit: 607f50341c8d86f0034f3aae69a55f25d55a012e
Parents: a234cc6
Author: Mike Dusenberry <mw...@us.ibm.com>
Authored: Wed Apr 27 19:48:05 2016 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Wed Apr 27 19:48:05 2016 +0200

----------------------------------------------------------------------
 .../linalg/distributed/IndexedRowMatrix.scala   |   3 +-
 python/pyspark/mllib/linalg/__init__.py         |  32 ++-
 python/pyspark/mllib/linalg/distributed.py      | 270 ++++++++++++++++++-
 3 files changed, 301 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/607f5034/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index 06b9c4a..b03b3ec 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -188,7 +188,8 @@ class IndexedRowMatrix @Since("1.0.0") (
   }
 
   /**
-   * Computes the Gramian matrix `A^T A`.
+   * Computes the Gramian matrix `A^T A`. Note that this cannot be
+   * computed on matrices with more than 65535 columns.
    */
   @Since("1.0.0")
   def computeGramianMatrix(): Matrix = {

http://git-wip-us.apache.org/repos/asf/spark/blob/607f5034/python/pyspark/mllib/linalg/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py
index 4cd7306..70509a6 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -38,12 +38,14 @@ else:
 
 import numpy as np
 
+from pyspark import since
 from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \
     IntegerType, ByteType, BooleanType
 
 
 __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors',
-           'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices']
+           'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices',
+           'QRDecomposition']
 
 
 if sys.version_info[:2] == (2, 7):
@@ -1235,6 +1237,34 @@ class Matrices(object):
         return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)
 
 
+class QRDecomposition(object):
+    """
+    .. note:: Experimental
+
+    Represents QR factors.
+    """
+    def __init__(self, Q, R):
+        self._Q = Q
+        self._R = R
+
+    @property
+    @since('2.0.0')
+    def Q(self):
+        """
+        An orthogonal matrix Q in a QR decomposition.
+        May be null if not computed.
+        """
+        return self._Q
+
+    @property
+    @since('2.0.0')
+    def R(self):
+        """
+        An upper triangular matrix R in a QR decomposition.
+        """
+        return self._R
+
+
 def _test():
     import doctest
     (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)

http://git-wip-us.apache.org/repos/asf/spark/blob/607f5034/python/pyspark/mllib/linalg/distributed.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index 43cb0be..af34ce3 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -26,9 +26,11 @@ if sys.version >= '3':
 
 from py4j.java_gateway import JavaObject
 
-from pyspark import RDD
+from pyspark import RDD, since
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector, Matrix
+from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
+from pyspark.mllib.stat import MultivariateStatisticalSummary
+from pyspark.storagelevel import StorageLevel
 
 
 __all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
@@ -151,6 +153,156 @@ class RowMatrix(DistributedMatrix):
         """
         return self._java_matrix_wrapper.call("numCols")
 
+    @since('2.0.0')
+    def computeColumnSummaryStatistics(self):
+        """
+        Computes column-wise summary statistics.
+
+        :return: :class:`MultivariateStatisticalSummary` object
+                 containing column-wise summary statistics.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+        >>> mat = RowMatrix(rows)
+
+        >>> colStats = mat.computeColumnSummaryStatistics()
+        >>> colStats.mean()
+        array([ 2.5,  3.5,  4.5])
+        """
+        java_col_stats = self._java_matrix_wrapper.call("computeColumnSummaryStatistics")
+        return MultivariateStatisticalSummary(java_col_stats)
+
+    @since('2.0.0')
+    def computeCovariance(self):
+        """
+        Computes the covariance matrix, treating each row as an
+        observation. Note that this cannot be computed on matrices
+        with more than 65535 columns.
+
+        >>> rows = sc.parallelize([[1, 2], [2, 1]])
+        >>> mat = RowMatrix(rows)
+
+        >>> mat.computeCovariance()
+        DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0)
+        """
+        return self._java_matrix_wrapper.call("computeCovariance")
+
+    @since('2.0.0')
+    def computeGramianMatrix(self):
+        """
+        Computes the Gramian matrix `A^T A`. Note that this cannot be
+        computed on matrices with more than 65535 columns.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+        >>> mat = RowMatrix(rows)
+
+        >>> mat.computeGramianMatrix()
+        DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+        """
+        return self._java_matrix_wrapper.call("computeGramianMatrix")
+
+    @since('2.0.0')
+    def columnSimilarities(self, threshold=0.0):
+        """
+        Compute similarities between columns of this matrix.
+
+        The threshold parameter is a trade-off knob between estimate
+        quality and computational cost.
+
+        The default threshold setting of 0 guarantees deterministically
+        correct results, but uses the brute-force approach of computing
+        normalized dot products.
+
+        Setting the threshold to positive values uses a sampling
+        approach and incurs strictly less computational cost than the
+        brute-force approach. However the similarities computed will
+        be estimates.
+
+        The sampling guarantees relative-error correctness for those
+        pairs of columns that have similarity greater than the given
+        similarity threshold.
+
+        To describe the guarantee, we set some notation:
+            * Let A be the smallest in magnitude non-zero element of
+              this matrix.
+            * Let B be the largest in magnitude non-zero element of
+              this matrix.
+            * Let L be the maximum number of non-zeros per row.
+
+        For example, for {0,1} matrices: A=B=1.
+        Another example, for the Netflix matrix: A=1, B=5
+
+        For those column pairs that are above the threshold, the
+        computed similarity is correct to within 20% relative error
+        with probability at least 1 - (0.981)^10/B^
+
+        The shuffle size is bounded by the *smaller* of the following
+        two expressions:
+
+            * O(n log(n) L / (threshold * A))
+            * O(m L^2^)
+
+        The latter is the cost of the brute-force approach, so for
+        non-zero thresholds, the cost is always cheaper than the
+        brute-force approach.
+
+        :param: threshold: Set to 0 for deterministic guaranteed
+                           correctness. Similarities above this
+                           threshold are estimated with the cost vs
+                           estimate quality trade-off described above.
+        :return: An n x n sparse upper-triangular CoordinateMatrix of
+                 cosine similarities between columns of this matrix.
+
+        >>> rows = sc.parallelize([[1, 2], [1, 5]])
+        >>> mat = RowMatrix(rows)
+
+        >>> sims = mat.columnSimilarities()
+        >>> sims.entries.first().value
+        0.91914503...
+        """
+        java_sims_mat = self._java_matrix_wrapper.call("columnSimilarities", float(threshold))
+        return CoordinateMatrix(java_sims_mat)
+
+    @since('2.0.0')
+    def tallSkinnyQR(self, computeQ=False):
+        """
+        Compute the QR decomposition of this RowMatrix.
+
+        The implementation is designed to optimize the QR decomposition
+        (factorization) for the RowMatrix of a tall and skinny shape.
+
+        Reference:
+         Paul G. Constantine, David F. Gleich. "Tall and skinny QR
+         factorizations in MapReduce architectures"
+         ([[http://dx.doi.org/10.1145/1996092.1996103]])
+
+        :param: computeQ: whether to computeQ
+        :return: QRDecomposition(Q: RowMatrix, R: Matrix), where
+                 Q = None if computeQ = false.
+
+        >>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]])
+        >>> mat = RowMatrix(rows)
+        >>> decomp = mat.tallSkinnyQR(True)
+        >>> Q = decomp.Q
+        >>> R = decomp.R
+
+        >>> # Test with absolute values
+        >>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist())
+        >>> absQRows.collect()
+        [[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]]
+
+        >>> # Test with absolute values
+        >>> abs(R.toArray()).tolist()
+        [[5.0, 10.0], [0.0, 1.0]]
+        """
+        decomp = JavaModelWrapper(self._java_matrix_wrapper.call("tallSkinnyQR", computeQ))
+        if computeQ:
+            java_Q = decomp.call("Q")
+            Q = RowMatrix(java_Q)
+        else:
+            Q = None
+        R = decomp.call("R")
+        return QRDecomposition(Q, R)
+
 
 class IndexedRow(object):
     """
@@ -311,6 +463,21 @@ class IndexedRowMatrix(DistributedMatrix):
         java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities")
         return CoordinateMatrix(java_coordinate_matrix)
 
+    @since('2.0.0')
+    def computeGramianMatrix(self):
+        """
+        Computes the Gramian matrix `A^T A`. Note that this cannot be
+        computed on matrices with more than 65535 columns.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(1, [4, 5, 6])])
+        >>> mat = IndexedRowMatrix(rows)
+
+        >>> mat.computeGramianMatrix()
+        DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+        """
+        return self._java_matrix_wrapper.call("computeGramianMatrix")
+
     def toRowMatrix(self):
         """
         Convert this matrix to a RowMatrix.
@@ -514,6 +681,26 @@ class CoordinateMatrix(DistributedMatrix):
         """
         return self._java_matrix_wrapper.call("numCols")
 
+    @since('2.0.0')
+    def transpose(self):
+        """
+        Transpose this CoordinateMatrix.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(1, 0, 2),
+        ...                           MatrixEntry(2, 1, 3.7)])
+        >>> mat = CoordinateMatrix(entries)
+        >>> mat_transposed = mat.transpose()
+
+        >>> print(mat_transposed.numRows())
+        2
+
+        >>> print(mat_transposed.numCols())
+        3
+        """
+        java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+        return CoordinateMatrix(java_transposed_matrix)
+
     def toRowMatrix(self):
         """
         Convert this matrix to a RowMatrix.
@@ -789,6 +976,33 @@ class BlockMatrix(DistributedMatrix):
         """
         return self._java_matrix_wrapper.call("numCols")
 
+    @since('2.0.0')
+    def cache(self):
+        """
+        Caches the underlying RDD.
+        """
+        self._java_matrix_wrapper.call("cache")
+        return self
+
+    @since('2.0.0')
+    def persist(self, storageLevel):
+        """
+        Persists the underlying RDD with the specified storage level.
+        """
+        if not isinstance(storageLevel, StorageLevel):
+            raise TypeError("`storageLevel` should be a StorageLevel, got %s" % type(storageLevel))
+        javaStorageLevel = self._java_matrix_wrapper._sc._getJavaStorageLevel(storageLevel)
+        self._java_matrix_wrapper.call("persist", javaStorageLevel)
+        return self
+
+    @since('2.0.0')
+    def validate(self):
+        """
+        Validates the block matrix info against the matrix data (`blocks`)
+        and throws an exception if any error is found.
+        """
+        self._java_matrix_wrapper.call("validate")
+
     def add(self, other):
         """
         Adds two block matrices together. The matrices must have the
@@ -822,6 +1036,41 @@ class BlockMatrix(DistributedMatrix):
         java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix)
         return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
 
+    @since('2.0.0')
+    def subtract(self, other):
+        """
+        Subtracts the given block matrix `other` from this block matrix:
+        `this - other`. The matrices must have the same size and
+        matching `rowsPerBlock` and `colsPerBlock` values.  If one of
+        the sub matrix blocks that are being subtracted is a
+        SparseMatrix, the resulting sub matrix block will also be a
+        SparseMatrix, even if it is being subtracted from a DenseMatrix.
+        If two dense sub matrix blocks are subtracted, the output block
+        will also be a DenseMatrix.
+
+        >>> dm1 = Matrices.dense(3, 2, [3, 1, 5, 4, 6, 2])
+        >>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
+        >>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [1, 2, 3])
+        >>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
+        >>> blocks2 = sc.parallelize([((0, 0), dm2), ((1, 0), dm1)])
+        >>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)])
+        >>> mat1 = BlockMatrix(blocks1, 3, 2)
+        >>> mat2 = BlockMatrix(blocks2, 3, 2)
+        >>> mat3 = BlockMatrix(blocks3, 3, 2)
+
+        >>> mat1.subtract(mat2).toLocalMatrix()
+        DenseMatrix(6, 2, [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0], 0)
+
+        >>> mat2.subtract(mat3).toLocalMatrix()
+        DenseMatrix(6, 2, [6.0, 8.0, 9.0, -4.0, -7.0, -4.0, 10.0, 9.0, 9.0, -6.0, -5.0, -10.0], 0)
+        """
+        if not isinstance(other, BlockMatrix):
+            raise TypeError("Other should be a BlockMatrix, got %s" % type(other))
+
+        other_java_block_matrix = other._java_matrix_wrapper._java_model
+        java_block_matrix = self._java_matrix_wrapper.call("subtract", other_java_block_matrix)
+        return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+
     def multiply(self, other):
         """
         Left multiplies this BlockMatrix by `other`, another
@@ -857,6 +1106,23 @@ class BlockMatrix(DistributedMatrix):
         java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix)
         return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
 
+    @since('2.0.0')
+    def transpose(self):
+        """
+        Transpose this BlockMatrix. Returns a new BlockMatrix
+        instance sharing the same underlying data. Is a lazy operation.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+
+        >>> mat_transposed = mat.transpose()
+        >>> mat_transposed.toLocalMatrix()
+        DenseMatrix(2, 6, [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0], 0)
+        """
+        java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+        return BlockMatrix(java_transposed_matrix, self.colsPerBlock, self.rowsPerBlock)
+
     def toLocalMatrix(self):
         """
         Collect the distributed matrix on the driver as a DenseMatrix.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org