You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/01/22 23:02:14 UTC

[31/50] git commit: use SparseMatrix everywhere

use SparseMatrix everywhere


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/06c0f762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/06c0f762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/06c0f762

Branch: refs/heads/master
Commit: 06c0f7628a213a08ef5adeab903160b806680acf
Parents: cdff9fc
Author: Reza Zadeh <ri...@gmail.com>
Authored: Sat Jan 4 14:28:07 2014 -0800
Committer: Reza Zadeh <ri...@gmail.com>
Committed: Sat Jan 4 14:28:07 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/examples/SparkSVD.scala    |  9 +--
 .../org/apache/spark/mllib/linalg/SVD.scala     | 67 +++++++-------------
 .../spark/mllib/linalg/SVDecomposedMatrix.scala |  8 +--
 .../spark/mllib/linalg/SparseMatrix.scala       | 30 +++++++++
 .../apache/spark/mllib/linalg/SVDSuite.scala    | 50 +++++++++------
 5 files changed, 89 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/06c0f762/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
index 5590ee7..4b9e674 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.examples
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.linalg.SVD
 import org.apache.spark.mllib.linalg.MatrixEntry
+import org.apache.spark.mllib.linalg.SparseMatrix
 
 /**
  * Compute SVD of an example matrix
@@ -48,10 +49,10 @@ object SparkSVD {
     val n = 4
 
     // recover largest singular vector
-    val decomposed = SVD.sparseSVD(data, m, n, 1)
-    val u = decomposed.U
-    val s = decomposed.S
-    val v = decomposed.V
+    val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
+    val u = decomposed.U.data
+    val s = decomposed.S.data
+    val v = decomposed.V.data
 
     println("singular values = " + s.toArray.mkString)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/06c0f762/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 31990b0..a8efdc7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -26,11 +26,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
 
 /**
  * Class used to obtain singular value decompositions
- * @param data Matrix in sparse matrix format
- * @param m number of rows
- * @param n number of columns
  */
-class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
+class SVD {
   private var k: Int = 1
 
   /**
@@ -41,35 +38,11 @@ class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
     this
   }
 
-  /**
-   * Set matrix to be used for SVD
-   */
-  def setDatadata(data: RDD[MatrixEntry]): this.type = {
-    this.data = data
-    this
-  }
-
-  /**
-   * Set dimensions of matrix: rows
-   */
-  def setNumRows(m: Int): this.type = {
-    this.m = m
-    this
-  }
-
-  /**
-   * Set dimensions of matrix: columns
-   */
-  def setNumCols(n: Int): this.type = {
-    this.n = n
-    this
-  }
-
    /**
    * Compute SVD using the current set parameters
    */
-  def computeSVD() : SVDecomposedMatrix = {
-    SVD.sparseSVD(data, m, n, k)
+  def computeSVD(matrix: SparseMatrix) : SVDecomposedMatrix = {
+    SVD.sparseSVD(matrix, k)
   }
 }
 
@@ -103,19 +76,19 @@ object SVD {
  * All input and output is expected in sparse matrix format, 1-indexed
  * as tuples of the form ((i,j),value) all in RDDs
  *
- * @param data RDD Matrix in sparse 1-index format ((int, int), value)
- * @param m number of rows
- * @param n number of columns
+ * @param matrix sparse matrix to factorize
  * @param k Recover k singular values and vectors
  * @return Three sparse matrices: U, S, V such that A = USV^T
  */
   def sparseSVD(
-      data: RDD[MatrixEntry],
-      m: Int,
-      n: Int,
+      matrix: SparseMatrix,
       k: Int)
     : SVDecomposedMatrix =
   {
+    val data = matrix.data
+    val m = matrix.m
+    val n = matrix.n
+
     if (m < n || m <= 0 || n <= 0) {
       throw new IllegalArgumentException("Expecting a tall and skinny matrix")
     }
@@ -153,13 +126,16 @@ object SVD {
     val sc = data.sparkContext
 
     // prepare V for returning
-    val retV = sc.makeRDD(
+    val retVdata = sc.makeRDD(
             Array.tabulate(V.rows, sigma.length){ (i,j) =>
                     MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten)
-
-    val retS = sc.makeRDD(Array.tabulate(sigma.length){
+    val retV = SparseMatrix(retVdata, V.rows, sigma.length)
+     
+    val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
       x => MatrixEntry(x + 1, x + 1, sigma(x))})
 
+    val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
+
     // Compute U as U = A V S^-1
     // turn V S^-1 into an RDD as a sparse matrix
     val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
@@ -168,10 +144,11 @@ object SVD {
     // Multiply A by VS^-1
     val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
     val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-    val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+    val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
         => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
           .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
-     
+    val retU = SparseMatrix(retUdata, m, sigma.length) 
+   
     SVDecomposedMatrix(retU, retS, retV)  
   }
 
@@ -195,10 +172,10 @@ object SVD {
       MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
     }
 
-    val decomposed = SVD.sparseSVD(data, m, n, k)
-    val u = decomposed.U
-    val s = decomposed.S
-    val v = decomposed.V
+    val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+    val u = decomposed.U.data
+    val s = decomposed.S.data
+    val v = decomposed.V.data
     
     println("Computed " + s.toArray.length + " singular values and vectors")
     u.saveAsTextFile(output_u)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/06c0f762/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
index e0bcdab..6220035 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.mllib.linalg
 
-import org.apache.spark.rdd.RDD
-
 /**
  * Class that represents the SV decomposition of a matrix
  *
@@ -26,6 +24,6 @@ import org.apache.spark.rdd.RDD
  * @param S such that A = USV^T
  * @param V such that A = USV^T
  */
-case class SVDecomposedMatrix(val U: RDD[MatrixEntry],
-                              val S: RDD[MatrixEntry],
-                              val V: RDD[MatrixEntry])
+case class SVDecomposedMatrix(val U: SparseMatrix,
+                              val S: SparseMatrix,
+                              val V: SparseMatrix)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/06c0f762/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
new file mode 100644
index 0000000..cbd1a2a
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Class that represents a sparse matrix
+ *
+ * @param data RDD of nonzero entries
+ * @param m number of rows
+ * @param n number of columns
+ */
+case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/06c0f762/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index 4126e81..f239e85 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -45,9 +45,12 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
   val EPSILON = 1e-4
 
   // Return jblas matrix from sparse matrix RDD
-  def getDenseMatrix(matrix:RDD[MatrixEntry], m:Int, n:Int) : DoubleMatrix = {
+  def getDenseMatrix(matrix:SparseMatrix) : DoubleMatrix = {
+    val data = matrix.data
+    val m = matrix.m
+    val n = matrix.n
     val ret = DoubleMatrix.zeros(m, n)
-    matrix.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
+    matrix.data.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
     ret
   }
 
@@ -67,24 +70,26 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
       MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
 
-    val decomposed = SVD.sparseSVD(data, m, n, n)
+    val a = SparseMatrix(data, m, n)
+
+    val decomposed = SVD.sparseSVD(a, n)
     val u = decomposed.U
     val v = decomposed.V
-    val s = decomposed.S    
+    val s = decomposed.S
 
-    val densea = getDenseMatrix(data, m, n)
+    val densea = getDenseMatrix(a)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u, m, n)
-    val rets = getDenseMatrix(s, n, n)
-    val retv = getDenseMatrix(v, n, n)
+    val retu = getDenseMatrix(u)
+    val rets = getDenseMatrix(s)
+    val retv = getDenseMatrix(v)
   
     // check individual decomposition  
     assertMatrixEquals(retu, svd(0))
     assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
     assertMatrixEquals(retv, svd(2))
 
-     // check multiplication guarantee
+    // check multiplication guarantee
     assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)  
   }
 
@@ -95,20 +100,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
       MatrixEntry(a + 1, b + 1, 1.0) }.flatten )
     val k = 1
 
-    val decomposed = SVD.sparseSVD(data, m, n, k)
+    val a = SparseMatrix(data, m, n)
+
+    val decomposed = SVD.sparseSVD(a, k)
     val u = decomposed.U
     val s = decomposed.S
     val v = decomposed.V
-    val retrank = s.toArray.length
+    val retrank = s.data.toArray.length
 
     assert(retrank == 1, "rank returned not one")
 
-    val densea = getDenseMatrix(data, m, n)
+    val densea = getDenseMatrix(a)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u, m, retrank)
-    val rets = getDenseMatrix(s, retrank, retrank)
-    val retv = getDenseMatrix(v, n, retrank)
+    val retu = getDenseMatrix(u)
+    val rets = getDenseMatrix(s)
+    val retv = getDenseMatrix(v)
 
     // check individual decomposition  
     assertMatrixEquals(retu, svd(0).getColumn(0))
@@ -124,21 +131,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
       MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
+    val a = SparseMatrix(data, m, n)
     
     val k = 1 // only one svalue above this
 
-    val decomposed = SVD.sparseSVD(data, m, n, k)
+    val decomposed = SVD.sparseSVD(a, k)
     val u = decomposed.U
     val s = decomposed.S
     val v = decomposed.V
-    val retrank = s.toArray.length
+    val retrank = s.data.toArray.length
 
-    val densea = getDenseMatrix(data, m, n)
+    val densea = getDenseMatrix(a)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u, m, retrank)
-    val rets = getDenseMatrix(s, retrank, retrank)
-    val retv = getDenseMatrix(v, n, retrank)
+    val retu = getDenseMatrix(u)
+    val rets = getDenseMatrix(s)
+    val retv = getDenseMatrix(v)
 
     assert(retrank == 1, "rank returned not one")