You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/04/10 08:10:16 UTC

spark git commit: [SPARK-6577] [MLlib] [PySpark] SparseMatrix should be supported in PySpark

Repository: spark
Updated Branches:
  refs/heads/master b5c51c8df -> e2360810f


[SPARK-6577] [MLlib] [PySpark] SparseMatrix should be supported in PySpark

Adds support for SparseMatrix in PySpark.

Author: MechCoder <ma...@gmail.com>

Closes #5355 from MechCoder/spark-6577 and squashes the following commits:

7492190 [MechCoder] More readable code for densifying
ea2c54b [MechCoder] Check bounds for indexing
454ef2c [MechCoder] Made the following changes 1. Used convert_to_array for array conversion. 2. Used F order for toArray 3. Minor improvements in speed.
db76caf [MechCoder] Add support for CSR matrix
29653e7 [MechCoder] Renamed indices to rowIndices and indptr to colPtrs
b6384fe [MechCoder] [SPARK-6577] SparseMatrix should be supported in PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2360810
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2360810
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2360810

Branch: refs/heads/master
Commit: e2360810f50de77f79d372cc9b46db117d451cfc
Parents: b5c51c8
Author: MechCoder <ma...@gmail.com>
Authored: Thu Apr 9 23:10:13 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Apr 9 23:10:13 2015 -0700

----------------------------------------------------------------------
 python/pyspark/mllib/linalg.py | 110 ++++++++++++++++++++++++++++++++++--
 python/pyspark/mllib/tests.py  |  52 ++++++++++++++++-
 2 files changed, 154 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2360810/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 51c1490..a80320c 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -640,6 +640,15 @@ class Matrix(object):
         """
         raise NotImplementedError
 
+    @staticmethod
+    def _convert_to_array(array_like, dtype):
+        """
+        Convert Matrix attributes which are array-like or buffer to array.
+        """
+        if isinstance(array_like, basestring):
+            return np.frombuffer(array_like, dtype=dtype)
+        return np.asarray(array_like, dtype=dtype)
+
 
 class DenseMatrix(Matrix):
     """
@@ -647,13 +656,8 @@ class DenseMatrix(Matrix):
     """
     def __init__(self, numRows, numCols, values):
         Matrix.__init__(self, numRows, numCols)
-        if isinstance(values, basestring):
-            values = np.frombuffer(values, dtype=np.float64)
-        elif not isinstance(values, np.ndarray):
-            values = np.array(values, dtype=np.float64)
+        values = self._convert_to_array(values, np.float64)
         assert len(values) == numRows * numCols
-        if values.dtype != np.float64:
-            values.astype(np.float64)
         self.values = values
 
     def __reduce__(self):
@@ -670,6 +674,17 @@ class DenseMatrix(Matrix):
         """
         return self.values.reshape((self.numRows, self.numCols), order='F')
 
+    def toSparse(self):
+        """Convert to SparseMatrix, keeping only the non-zero entries."""
+        indices = np.nonzero(self.values)[0]
+        colCounts = np.bincount(indices // self.numRows)  # flat index -> column (column-major)
+        colPtrs = np.cumsum(np.hstack(
+            (0, colCounts, np.zeros(self.numCols - colCounts.size))))  # pad trailing empty columns
+        values = self.values[indices]
+        rowIndices = indices % self.numRows
+
+        return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, values)
+
     def __getitem__(self, indices):
         i, j = indices
         if i < 0 or i >= self.numRows:
@@ -687,6 +702,82 @@ class DenseMatrix(Matrix):
                 all(self.values == other.values))
 
 
+class SparseMatrix(Matrix):
+    """Sparse Matrix stored in CSC format."""
+    def __init__(self, numRows, numCols, colPtrs, rowIndices, values,
+                 isTransposed=False):
+        Matrix.__init__(self, numRows, numCols)
+        self.isTransposed = isTransposed
+        self.colPtrs = self._convert_to_array(colPtrs, np.int32)
+        self.rowIndices = self._convert_to_array(rowIndices, np.int32)
+        self.values = self._convert_to_array(values, np.float64)
+        # colPtrs needs one entry per column (per row when transposed), plus one.
+        if self.isTransposed:
+            if self.colPtrs.size != numRows + 1:
+                raise ValueError("Expected colPtrs of size %d, got %d."
+                                 % (numRows + 1, self.colPtrs.size))
+        else:
+            if self.colPtrs.size != numCols + 1:
+                raise ValueError("Expected colPtrs of size %d, got %d."
+                                 % (numCols + 1, self.colPtrs.size))
+        if self.rowIndices.size != self.values.size:  # one index per stored value
+            raise ValueError("Expected rowIndices of length %d, got %d."
+                             % (self.values.size, self.rowIndices.size))
+
+    def __reduce__(self):
+        return SparseMatrix, (
+            self.numRows, self.numCols, self.colPtrs.tostring(),
+            self.rowIndices.tostring(), self.values.tostring(),
+            self.isTransposed)
+
+    def __getitem__(self, indices):
+        i, j = indices
+        if i < 0 or i >= self.numRows:
+            raise ValueError("Row index %d is out of range [0, %d)"
+                             % (i, self.numRows))
+        if j < 0 or j >= self.numCols:
+            raise ValueError("Column index %d is out of range [0, %d)"
+                             % (j, self.numCols))
+
+        # If a CSR matrix is given, then the row index should be searched
+        # for in colPtrs, and the column index should be searched for in the
+        # corresponding slice obtained from rowIndices.
+        if self.isTransposed:
+            j, i = i, j
+
+        colStart = self.colPtrs[j]
+        colEnd = self.colPtrs[j + 1]
+        nz = self.rowIndices[colStart: colEnd]
+        ind = np.searchsorted(nz, i) + colStart  # NOTE(review): assumes nz is sorted ascending - confirm
+        if ind < colEnd and self.rowIndices[ind] == i:
+            return self.values[ind]
+        else:
+            return 0.0  # no stored entry at this position
+
+    def toArray(self):
+        """
+        Return a dense numpy.ndarray of shape (numRows, numCols).
+        """
+        A = np.zeros((self.numRows, self.numCols), dtype=np.float64, order='F')
+        for k in xrange(self.colPtrs.size - 1):
+            startptr = self.colPtrs[k]
+            endptr = self.colPtrs[k + 1]
+            if self.isTransposed:  # k is a row; rowIndices hold column positions
+                A[k, self.rowIndices[startptr:endptr]] = self.values[startptr:endptr]
+            else:  # k is a column; rowIndices hold row positions
+                A[self.rowIndices[startptr:endptr], k] = self.values[startptr:endptr]
+        return A
+
+    def toDense(self):
+        densevals = np.reshape(
+            self.toArray(), (self.numRows * self.numCols), order='F')
+        return DenseMatrix(self.numRows, self.numCols, densevals)
+
+    # TODO: More efficient implementation (currently densifies both operands):
+    def __eq__(self, other):
+        return np.all(self.toArray() == other.toArray())
+
+
 class Matrices(object):
     @staticmethod
     def dense(numRows, numCols, values):
@@ -695,6 +786,13 @@ class Matrices(object):
         """
         return DenseMatrix(numRows, numCols, values)
 
+    @staticmethod
+    def sparse(numRows, numCols, colPtrs, rowIndices, values):
+        """
+        Create a SparseMatrix
+        """
+        return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)
+
 
 def _test():
     import doctest

http://git-wip-us.apache.org/repos/asf/spark/blob/e2360810/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 61ef398..3b40158 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -24,7 +24,7 @@ import sys
 import tempfile
 import array as pyarray
 
-from numpy import array, array_equal
+from numpy import array, array_equal, zeros
 from py4j.protocol import Py4JJavaError
 
 if sys.version_info[:2] <= (2, 6):
@@ -38,7 +38,7 @@ else:
 
 from pyspark.mllib.common import _to_java_object_rdd
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
-    DenseMatrix, Vectors, Matrices
+    DenseMatrix, SparseMatrix, Vectors, Matrices
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.random import RandomRDDs
 from pyspark.mllib.stat import Statistics
@@ -144,6 +144,54 @@ class VectorTests(PySparkTestCase):
             for j in range(2):
                 self.assertEquals(mat[i, j], expected[i][j])
 
+    def test_sparse_matrix(self):
+        # Test sparse matrix creation.
+        sm1 = SparseMatrix(
+            3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
+        self.assertEquals(sm1.numRows, 3)
+        self.assertEquals(sm1.numCols, 4)
+        self.assertEquals(sm1.colPtrs.tolist(), [0, 2, 2, 4, 4])
+        self.assertEquals(sm1.rowIndices.tolist(), [1, 2, 1, 2])
+        self.assertEquals(sm1.values.tolist(), [1.0, 2.0, 4.0, 5.0])
+
+        # Test indexing
+        expected = [
+            [0, 0, 0, 0],
+            [1, 0, 4, 0],
+            [2, 0, 5, 0]]
+
+        for i in range(3):
+            for j in range(4):
+                self.assertEquals(expected[i][j], sm1[i, j])
+        self.assertTrue(array_equal(sm1.toArray(), expected))
+
+        # Test conversion to dense and sparse.
+        smnew = sm1.toDense().toSparse()
+        self.assertEquals(sm1.numRows, smnew.numRows)
+        self.assertEquals(sm1.numCols, smnew.numCols)
+        self.assertTrue(array_equal(sm1.colPtrs, smnew.colPtrs))
+        self.assertTrue(array_equal(sm1.rowIndices, smnew.rowIndices))
+        self.assertTrue(array_equal(sm1.values, smnew.values))
+
+        sm1t = SparseMatrix(
+            3, 4, [0, 2, 3, 5], [0, 1, 2, 0, 2], [3.0, 2.0, 4.0, 9.0, 8.0],
+            isTransposed=True)
+        self.assertEquals(sm1t.numRows, 3)
+        self.assertEquals(sm1t.numCols, 4)
+        self.assertEquals(sm1t.colPtrs.tolist(), [0, 2, 3, 5])
+        self.assertEquals(sm1t.rowIndices.tolist(), [0, 1, 2, 0, 2])
+        self.assertEquals(sm1t.values.tolist(), [3.0, 2.0, 4.0, 9.0, 8.0])
+
+        expected = [
+            [3, 2, 0, 0],
+            [0, 0, 4, 0],
+            [9, 0, 8, 0]]
+
+        for i in range(3):
+            for j in range(4):
+                self.assertEquals(expected[i][j], sm1t[i, j])
+        self.assertTrue(array_equal(sm1t.toArray(), expected))
+
 
 class ListTests(PySparkTestCase):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org