You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/07/09 21:15:12 UTC

git commit: SPARK-1782: svd for sparse matrix using ARPACK

Repository: spark
Updated Branches:
  refs/heads/master d35e3db23 -> 1f33e1f20


SPARK-1782: svd for sparse matrix using ARPACK

copy ARPACK dsaupd/dseupd code from latest breeze
change RowMatrix to use sparse SVD
change tests for sparse SVD

All tests passed. I will run it against some large matrices.

Author: Li Pu <lp...@twitter.com>
Author: Xiangrui Meng <me...@databricks.com>
Author: Li Pu <li...@outlook.com>

Closes #964 from vrilleup/master and squashes the following commits:

7312ec1 [Li Pu] very minor comment fix
4c618e9 [Li Pu] Merge pull request #1 from mengxr/vrilleup-master
a461082 [Xiangrui Meng] make superscript show up correctly in doc
861ec48 [Xiangrui Meng] simplify axpy
62969fa [Xiangrui Meng] use BDV directly in symmetricEigs change the computation mode to local-svd, local-eigs, and dist-eigs update tests and docs
c273771 [Li Pu] automatically determine SVD compute mode and parameters
7148426 [Li Pu] improve RowMatrix multiply
5543cce [Li Pu] improve svd api
819824b [Li Pu] add flag for dense svd or sparse svd
eb15100 [Li Pu] fix binary compatibility
4c7aec3 [Li Pu] improve comments
e7850ed [Li Pu] use aggregate and axpy
827411b [Li Pu] fix EOF new line
9c80515 [Li Pu] use non-sparse implementation when k = n
fe983b0 [Li Pu] improve scala style
96d2ecb [Li Pu] improve eigenvalue sorting
e1db950 [Li Pu] SPARK-1782: svd for sparse matrix using ARPACK


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f33e1f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f33e1f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f33e1f2

Branch: refs/heads/master
Commit: 1f33e1f2013c508aa86511750f7bd8437154e51a
Parents: d35e3db
Author: Li Pu <lp...@twitter.com>
Authored: Wed Jul 9 12:15:08 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Jul 9 12:15:08 2014 -0700

----------------------------------------------------------------------
 .../mllib/linalg/EigenValueDecomposition.scala  | 157 ++++++++++++++++
 .../mllib/linalg/distributed/RowMatrix.scala    | 183 +++++++++++++++----
 .../linalg/distributed/RowMatrixSuite.scala     |  59 +++---
 3 files changed, 339 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f33e1f2/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
new file mode 100644
index 0000000..3515461
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
+import com.github.fommil.netlib.ARPACK
+import org.netlib.util.{intW, doubleW}
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Compute eigen-decomposition.
+ */
+@Experimental
+private[mllib] object EigenValueDecomposition {
+  /**
+   * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK.
+   * The caller needs to ensure that the input matrix is real symmetric. This function requires
+   * memory for `n*(4*k+4)` doubles.
+   *
+   * @param mul a function that multiplies the symmetric matrix with a DenseVector.
+   * @param n dimension of the square matrix (maximum Int.MaxValue).
+   * @param k number of leading eigenvalues required, 0 < k < n.
+   * @param tol tolerance of the eigs computation.
+   * @param maxIterations the maximum number of Arnoldi update iterations.
+   * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors
+   *         (columns of the matrix).
+   * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not
+   *       satisfy the convergence criterion specified by tol (see ARPACK Users Guide, Chapter 4.6
+   *       for more details). The maximum number of Arnoldi update iterations is set to 300 in this
+   *       function.
+   */
+  private[mllib] def symmetricEigs(
+      mul: BDV[Double] => BDV[Double],
+      n: Int,
+      k: Int,
+      tol: Double,
+      maxIterations: Int): (BDV[Double], BDM[Double]) = {
+    // TODO: remove this function and use eigs in breeze when switching breeze version
+    require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n")
+
+    val arpack = ARPACK.getInstance()
+
+    // tolerance used in stopping criterion
+    val tolW = new doubleW(tol)
+    // number of desired eigenvalues, 0 < nev < n
+    val nev = new intW(k)
+    // nev Lanczos vectors are generated in the first iteration
+    // ncv-nev Lanczos vectors are generated in each subsequent iteration
+    // ncv must be smaller than n
+    val ncv = math.min(2 * k, n)
+
+    // "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem
+    val bmat = "I"
+    // "LM" : compute the NEV largest (in magnitude) eigenvalues
+    val which = "LM"
+
+    var iparam = new Array[Int](11)
+    // use exact shift in each iteration
+    iparam(0) = 1
+    // maximum number of Arnoldi update iterations, or the actual number of iterations on output
+    iparam(2) = maxIterations
+    // Mode 1: A*x = lambda*x, A symmetric
+    iparam(6) = 1
+
+    var ido = new intW(0)
+    var info = new intW(0)
+    var resid = new Array[Double](n)
+    var v = new Array[Double](n * ncv)
+    var workd = new Array[Double](n * 3)
+    var workl = new Array[Double](ncv * (ncv + 8))
+    var ipntr = new Array[Int](11)
+
+    // call ARPACK's reverse communication, first iteration with ido = 0
+    arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,
+      workl, workl.length, info)
+
+    val w = BDV(workd)
+
+    // ido = 99 : done flag in reverse communication
+    while (ido.`val` != 99) {
+      if (ido.`val` != -1 && ido.`val` != 1) {
+        throw new IllegalStateException("ARPACK returns ido = " + ido.`val` +
+            " This flag is not compatible with Mode 1: A*x = lambda*x, A symmetric.")
+      }
+      // multiply working vector with the matrix
+      val inputOffset = ipntr(0) - 1
+      val outputOffset = ipntr(1) - 1
+      val x = w.slice(inputOffset, inputOffset + n)
+      val y = w.slice(outputOffset, outputOffset + n)
+      y := mul(x)
+      // call ARPACK's reverse communication
+      arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
+        workd, workl, workl.length, info)
+    }
+
+    if (info.`val` != 0) {
+      info.`val` match {
+        case 1 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+            " Maximum number of iterations taken. (Refer ARPACK user guide for details)")
+        case 2 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+            " No shifts could be applied. Try to increase NCV. " +
+            "(Refer ARPACK user guide for details)")
+        case _ => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+            " Please refer ARPACK user guide for error message.")
+      }
+    }
+
+    val d = new Array[Double](nev.`val`)
+    val select = new Array[Boolean](ncv)
+    // copy the Ritz vectors
+    val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n)
+
+    // call ARPACK's post-processing for eigenvectors
+    arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n,
+      iparam, ipntr, workd, workl, workl.length, info)
+
+    // number of computed eigenvalues, might be smaller than k
+    val computed = iparam(4)
+
+    val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r =>
+      (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n))
+    }
+
+    // sort the eigen-pairs in descending order
+    val sortedEigenPairs = eigenPairs.sortBy(- _._1)
+
+    // copy eigenvectors in descending order of eigenvalues
+    val sortedU = BDM.zeros[Double](n, computed)
+    sortedEigenPairs.zipWithIndex.foreach { r =>
+      val b = r._2 * n
+      var i = 0
+      while (i < n) {
+        sortedU.data(b + i) = r._1._2(i)
+        i += 1
+      }
+    }
+
+    (BDV[Double](sortedEigenPairs.map(_._1)), sortedU)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1f33e1f2/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 695e03b..99cb651 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
-import java.util
+import java.util.Arrays
 
-import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
+import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
 import breeze.numerics.{sqrt => brzSqrt}
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 
@@ -34,7 +35,7 @@ import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
  * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
  * together with add() and merge() function.
  * A numerically stable algorithm is implemented to compute sample mean and variance:
-  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
  * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
  * to have time complexity O(nnz) instead of O(n) for each column.
  */
@@ -201,6 +202,26 @@ class RowMatrix(
   }
 
   /**
+   * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`.
+   *
+   * @param v a dense vector whose length must match the number of columns of this matrix
+   * @return a dense vector representing the product
+   */
+  private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
+    val n = numCols().toInt
+    val vbr = rows.context.broadcast(v)
+    rows.aggregate(BDV.zeros[Double](n))(
+      seqOp = (U, r) => {
+        val rBrz = r.toBreeze
+        val a = rBrz.dot(vbr.value)
+        brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]])
+        U
+      },
+      combOp = (U1, U2) => U1 += U2
+    )
+  }
+
+  /**
    * Computes the Gramian matrix `A^T A`.
    */
   def computeGramianMatrix(): Matrix = {
@@ -220,50 +241,135 @@ class RowMatrix(
   }
 
   /**
-   * Computes the singular value decomposition of this matrix.
-   * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+   * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This
+   * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k
+   * singular values, U and V contain the corresponding singular vectors.
    *
-   * There is no restriction on m, but we require `n^2` doubles to fit in memory.
-   * Further, n should be less than m.
-
-   * The decomposition is computed by first computing A'A = V S^2 V',
-   * computing svd locally on that (since n x n is small), from which we recover S and V.
-   * Then we compute U via easy matrix multiplication as U =  A * (V * S^-1).
-   * Note that this approach requires `O(n^3)` time on the master node.
+   * At most k largest non-zero singular values and associated vectors are returned. If there are k
+   * such values, then the dimensions of the return will be:
+   *  - U is a RowMatrix of size m x k that satisfies U' * U = eye(k),
+   *  - s is a Vector of size k, holding the singular values in descending order,
+   *  - V is a Matrix of size n x k that satisfies V' * V = eye(k).
+   *
+   * We assume n is smaller than m. The singular values and the right singular vectors are derived
+   * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix
+   * storing the right singular vectors, is computed via matrix multiplication as
+   * U = A * (V * S^-1^), if requested by user. The actual method to use is determined
+   * automatically based on the cost:
+   *  - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian
+   *    matrix first and then compute its top eigenvalues and eigenvectors locally on the driver.
+   *    This requires a single pass with O(n^2^) storage on each executor and on the driver, and
+   *    O(n^2^ k) time on the driver.
+   *  - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to
+   *    compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k)
+   *    passes, O(n) storage on each executor, and O(n k) storage on the driver.
    *
-   * At most k largest non-zero singular values and associated vectors are returned.
-   * If there are k such values, then the dimensions of the return will be:
+   * Several internal parameters are set to default values. The reciprocal condition number rCond
+   * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where
+   * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for
+   * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's
+   * eigen-decomposition is set to 1e-10.
    *
-   * U is a RowMatrix of size m x k that satisfies U'U = eye(k),
-   * s is a Vector of size k, holding the singular values in descending order,
-   * and V is a Matrix of size n x k that satisfies V'V = eye(k).
+   * @note The conditions that decide which method to use internally and the default parameters are
+   *       subject to change.
    *
-   * @param k number of singular values to keep. We might return less than k if there are
-   *          numerically zero singular values. See rCond.
+   * @param k number of leading singular values to keep (0 < k <= n). It might return less than k if
+   *          there are numerically zero singular values or there are not enough Ritz values
+   *          converged before the maximum number of Arnoldi update iterations is reached (in case
+   *          that matrix A is ill-conditioned).
    * @param computeU whether to compute U
    * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
    *              are treated as zero, where sigma(0) is the largest singular value.
-   * @return SingularValueDecomposition(U, s, V)
+   * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
    */
   def computeSVD(
       k: Int,
       computeU: Boolean = false,
       rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
+    // maximum number of Arnoldi update iterations for invoking ARPACK
+    val maxIter = math.max(300, k * 3)
+    // numerical tolerance for invoking ARPACK
+    val tol = 1e-10
+    computeSVD(k, computeU, rCond, maxIter, tol, "auto")
+  }
+
+  /**
+   * The actual SVD implementation, visible for testing.
+   *
+   * @param k number of leading singular values to keep (0 < k <= n)
+   * @param computeU whether to compute U
+   * @param rCond the reciprocal condition number
+   * @param maxIter max number of iterations (if ARPACK is used)
+   * @param tol termination tolerance (if ARPACK is used)
+   * @param mode computation mode (auto: determine automatically which mode to use,
+   *             local-svd: compute gram matrix and computes its full SVD locally,
+   *             local-eigs: compute gram matrix and computes its top eigenvalues locally,
+   *             dist-eigs: compute the top eigenvalues of the gram matrix distributively)
+   * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
+   */
+  private[mllib] def computeSVD(
+      k: Int,
+      computeU: Boolean,
+      rCond: Double,
+      maxIter: Int,
+      tol: Double,
+      mode: String): SingularValueDecomposition[RowMatrix, Matrix] = {
     val n = numCols().toInt
-    require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
+    require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.")
 
-    val G = computeGramianMatrix()
+    object SVDMode extends Enumeration {
+      val LocalARPACK, LocalLAPACK, DistARPACK = Value
+    }
+
+    val computeMode = mode match {
+      case "auto" =>
+        // TODO: The conditions below are not fully tested.
+        if (n < 100 || k > n / 2) {
+          // If n is small or k is large compared with n, we better compute the Gramian matrix first
+          // and then compute its eigenvalues locally, instead of making multiple passes.
+          if (k < n / 3) {
+            SVDMode.LocalARPACK
+          } else {
+            SVDMode.LocalLAPACK
+          }
+        } else {
+          // If k is small compared with n, we use ARPACK with distributed multiplication.
+          SVDMode.DistARPACK
+        }
+      case "local-svd" => SVDMode.LocalLAPACK
+      case "local-eigs" => SVDMode.LocalARPACK
+      case "dist-eigs" => SVDMode.DistARPACK
+      case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
+    }
+
+    // Compute the eigen-decomposition of A' * A.
+    val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match {
+      case SVDMode.LocalARPACK =>
+        require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.")
+        val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+        EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter)
+      case SVDMode.LocalLAPACK =>
+        val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+        val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
+        (sigmaSquaresFull, uFull)
+      case SVDMode.DistARPACK =>
+        require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
+        EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
+    }
 
-    // TODO: Use sparse SVD instead.
-    val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
-      brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
     val sigmas: BDV[Double] = brzSqrt(sigmaSquares)
 
-    // Determine effective rank.
+    // Determine the effective rank.
     val sigma0 = sigmas(0)
     val threshold = rCond * sigma0
     var i = 0
-    while (i < k && sigmas(i) >= threshold) {
+    // sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence
+    // criterion specified by tol after max number of iterations.
+    // Thus use i < min(k, sigmas.length) instead of i < k.
+    if (sigmas.length < k) {
+      logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.")
+    }
+    while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) {
       i += 1
     }
     val sk = i
@@ -272,12 +378,12 @@ class RowMatrix(
       logWarning(s"Requested $k singular values but only found $sk nonzeros.")
     }
 
-    val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk))
-    val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+    val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
+    val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
 
     if (computeU) {
       // N = Vk * Sk^{-1}
-      val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+      val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
       var i = 0
       var j = 0
       while (j < sk) {
@@ -364,7 +470,7 @@ class RowMatrix(
     if (k == n) {
       Matrices.dense(n, k, u.data)
     } else {
-      Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
+      Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
     }
   }
 
@@ -390,15 +496,24 @@ class RowMatrix(
    */
   def multiply(B: Matrix): RowMatrix = {
     val n = numCols().toInt
+    val k = B.numCols
     require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")
 
     require(B.isInstanceOf[DenseMatrix],
       s"Only support dense matrix at this time but found ${B.getClass.getName}.")
 
-    val Bb = rows.context.broadcast(B)
+    val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray)
     val AB = rows.mapPartitions({ iter =>
-      val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]]
-      iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze))
+      val Bi = Bb.value
+      iter.map(row => {
+        val v = BDV.zeros[Double](k)
+        var i = 0
+        while (i < k) {
+          v(i) = row.toBreeze.dot(new BDV(Bi, i * n, 1, n))
+          i += 1
+        }
+        Vectors.fromBreeze(v)
+      })
     }, preservesPartitioning = true)
 
     new RowMatrix(AB, nRows, B.numCols)

http://git-wip-us.apache.org/repos/asf/spark/blob/1f33e1f2/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index c9f9acf..a961f89 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -96,37 +96,44 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
 
   test("svd of a full-rank matrix") {
     for (mat <- Seq(denseMat, sparseMat)) {
-      val localMat = mat.toBreeze()
-      val (localU, localSigma, localVt) = brzSvd(localMat)
-      val localV: BDM[Double] = localVt.t.toDenseMatrix
-      for (k <- 1 to n) {
-        val svd = mat.computeSVD(k, computeU = true)
-        val U = svd.U
-        val s = svd.s
-        val V = svd.V
-        assert(U.numRows() === m)
-        assert(U.numCols() === k)
-        assert(s.size === k)
-        assert(V.numRows === n)
-        assert(V.numCols === k)
-        assertColumnEqualUpToSign(U.toBreeze(), localU, k)
-        assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
-        assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+      for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
+        val localMat = mat.toBreeze()
+        val (localU, localSigma, localVt) = brzSvd(localMat)
+        val localV: BDM[Double] = localVt.t.toDenseMatrix
+        for (k <- 1 to n) {
+          val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n
+          if (!skip) {
+            val svd = mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode)
+            val U = svd.U
+            val s = svd.s
+            val V = svd.V
+            assert(U.numRows() === m)
+            assert(U.numCols() === k)
+            assert(s.size === k)
+            assert(V.numRows === n)
+            assert(V.numCols === k)
+            assertColumnEqualUpToSign(U.toBreeze(), localU, k)
+            assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
+            assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+          }
+        }
+        val svdWithoutU = mat.computeSVD(1, computeU = false, 1e-9, 300, 1e-10, mode)
+        assert(svdWithoutU.U === null)
       }
-      val svdWithoutU = mat.computeSVD(n)
-      assert(svdWithoutU.U === null)
     }
   }
 
   test("svd of a low-rank matrix") {
-    val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
-    val mat = new RowMatrix(rows, 4, 2)
-    val svd = mat.computeSVD(2, computeU = true)
-    assert(svd.s.size === 1, "should not return zero singular values")
-    assert(svd.U.numRows() === 4)
-    assert(svd.U.numCols() === 1)
-    assert(svd.V.numRows === 2)
-    assert(svd.V.numCols === 1)
+    val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
+    val mat = new RowMatrix(rows, 4, 3)
+    for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
+      val svd = mat.computeSVD(2, computeU = true, 1e-6, 300, 1e-10, mode)
+      assert(svd.s.size === 1, s"should not return zero singular values but got ${svd.s}")
+      assert(svd.U.numRows() === 4)
+      assert(svd.U.numCols() === 1)
+      assert(svd.V.numRows === 3)
+      assert(svd.V.numCols === 1)
+    }
   }
 
   def closeToZero(G: BDM[Double]): Boolean = {