You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/01/22 23:02:05 UTC

[22/50] git commit: rename sparsesvd.scala

rename sparsesvd.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6bcdb762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6bcdb762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6bcdb762

Branch: refs/heads/master
Commit: 6bcdb762a107c82ef095553ab31284623475cb2c
Parents: b059a2a
Author: Reza Zadeh <ri...@gmail.com>
Authored: Fri Jan 3 21:55:38 2014 -0800
Committer: Reza Zadeh <ri...@gmail.com>
Committed: Fri Jan 3 21:55:38 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/mllib/linalg/SVD.scala     | 153 +++++++++++++++++++
 .../apache/spark/mllib/linalg/sparsesvd.scala   | 153 -------------------
 2 files changed, 153 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6bcdb762/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
new file mode 100644
index 0000000..2198e6a
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+
+/**
+ * Top-level methods for calling Singular Value Decomposition
+ * NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)]
+ */
+object SVD {
+/**
+ * Singular Value Decomposition for Tall and Skinny matrices.
+ * Given an m x n matrix A, this will compute matrices U, S, V such that
+ * A = U * S * V'
+ * 
+ * There is no restriction on m, but we require n^2 doubles to fit in memory.
+ * Further, n should be less than m.
+ * 
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small),
+ * from which we recover S and V. 
+ * Then we compute U via easy matrix multiplication
+ * as U =  A * V * S^-1
+ * 
+ * Only singular vectors associated with singular values
+ * greater or equal to MIN_SVALUE are recovered. If there are k
+ * such values, then the dimensions of the return will be:
+ *
+ * S is k x k and diagonal, holding the singular values on diagonal
+ * U is m x k and satisfies U'U = eye(k)
+ * V is n x k and satisfies V'V = eye(k)
+ *
+ * All input and output is expected in sparse matrix format, 1-indexed
+ * as tuples of the form ((i,j),value) all in RDDs
+ *
+ * @param data RDD Matrix in sparse 1-index format ((int, int), value)
+ * @param m number of rows
+ * @param n number of columns
+ * @param min_svalue Recover singular values greater or equal to min_svalue
+ * @return Three sparse matrices: U, S, V such that A = USV^T
+ */
+  def sparseSVD(
+      data: RDD[((Int, Int), Double)],
+      m: Int,
+      n: Int,
+      min_svalue: Double)
+    : (  RDD[((Int, Int), Double)],
+         RDD[((Int, Int), Double)],
+         RDD[((Int, Int), Double)]) =
+  {
+    if (m < n || m <= 0 || n <= 0) {
+      throw new IllegalArgumentException("Expecting a tall and skinny matrix")
+    }
+
+    if (min_svalue < 1.0e-8) {
+      throw new IllegalArgumentException("Minimum singular value requested is too small")
+    }
+
+    // Compute A^T A, assuming rows are sparse enough to fit in memory
+    val rows = data.map(entry =>
+            (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
+    val emits = rows.flatMap{ case (rowind, cols)  =>
+      cols.flatMap{ case (colind1, mval1) =>
+                    cols.map{ case (colind2, mval2) =>
+                            ((colind1, colind2), mval1*mval2) } }
+    }.reduceByKey(_+_)
+
+    // Construct jblas A^T A locally
+    val ata = DoubleMatrix.zeros(n, n)
+    for(entry <- emits.toArray) {
+      ata.put(entry._1._1-1, entry._1._2-1, entry._2)
+    }
+
+    // Since A^T A is small, we can compute its SVD directly
+    val svd = Singular.sparseSVD(ata)
+    val V = svd(0)
+    val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
+
+    // threshold s values
+    if(sigma.isEmpty) {
+      throw new Exception("All singular values are smaller than min_svalue: " + min_svalue)
+    }
+
+    val sc = data.sparkContext
+
+    // prepare V for returning
+    val retV = sc.makeRDD(
+            Array.tabulate(V.rows, sigma.length){ (i,j) =>
+                    ((i+1, j+1), V.get(i,j)) }.flatten)
+
+    val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
+
+    // Compute U as U = A V S^-1
+    // turn V S^-1 into an RDD as a sparse matrix and cache it
+    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
+                { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j))  }.flatten).cache()
+
+    // Multiply A by VS^-1
+    val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
+    val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+    val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+        => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
+     
+    (retU, retS, retV)  
+  }
+
+
+  def main(args: Array[String]) {
+    if (args.length < 8) {
+      println("Usage: SVD <master> <matrix_file> <m> <n> <minimum_singular_value> <output_U_file> <output_S_file> <output_V_file>")
+      System.exit(1)
+    }
+    val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) = 
+      (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
+    
+    val sc = new SparkContext(master, "SVD")
+    
+    val rawdata = sc.textFile(inputFile)
+    val data = rawdata.map { line =>
+      val parts = line.split(',')
+      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
+    }
+
+    val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+    println("Computed " + s.toArray.length + " singular values and vectors")
+    u.saveAsTextFile(output_u)
+    s.saveAsTextFile(output_s)
+    v.saveAsTextFile(output_v)
+    System.exit(0)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6bcdb762/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
deleted file mode 100644
index 2198e6a..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
-
-/**
- * Top-level methods for calling Singular Value Decomposition
- * NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)]
- */
-object SVD {
-/**
- * Singular Value Decomposition for Tall and Skinny matrices.
- * Given an m x n matrix A, this will compute matrices U, S, V such that
- * A = U * S * V'
- * 
- * There is no restriction on m, but we require n^2 doubles to fit in memory.
- * Further, n should be less than m.
- * 
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small),
- * from which we recover S and V. 
- * Then we compute U via easy matrix multiplication
- * as U =  A * V * S^-1
- * 
- * Only singular vectors associated with singular values
- * greater or equal to MIN_SVALUE are recovered. If there are k
- * such values, then the dimensions of the return will be:
- *
- * S is k x k and diagonal, holding the singular values on diagonal
- * U is m x k and satisfies U'U = eye(k)
- * V is n x k and satisfies V'V = eye(k)
- *
- * All input and output is expected in sparse matrix format, 1-indexed
- * as tuples of the form ((i,j),value) all in RDDs
- *
- * @param data RDD Matrix in sparse 1-index format ((int, int), value)
- * @param m number of rows
- * @param n number of columns
- * @param min_svalue Recover singular values greater or equal to min_svalue
- * @return Three sparse matrices: U, S, V such that A = USV^T
- */
-  def sparseSVD(
-      data: RDD[((Int, Int), Double)],
-      m: Int,
-      n: Int,
-      min_svalue: Double)
-    : (  RDD[((Int, Int), Double)],
-         RDD[((Int, Int), Double)],
-         RDD[((Int, Int), Double)]) =
-  {
-    if (m < n || m <= 0 || n <= 0) {
-      throw new IllegalArgumentException("Expecting a tall and skinny matrix")
-    }
-
-    if (min_svalue < 1.0e-8) {
-      throw new IllegalArgumentException("Minimum singular value requested is too small")
-    }
-
-    // Compute A^T A, assuming rows are sparse enough to fit in memory
-    val rows = data.map(entry =>
-            (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
-    val emits = rows.flatMap{ case (rowind, cols)  =>
-      cols.flatMap{ case (colind1, mval1) =>
-                    cols.map{ case (colind2, mval2) =>
-                            ((colind1, colind2), mval1*mval2) } }
-    }.reduceByKey(_+_)
-
-    // Construct jblas A^T A locally
-    val ata = DoubleMatrix.zeros(n, n)
-    for(entry <- emits.toArray) {
-      ata.put(entry._1._1-1, entry._1._2-1, entry._2)
-    }
-
-    // Since A^T A is small, we can compute its SVD directly
-    val svd = Singular.sparseSVD(ata)
-    val V = svd(0)
-    val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
-
-    // threshold s values
-    if(sigma.isEmpty) {
-      throw new Exception("All singular values are smaller than min_svalue: " + min_svalue)
-    }
-
-    val sc = data.sparkContext
-
-    // prepare V for returning
-    val retV = sc.makeRDD(
-            Array.tabulate(V.rows, sigma.length){ (i,j) =>
-                    ((i+1, j+1), V.get(i,j)) }.flatten)
-
-    val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
-
-    // Compute U as U = A V S^-1
-    // turn V S^-1 into an RDD as a sparse matrix and cache it
-    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
-                { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j))  }.flatten).cache()
-
-    // Multiply A by VS^-1
-    val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
-    val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-    val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
-        => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
-     
-    (retU, retS, retV)  
-  }
-
-
-  def main(args: Array[String]) {
-    if (args.length < 8) {
-      println("Usage: SVD <master> <matrix_file> <m> <n> <minimum_singular_value> <output_U_file> <output_S_file> <output_V_file>")
-      System.exit(1)
-    }
-    val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) = 
-      (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
-    
-    val sc = new SparkContext(master, "SVD")
-    
-    val rawdata = sc.textFile(inputFile)
-    val data = rawdata.map { line =>
-      val parts = line.split(',')
-      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
-    }
-
-    val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
-    println("Computed " + s.toArray.length + " singular values and vectors")
-    u.saveAsTextFile(output_u)
-    s.saveAsTextFile(output_s)
-    v.saveAsTextFile(output_v)
-    System.exit(0)
-  }
-}
-
-