Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/09 08:01:25 UTC

[1/2] [SPARK-1390] Refactoring of matrices backed by RDDs

Repository: spark
Updated Branches:
  refs/heads/master fa0524fd0 -> 9689b663a


http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
deleted file mode 100644
index 20e2b0f..0000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import scala.util.Random
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-
-import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.mllib.util._
-
-import org.jblas._
-
-class SVDSuite extends FunSuite with BeforeAndAfterAll {
-  @transient private var sc: SparkContext = _
-
-  override def beforeAll() {
-    sc = new SparkContext("local", "test")
-  }
-
-  override def afterAll() {
-    sc.stop()
-    System.clearProperty("spark.driver.port")
-  }
-
-  val EPSILON = 1e-4
-
-  // Build a local jblas matrix from a SparseMatrix backed by an RDD
-  def getDenseMatrix(matrix: SparseMatrix): DoubleMatrix = {
-    val data = matrix.data
-    val m = matrix.m
-    val n = matrix.n
-    val ret = DoubleMatrix.zeros(m, n)
-    data.collect().foreach(x => ret.put(x.i, x.j, x.mval))
-    ret
-  }
-
-  def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
-    assert(a.rows == b.rows && a.columns == b.columns,
-      "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns")
-    for (i <- 0 until a.columns) {
-      val aCol = a.getColumn(i)
-      val bCol = b.getColumn(i)
-      val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
-      assert(diff < EPSILON, "matrix mismatch: " + diff)
-    }
-  }
-
-  test("full rank matrix svd") {
-    val m = 10
-    val n = 3
-    val datarr = Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
-    val data = sc.makeRDD(datarr, 3)
-
-    val a = SparseMatrix(data, m, n)
-
-    val decomposed = new SVD().setK(n).compute(a)
-    val u = decomposed.U
-    val v = decomposed.V
-    val s = decomposed.S
-
-    val denseA = getDenseMatrix(a)
-    val svd = Singular.sparseSVD(denseA)
-
-    val retu = getDenseMatrix(u)
-    val rets = getDenseMatrix(s)
-    val retv = getDenseMatrix(v)
- 
- 
-    // check individual decomposition  
-    assertMatrixApproximatelyEquals(retu, svd(0))
-    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
-    assertMatrixApproximatelyEquals(retv, svd(2))
-
-    // check multiplication guarantee
-    assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)  
-  }
-
- test("dense full rank matrix svd") {
-    val m = 10
-    val n = 3
-    val datarr = Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
-    val data = sc.makeRDD(datarr, 3)
-
-    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
-    val decomposed = new SVD().setK(n).setComputeU(true).compute(a)
-    val u = LAUtils.denseToSparse(decomposed.U)
-    val v = decomposed.V
-    val s = decomposed.S
-
-    val denseA = getDenseMatrix(LAUtils.denseToSparse(a))
-    val svd = Singular.sparseSVD(denseA)
-
-    val retu = getDenseMatrix(u)
-    val rets = DoubleMatrix.diag(new DoubleMatrix(s))
-    val retv = new DoubleMatrix(v)
-
-
-    // check individual decomposition  
-    assertMatrixApproximatelyEquals(retu, svd(0))
-    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
-    assertMatrixApproximatelyEquals(retv, svd(2))
-
-    // check multiplication guarantee
-    assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
-  }
-
- test("rank one matrix svd") {
-    val m = 10
-    val n = 3   
-    val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
-      MatrixEntry(a, b, 1.0) }.flatten )
-    val k = 1
-
-    val a = SparseMatrix(data, m, n)
-
-    val decomposed = new SVD().setK(k).compute(a)
-    val u = decomposed.U
-    val s = decomposed.S
-    val v = decomposed.V
-    val retrank = s.data.collect().length
-
-    assert(retrank == 1, "rank returned not one")
-
-    val denseA = getDenseMatrix(a)
-    val svd = Singular.sparseSVD(denseA)
-
-    val retu = getDenseMatrix(u)
-    val rets = getDenseMatrix(s)
-    val retv = getDenseMatrix(v)
-
-    // check individual decomposition  
-    assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
-    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
-    assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
-
-     // check multiplication guarantee
-    assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)  
-  }
-
- test("truncated with k") {
-    val m = 10
-    val n = 3
-    val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
-    val a = SparseMatrix(data, m, n)
-    
-    val k = 1 // only one svalue above this
-
-    val decomposed = new SVD().setK(k).compute(a)
-    val u = decomposed.U
-    val s = decomposed.S
-    val v = decomposed.V
-    val retrank = s.data.collect().length
-
-    val denseA = getDenseMatrix(a)
-    val svd = Singular.sparseSVD(denseA)
-
-    val retu = getDenseMatrix(u)
-    val rets = getDenseMatrix(s)
-    val retv = getDenseMatrix(v)
-
-    assert(retrank == 1, "rank returned not one")
-    
-    // check individual decomposition  
-    assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
-    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
-    assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
-  }
-}
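
(Aside: the min(||a - b||, ||a + b||) trick in assertMatrixApproximatelyEquals above is needed because singular vectors are only determined up to sign. A small illustration with jblas, using a trivial diagonal matrix; this is explanatory, not part of the patch:)

    import org.jblas.{DoubleMatrix, Singular}

    // If A = U * S * V^T, flipping the sign of the j-th column of both U and V
    // leaves A unchanged, so a singular vector and its negation are equally valid.
    val a = new DoubleMatrix(Array(Array(3.0, 0.0), Array(0.0, 2.0)))
    val usv = Singular.sparseSVD(a)  // Array(U, S, V)
    val v0 = usv(2).getColumn(0)
    // Both v0 and v0.neg() are valid first right singular vectors of a.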

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
new file mode 100644
index 0000000..cd45438
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
+
+class CoordinateMatrixSuite extends FunSuite with LocalSparkContext {
+
+  val m = 5
+  val n = 4
+  var mat: CoordinateMatrix = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    val entries = sc.parallelize(Seq(
+      (0, 0, 1.0),
+      (0, 1, 2.0),
+      (1, 1, 3.0),
+      (1, 2, 4.0),
+      (2, 2, 5.0),
+      (2, 3, 6.0),
+      (3, 0, 7.0),
+      (3, 3, 8.0),
+      (4, 1, 9.0)), 3).map { case (i, j, value) =>
+      MatrixEntry(i, j, value)
+    }
+    mat = new CoordinateMatrix(entries)
+  }
+
+  test("size") {
+    assert(mat.numRows() === m)
+    assert(mat.numCols() === n)
+  }
+
+  test("empty entries") {
+    val entries = sc.parallelize(Seq[MatrixEntry](), 1)
+    val emptyMat = new CoordinateMatrix(entries)
+    intercept[RuntimeException] {
+      emptyMat.numCols()
+    }
+    intercept[RuntimeException] {
+      emptyMat.numRows()
+    }
+  }
+
+  test("toBreeze") {
+    val expected = BDM(
+      (1.0, 2.0, 0.0, 0.0),
+      (0.0, 3.0, 4.0, 0.0),
+      (0.0, 0.0, 5.0, 6.0),
+      (7.0, 0.0, 0.0, 8.0),
+      (0.0, 9.0, 0.0, 0.0))
+    assert(mat.toBreeze() === expected)
+  }
+
+  test("toIndexedRowMatrix") {
+    val indexedRowMatrix = mat.toIndexedRowMatrix()
+    val expected = BDM(
+      (1.0, 2.0, 0.0, 0.0),
+      (0.0, 3.0, 4.0, 0.0),
+      (0.0, 0.0, 5.0, 6.0),
+      (7.0, 0.0, 0.0, 8.0),
+      (0.0, 9.0, 0.0, 0.0))
+    assert(indexedRowMatrix.toBreeze() === expected)
+  }
+
+  test("toRowMatrix") {
+    val rowMatrix = mat.toRowMatrix()
+    val rows = rowMatrix.rows.collect().toSet
+    val expected = Set(
+      Vectors.dense(1.0, 2.0, 0.0, 0.0),
+      Vectors.dense(0.0, 3.0, 4.0, 0.0),
+      Vectors.dense(0.0, 0.0, 5.0, 6.0),
+      Vectors.dense(7.0, 0.0, 0.0, 8.0),
+      Vectors.dense(0.0, 9.0, 0.0, 0.0))
+    assert(rows === expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
new file mode 100644
index 0000000..f7c46f2
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Matrices, Vectors}
+
+class IndexedRowMatrixSuite extends FunSuite with LocalSparkContext {
+
+  val m = 4
+  val n = 3
+  val data = Seq(
+    (0L, Vectors.dense(0.0, 1.0, 2.0)),
+    (1L, Vectors.dense(3.0, 4.0, 5.0)),
+    (3L, Vectors.dense(9.0, 0.0, 1.0))
+  ).map(x => IndexedRow(x._1, x._2))
+  var indexedRows: RDD[IndexedRow] = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    indexedRows = sc.parallelize(data, 2)
+  }
+
+  test("size") {
+    val mat1 = new IndexedRowMatrix(indexedRows)
+    assert(mat1.numRows() === m)
+    assert(mat1.numCols() === n)
+
+    val mat2 = new IndexedRowMatrix(indexedRows, 5, 0)
+    assert(mat2.numRows() === 5)
+    assert(mat2.numCols() === n)
+  }
+
+  test("empty rows") {
+    val rows = sc.parallelize(Seq[IndexedRow](), 1)
+    val mat = new IndexedRowMatrix(rows)
+    intercept[RuntimeException] {
+      mat.numRows()
+    }
+    intercept[RuntimeException] {
+      mat.numCols()
+    }
+  }
+
+  test("toBreeze") {
+    val mat = new IndexedRowMatrix(indexedRows)
+    val expected = BDM(
+      (0.0, 1.0, 2.0),
+      (3.0, 4.0, 5.0),
+      (0.0, 0.0, 0.0),
+      (9.0, 0.0, 1.0))
+    assert(mat.toBreeze() === expected)
+  }
+
+  test("toRowMatrix") {
+    val idxRowMat = new IndexedRowMatrix(indexedRows)
+    val rowMat = idxRowMat.toRowMatrix()
+    assert(rowMat.numCols() === n)
+    assert(rowMat.numRows() === 3, "should drop empty rows")
+    assert(rowMat.rows.collect().toSeq === data.map(_.vector).toSeq)
+  }
+
+  test("multiply a local matrix") {
+    val A = new IndexedRowMatrix(indexedRows)
+    val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+    val C = A.multiply(B)
+    val localA = A.toBreeze()
+    val localC = C.toBreeze()
+    val expected = localA * B.toBreeze.asInstanceOf[BDM[Double]]
+    assert(localC === expected)
+  }
+
+  test("gram") {
+    val A = new IndexedRowMatrix(indexedRows)
+    val G = A.computeGramianMatrix()
+    val expected = BDM(
+      (90.0, 12.0, 24.0),
+      (12.0, 17.0, 22.0),
+      (24.0, 22.0, 30.0))
+    assert(G.toBreeze === expected)
+  }
+
+  test("svd") {
+    val A = new IndexedRowMatrix(indexedRows)
+    val svd = A.computeSVD(n, computeU = true)
+    assert(svd.U.isInstanceOf[IndexedRowMatrix])
+    val localA = A.toBreeze()
+    val U = svd.U.toBreeze()
+    val s = svd.s.toBreeze.asInstanceOf[BDV[Double]]
+    val V = svd.V.toBreeze.asInstanceOf[BDM[Double]]
+    assert(closeToZero(U.t * U - BDM.eye[Double](n)))
+    assert(closeToZero(V.t * V - BDM.eye[Double](n)))
+    assert(closeToZero(U * brzDiag(s) * V.t - localA))
+  }
+
+  def closeToZero(G: BDM[Double]): Boolean = {
+    G.valuesIterator.map(math.abs).sum < 1e-6
+  }
+}
+
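
These suites lean on the private[mllib] toBreeze helpers this patch adds to DistributedMatrix ("add toBreeze to DistributedMatrix ... simplify tests" in the commit log below): collect the distributed matrix into a local breeze matrix so assertions become plain equality. A hedged sketch of the idea for an IndexedRowMatrix (the shape of the helper, not its actual source; toLocalBreeze is an illustrative name):

    import breeze.linalg.{DenseMatrix => BDM}
    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

    def toLocalBreeze(mat: IndexedRowMatrix): BDM[Double] = {
      val result = BDM.zeros[Double](mat.numRows().toInt, mat.numCols().toInt)
      mat.rows.collect().foreach { case IndexedRow(i, vector) =>
        vector.toArray.zipWithIndex.foreach { case (value, j) =>
          result(i.toInt, j) = value
        }
      }
      result
    }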

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
new file mode 100644
index 0000000..71ee8e8
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, norm => brzNorm, svd => brzSvd}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.{Matrices, Vectors, Vector}
+
+class RowMatrixSuite extends FunSuite with LocalSparkContext {
+
+  val m = 4
+  val n = 3
+  val arr = Array(0.0, 3.0, 6.0, 9.0, 1.0, 4.0, 7.0, 0.0, 2.0, 5.0, 8.0, 1.0)
+  val denseData = Seq(
+    Vectors.dense(0.0, 1.0, 2.0),
+    Vectors.dense(3.0, 4.0, 5.0),
+    Vectors.dense(6.0, 7.0, 8.0),
+    Vectors.dense(9.0, 0.0, 1.0)
+  )
+  val sparseData = Seq(
+    Vectors.sparse(3, Seq((1, 1.0), (2, 2.0))),
+    Vectors.sparse(3, Seq((0, 3.0), (1, 4.0), (2, 5.0))),
+    Vectors.sparse(3, Seq((0, 6.0), (1, 7.0), (2, 8.0))),
+    Vectors.sparse(3, Seq((0, 9.0), (2, 1.0)))
+  )
+
+  val principalComponents = BDM(
+    (0.0, 1.0, 0.0),
+    (math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0),
+    (math.sqrt(2.0) / 2.0, 0.0, - math.sqrt(2.0) / 2.0))
+
+  var denseMat: RowMatrix = _
+  var sparseMat: RowMatrix = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    denseMat = new RowMatrix(sc.parallelize(denseData, 2))
+    sparseMat = new RowMatrix(sc.parallelize(sparseData, 2))
+  }
+
+  test("size") {
+    assert(denseMat.numRows() === m)
+    assert(denseMat.numCols() === n)
+    assert(sparseMat.numRows() === m)
+    assert(sparseMat.numCols() === n)
+  }
+
+  test("empty rows") {
+    val rows = sc.parallelize(Seq[Vector](), 1)
+    val emptyMat = new RowMatrix(rows)
+    intercept[RuntimeException] {
+      emptyMat.numCols()
+    }
+    intercept[RuntimeException] {
+      emptyMat.numRows()
+    }
+  }
+
+  test("toBreeze") {
+    val expected = BDM(
+      (0.0, 1.0, 2.0),
+      (3.0, 4.0, 5.0),
+      (6.0, 7.0, 8.0),
+      (9.0, 0.0, 1.0))
+    for (mat <- Seq(denseMat, sparseMat)) {
+      assert(mat.toBreeze() === expected)
+    }
+  }
+
+  test("gram") {
+    val expected =
+      Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0))
+    for (mat <- Seq(denseMat, sparseMat)) {
+      val G = mat.computeGramianMatrix()
+      assert(G.toBreeze === expected.toBreeze)
+    }
+  }
+
+  test("svd of a full-rank matrix") {
+    for (mat <- Seq(denseMat, sparseMat)) {
+      val localMat = mat.toBreeze()
+      val (localU, localSigma, localVt) = brzSvd(localMat)
+      val localV: BDM[Double] = localVt.t.toDenseMatrix
+      for (k <- 1 to n) {
+        val svd = mat.computeSVD(k, computeU = true)
+        val U = svd.U
+        val s = svd.s
+        val V = svd.V
+        assert(U.numRows() === m)
+        assert(U.numCols() === k)
+        assert(s.size === k)
+        assert(V.numRows === n)
+        assert(V.numCols === k)
+        assertColumnEqualUpToSign(U.toBreeze(), localU, k)
+        assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
+        assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+      }
+      val svdWithoutU = mat.computeSVD(n)
+      assert(svdWithoutU.U === null)
+    }
+  }
+
+  test("svd of a low-rank matrix") {
+    val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
+    val mat = new RowMatrix(rows, 4, 2)
+    val svd = mat.computeSVD(2, computeU = true)
+    assert(svd.s.size === 1, "should not return zero singular values")
+    assert(svd.U.numRows() === 4)
+    assert(svd.U.numCols() === 1)
+    assert(svd.V.numRows === 2)
+    assert(svd.V.numCols === 1)
+  }
+
+  def closeToZero(G: BDM[Double]): Boolean = {
+    G.valuesIterator.map(math.abs).sum < 1e-6
+  }
+
+  def closeToZero(v: BDV[Double]): Boolean = {
+    brzNorm(v, 1.0) < 1e-6
+  }
+
+  def assertColumnEqualUpToSign(A: BDM[Double], B: BDM[Double], k: Int) {
+    assert(A.rows === B.rows)
+    for (j <- 0 until k) {
+      val aj = A(::, j)
+      val bj = B(::, j)
+      assert(closeToZero(aj - bj) || closeToZero(aj + bj),
+        s"The $j-th columns mismatch: $aj and $bj")
+    }
+  }
+
+  test("pca") {
+    for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
+      val pc = denseMat.computePrincipalComponents(k)
+      assert(pc.numRows === n)
+      assert(pc.numCols === k)
+      assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k)
+    }
+  }
+
+  test("multiply a local matrix") {
+    val B = Matrices.dense(n, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+    for (mat <- Seq(denseMat, sparseMat)) {
+      val AB = mat.multiply(B)
+      assert(AB.numRows() === m)
+      assert(AB.numCols() === 2)
+      assert(AB.rows.collect().toSeq === Seq(
+        Vectors.dense(5.0, 14.0),
+        Vectors.dense(14.0, 50.0),
+        Vectors.dense(23.0, 86.0),
+        Vectors.dense(2.0, 32.0)
+      ))
+    }
+  }
+}
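
The SVD exercised above follows the tall-and-skinny scheme documented in the removed SVD.scala (see below): compute the small n-by-n Gramian A^T A, factor it locally as A^T A = V * S^2 * V^T, and recover U = A * V * S^-1. A minimal local sketch of that idea in breeze, as an illustration rather than the RowMatrix implementation:

    import breeze.linalg.{diag => brzDiag, svd => brzSvd, DenseMatrix => BDM}

    val a = BDM((0.0, 1.0), (3.0, 4.0), (6.0, 7.0), (9.0, 0.0))  // m x n with m >> n
    val ata = a.t * a                            // the n x n Gramian
    val (v, s2, _) = brzSvd(ata)                 // A^T A = V * diag(s^2) * V^T
    val s = s2.map(math.sqrt)                    // singular values of A
    val u = a * v * brzDiag(s.map(x => 1.0 / x)) // U = A * V * S^-1
    // u * brzDiag(s) * v.t reconstructs a, up to numerical error.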


[2/2] git commit: [SPARK-1390] Refactoring of matrices backed by RDDs

Posted by pw...@apache.org.
[SPARK-1390] Refactoring of matrices backed by RDDs

This is to refactor the interfaces for matrices backed by RDDs. It would be better if we had a clear separation of local matrices and those backed by RDDs. Right now, we have

1. `org.apache.spark.mllib.linalg.SparseMatrix`, which is a wrapper over an RDD of matrix entries, i.e., coordinate list format.
2. `org.apache.spark.mllib.linalg.TallSkinnyDenseMatrix`, which is a wrapper over `RDD[Array[Double]]`, i.e., row-oriented format.

We will see a naming collision when we introduce a local `SparseMatrix`, and the name `TallSkinnyDenseMatrix` is no longer exact once we switch from `RDD[Array[Double]]` to `RDD[Vector]`. It would be better to have "RDD" in the class name to suggest that operations may trigger jobs.

The proposed names are (all under `org.apache.spark.mllib.linalg.rdd`):

1. `RDDMatrix`: trait for matrices backed by one or more RDDs
2. `CoordinateRDDMatrix`: wrapper of `RDD[(Long, Long, Double)]`
3. `RowRDDMatrix`: wrapper of `RDD[Vector]` whose rows do not have special ordering
4. `IndexedRowRDDMatrix`: wrapper of `RDD[(Long, Vector)]` whose rows are associated with indices

This patch also introduces local matrices.
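
To make the new layout concrete, here is a minimal sketch of how these matrices are constructed and converted, using the class names as they actually appear in this diff (they landed under `org.apache.spark.mllib.linalg.distributed`, without the `RDD` prefix proposed above); `sc` is assumed to be an existing SparkContext:

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    // Coordinate list format: an RDD of (i, j, value) entries.
    val entries = sc.parallelize(Seq(
      MatrixEntry(0, 0, 1.0),
      MatrixEntry(1, 1, 2.0),
      MatrixEntry(2, 0, 3.0)))
    val coordMat = new CoordinateMatrix(entries)

    // Row-oriented views; note that these conversions may trigger jobs.
    val indexedRowMat = coordMat.toIndexedRowMatrix()  // rows keyed by a Long index
    val rowMat = coordMat.toRowMatrix()                // rows with no meaningful ordering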

Author: Xiangrui Meng <me...@databricks.com>

Closes #296 from mengxr/mat and squashes the following commits:

24d8294 [Xiangrui Meng] fix for groupBy returning Iterable
bfc2b26 [Xiangrui Meng] merge master
8e4f1f5 [Xiangrui Meng] Merge branch 'master' into mat
0135193 [Xiangrui Meng] address Reza's comments
03cd7e1 [Xiangrui Meng] add pca/gram to IndexedRowMatrix add toBreeze to DistributedMatrix for test simplify tests
b177ff1 [Xiangrui Meng] address Matei's comments
be119fe [Xiangrui Meng] rename m/n to numRows/numCols for local matrix add tests for matrices
b881506 [Xiangrui Meng] rename SparkPCA/SVD to TallSkinnyPCA/SVD
e7d0d4a [Xiangrui Meng] move IndexedRDDMatrixRow to IndexedRowRDDMatrix
0d1491c [Xiangrui Meng] fix test errors
a85262a [Xiangrui Meng] rename RDDMatrixRow to IndexedRDDMatrixRow
b8b6ac3 [Xiangrui Meng] Remove old code
4cf679c [Xiangrui Meng] port pca to RowRDDMatrix, and add multiply and covariance
7836e2f [Xiangrui Meng] initial refactoring of matrices backed by RDDs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9689b663
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9689b663
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9689b663

Branch: refs/heads/master
Commit: 9689b663a2a4947ad60795321c770052f3c637f1
Parents: fa0524f
Author: Xiangrui Meng <me...@databricks.com>
Authored: Tue Apr 8 23:01:15 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Apr 8 23:01:15 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/examples/mllib/SparkPCA.scala  |  51 ---
 .../apache/spark/examples/mllib/SparkSVD.scala  |  59 ---
 .../spark/examples/mllib/TallSkinnyPCA.scala    |  64 +++
 .../spark/examples/mllib/TallSkinnySVD.scala    |  64 +++
 .../apache/spark/mllib/linalg/Matrices.scala    | 101 +++++
 .../apache/spark/mllib/linalg/MatrixEntry.scala |  27 --
 .../apache/spark/mllib/linalg/MatrixRow.scala   |  26 --
 .../apache/spark/mllib/linalg/MatrixSVD.scala   |  29 --
 .../org/apache/spark/mllib/linalg/PCA.scala     | 120 ------
 .../org/apache/spark/mllib/linalg/SVD.scala     | 395 -------------------
 .../linalg/SingularValueDecomposition.scala     |  21 +
 .../spark/mllib/linalg/SparseMatrix.scala       |  30 --
 .../mllib/linalg/TallSkinnyDenseMatrix.scala    |  30 --
 .../mllib/linalg/TallSkinnyMatrixSVD.scala      |  31 --
 .../linalg/distributed/CoordinateMatrix.scala   | 112 ++++++
 .../linalg/distributed/DistributedMatrix.scala  |  37 ++
 .../linalg/distributed/IndexedRowMatrix.scala   | 148 +++++++
 .../mllib/linalg/distributed/RowMatrix.scala    | 344 ++++++++++++++++
 .../org/apache/spark/mllib/util/LAUtils.scala   |  67 ----
 .../linalg/BreezeMatrixConversionSuite.scala    |  40 ++
 .../spark/mllib/linalg/MatricesSuite.scala      |  39 ++
 .../apache/spark/mllib/linalg/PCASuite.scala    | 124 ------
 .../apache/spark/mllib/linalg/SVDSuite.scala    | 194 ---------
 .../distributed/CoordinateMatrixSuite.scala     |  98 +++++
 .../distributed/IndexedRowMatrixSuite.scala     | 120 ++++++
 .../linalg/distributed/RowMatrixSuite.scala     | 173 ++++++++
 26 files changed, 1361 insertions(+), 1183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
deleted file mode 100644
index d4e08c5..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.mllib
-      
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.linalg.PCA
-import org.apache.spark.mllib.linalg.MatrixEntry
-import org.apache.spark.mllib.linalg.SparseMatrix
-import org.apache.spark.mllib.util._
-
-
-/**
- * Compute PCA of an example matrix.
- */
-object SparkPCA {
-  def main(args: Array[String]) {
-    if (args.length != 3) {
-      System.err.println("Usage: SparkPCA <master> m n")
-      System.exit(1)
-    }
-    val sc = new SparkContext(args(0), "PCA",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
-    val m = args(1).toInt
-    val n = args(2).toInt
-
-    // Make example matrix
-    val data = Array.tabulate(m, n) { (a, b) =>
-      (a + 2).toDouble * (b + 1) / (1 + a + b) }
-
-    // recover top principal component
-    val coeffs = new PCA().setK(1).compute(sc.makeRDD(data))
-
-    println("top principal component = " + coeffs.mkString(", "))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
deleted file mode 100644
index 2933cec..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.mllib
-      
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.linalg.SVD
-import org.apache.spark.mllib.linalg.MatrixEntry
-import org.apache.spark.mllib.linalg.SparseMatrix
-
-/**
- * Compute SVD of an example matrix
- * The input file should be comma-separated and 1-indexed, with lines of the form
- * i,j,value
- * where i is the row, j is the column, and value is the matrix entry.
- * 
- * For example input file, see:
- * mllib/data/als/test.data  (example is 4 x 4)
- */
-object SparkSVD {
-  def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println("Usage: SparkSVD <master> <file> m n")
-      System.exit(1)
-    }
-    val sc = new SparkContext(args(0), "SVD",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
-    // Load and parse the data file
-    val data = sc.textFile(args(1)).map { line =>
-      val parts = line.split(',')
-      MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble)
-    }
-    val m = args(2).toInt
-    val n = args(3).toInt
-
-    // recover largest singular vector
-    val decomposed = new SVD().setK(1).compute(SparseMatrix(data, m, n))
-    val u = decomposed.U.data
-    val s = decomposed.S.data
-    val v = decomposed.V.data
-
-    println("singular values = " + s.collect().mkString)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
new file mode 100644
index 0000000..a177435
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+      
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * Compute the principal components of a tall-and-skinny matrix, whose rows are observations.
+ *
+ * The input matrix must be stored in row-oriented dense format, one line per row with its entries
+ * separated by space. For example,
+ * {{{
+ * 0.5 1.0
+ * 2.0 3.0
+ * 4.0 5.0
+ * }}}
+ * represents a 3-by-2 matrix, whose first row is (0.5, 1.0).
+ */
+object TallSkinnyPCA {
+  def main(args: Array[String]) {
+    if (args.length != 2) {
+      System.err.println("Usage: TallSkinnyPCA <master> <file>")
+      System.exit(1)
+    }
+
+    val conf = new SparkConf()
+      .setMaster(args(0))
+      .setAppName("TallSkinnyPCA")
+      .setSparkHome(System.getenv("SPARK_HOME"))
+      .setJars(SparkContext.jarOfClass(this.getClass))
+    val sc = new SparkContext(conf)
+
+    // Load and parse the data file.
+    val rows = sc.textFile(args(1)).map { line =>
+      val values = line.split(' ').map(_.toDouble)
+      Vectors.dense(values)
+    }
+    val mat = new RowMatrix(rows)
+
+    // Compute principal components.
+    val pc = mat.computePrincipalComponents(mat.numCols().toInt)
+
+    println("Principal components are:\n" + pc)
+
+    sc.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
new file mode 100644
index 0000000..49d0969
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+      
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * Compute the singular value decomposition (SVD) of a tall-and-skinny matrix.
+ *
+ * The input matrix must be stored in row-oriented dense format, one line per row with its entries
+ * separated by space. For example,
+ * {{{
+ * 0.5 1.0
+ * 2.0 3.0
+ * 4.0 5.0
+ * }}}
+ * represents a 3-by-2 matrix, whose first row is (0.5, 1.0).
+ */
+object TallSkinnySVD {
+  def main(args: Array[String]) {
+    if (args.length != 2) {
+      System.err.println("Usage: TallSkinnySVD <master> <file>")
+      System.exit(1)
+    }
+
+    val conf = new SparkConf()
+      .setMaster(args(0))
+      .setAppName("TallSkinnySVD")
+      .setSparkHome(System.getenv("SPARK_HOME"))
+      .setJars(SparkContext.jarOfClass(this.getClass))
+    val sc = new SparkContext(conf)
+
+    // Load and parse the data file.
+    val rows = sc.textFile(args(1)).map { line =>
+      val values = line.split(' ').map(_.toDouble)
+      Vectors.dense(values)
+    }
+    val mat = new RowMatrix(rows)
+
+    // Compute SVD.
+    val svd = mat.computeSVD(mat.numCols().toInt)
+
+    println("Singular values are " + svd.s)
+
+    sc.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
new file mode 100644
index 0000000..b11ba5d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import breeze.linalg.{Matrix => BM, DenseMatrix => BDM}
+
+/**
+ * Trait for a local matrix.
+ */
+trait Matrix extends Serializable {
+
+  /** Number of rows. */
+  def numRows: Int
+
+  /** Number of columns. */
+  def numCols: Int
+
+  /** Converts to a dense array in column major. */
+  def toArray: Array[Double]
+
+  /** Converts to a breeze matrix. */
+  private[mllib] def toBreeze: BM[Double]
+
+  /** Gets the (i, j)-th element. */
+  private[mllib] def apply(i: Int, j: Int): Double = toBreeze(i, j)
+
+  override def toString: String = toBreeze.toString()
+}
+
+/**
+ * Column-major dense matrix.
+ * The entry values are stored in a single array of doubles with columns listed in sequence.
+ * For example, the following matrix
+ * {{{
+ *   1.0 2.0
+ *   3.0 4.0
+ *   5.0 6.0
+ * }}}
+ * is stored as `[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]`.
+ *
+ * @param numRows number of rows
+ * @param numCols number of columns
+ * @param values matrix entries in column major
+ */
+class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix {
+
+  require(values.length == numRows * numCols)
+
+  override def toArray: Array[Double] = values
+
+  private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)
+}
+
+/**
+ * Factory methods for [[org.apache.spark.mllib.linalg.Matrix]].
+ */
+object Matrices {
+
+  /**
+   * Creates a column-major dense matrix.
+   *
+   * @param numRows number of rows
+   * @param numCols number of columns
+   * @param values matrix entries in column major
+   */
+  def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix = {
+    new DenseMatrix(numRows, numCols, values)
+  }
+
+  /**
+   * Creates a Matrix instance from a breeze matrix.
+   * @param breeze a breeze matrix
+   * @return a Matrix instance
+   */
+  private[mllib] def fromBreeze(breeze: BM[Double]): Matrix = {
+    breeze match {
+      case dm: BDM[Double] =>
+        require(dm.majorStride == dm.rows,
+          "Do not support stride size different from the number of rows.")
+        new DenseMatrix(dm.rows, dm.cols, dm.data)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Do not support conversion from type ${breeze.getClass.getName}.")
+    }
+  }
+}
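
To make the column-major convention concrete, a small usage sketch of the factory above:

    import org.apache.spark.mllib.linalg.Matrices

    // The 3-by-2 matrix from the scaladoc example:
    //   1.0 2.0
    //   3.0 4.0
    //   5.0 6.0
    val m = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    // toArray hands the same column-major values back.
    println(m.toArray.mkString(", "))        // 1.0, 3.0, 5.0, 2.0, 4.0, 6.0
    println(s"${m.numRows} x ${m.numCols}")  // 3 x 2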

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
deleted file mode 100644
index 416996f..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-/**
- * Class that represents an entry in a sparse matrix of doubles.
- *
- * @param i row index (0 indexing used)
- * @param j column index (0 indexing used)
- * @param mval value of entry in matrix
- */
-case class MatrixEntry(val i: Int, val j: Int, val mval: Double)

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
deleted file mode 100644
index 2608a67..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-/**
- * Class that represents a row of a dense matrix
- *
- * @param i row index (0 indexing used)
- * @param data entries of the row
- */
-case class MatrixRow(val i: Int, val data: Array[Double])

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
deleted file mode 100644
index 319f82b..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-/**
- * Class that represents the SV decomposition of a matrix
- *
- * @param U such that A = USV^T
- * @param S such that A = USV^T
- * @param V such that A = USV^T
- */
-case class MatrixSVD(val U: SparseMatrix,
-                     val S: SparseMatrix,
-                     val V: SparseMatrix)

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
deleted file mode 100644
index fe5b3f6..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import org.apache.spark.rdd.RDD
-
-
-import org.jblas.DoubleMatrix
-
-
-/**
- * Class used to obtain principal components
- */
-class PCA {
-  private var k = 1
-
-  /**
-   * Set the number of top-k principal components to return
-   */
-  def setK(k: Int): PCA = {
-    this.k = k
-    this
-  }
-
-  /**
-   * Compute PCA using the current set parameters
-   */
-  def compute(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
-    computePCA(matrix)
-  }
-
-  /**
-   * Compute PCA using the parameters currently set
-   * See computePCA() for more details
-   */
-  def compute(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
-    computePCA(matrix)
-  }
-
-  /**
-   * Computes the top k principal component coefficients for the m-by-n data matrix X.
-   * Rows of X correspond to observations and columns correspond to variables. 
-   * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
-   * for one principal component, and the columns are in descending 
-   * order of component variance.
-   * This function centers the data and uses the 
-   * singular value decomposition (SVD) algorithm. 
-   *
-   * @param matrix dense matrix to perform PCA on
-   * @return an n-by-k matrix with principal components in its columns; each inner array is one row
-   */
-  private def computePCA(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
-    val m = matrix.m
-    val n = matrix.n
-
-    if (m <= 0 || n <= 0) {
-      throw new IllegalArgumentException("Expecting a well-formed matrix: m=$m n=$n")
-    }
-
-    computePCA(matrix.rows.map(_.data))
-  }
-
-  /**
-   * Computes the top k principal component coefficients for the m-by-n data matrix X.
-   * Rows of X correspond to observations and columns correspond to variables. 
-   * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
-   * for one principal component, and the columns are in descending 
-   * order of component variance.
-   * This function centers the data and uses the 
-   * singular value decomposition (SVD) algorithm. 
-   *
-   * @param matrix dense matrix to perform pca on
-   * @return An nxk matrix of principal components
-   */
-  private def computePCA(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
-    val n = matrix.first.size
-
-    // compute column sums and normalize matrix
-    val colSumsTemp = matrix.map((_, 1)).fold((Array.ofDim[Double](n), 0)) {
-      (a, b) =>
-        val am = new DoubleMatrix(a._1)
-        val bm = new DoubleMatrix(b._1)
-        am.addi(bm)
-        (a._1, a._2 + b._2)
-    }
-
-    val m = colSumsTemp._2
-    val colSums = colSumsTemp._1.map(x => x / m)
-
-    val data = matrix.map {
-      x =>
-        val row = Array.ofDim[Double](n)
-        var i = 0
-        while (i < n) {
-          row(i) = x(i) - colSums(i)
-          i += 1
-        }
-        row
-    }
-
-    val (u, s, v) = new SVD().setK(k).compute(data)
-    v
-  }
-}
-
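
The computePCA above centers each column and then takes an SVD; the right singular vectors of the centered matrix are the principal components. A minimal local sketch of that step in breeze, as an illustration rather than the removed implementation:

    import breeze.linalg.{sum, svd => brzSvd, DenseMatrix => BDM}

    val x = BDM((0.0, 1.0), (3.0, 4.0), (6.0, 7.0), (9.0, 0.0))
    // Center each column on its mean.
    val centered = x.copy
    for (j <- 0 until x.cols) {
      val mean = sum(x(::, j)) / x.rows
      for (i <- 0 until x.rows) centered(i, j) -= mean
    }
    // The right singular vectors of the centered matrix are the principal
    // components, in descending order of component variance.
    val (_, _, vt) = brzSvd(centered)
    val components = vt.t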

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
deleted file mode 100644
index 0d97b7d..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
-
-/**
- * Class used to obtain singular value decompositions
- */
-class SVD {
-  private var k = 1
-  private var computeU = true
-
-  // All singular values smaller than rCond * sigma(0)
-  // are treated as zero, where sigma(0) is the largest singular value.
-  private var rCond = 1e-9
-
-  /**
-   * Set the number of top-k singular vectors to return
-   */
-  def setK(k: Int): SVD = {
-    this.k = k
-    this
-  }
-
-  /**
-   * Sets the reciprocal condition number (rCond). All singular values
-   * smaller than rCond * sigma(0) are treated as zero,
-   * where sigma(0) is the largest singular value.
-   */
-  def setReciprocalConditionNumber(smallS: Double): SVD = {
-    this.rCond = smallS
-    this
-  }
-
-  /**
-   * Should U be computed?
-   */
-  def setComputeU(compU: Boolean): SVD = {
-    this.computeU = compU
-    this
-  }
-
-  /**
-   * Compute SVD using the current set parameters
-   */
-  def compute(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
-    denseSVD(matrix)
-  }
-
-  /**
-   * Compute SVD using the current set parameters
-   * Returns (U, S, V)  such that A = USV^T
-   * U is a row-by-row dense matrix
-   * S is a simple double array of singular values
-   * V is a 2d array matrix
-   * See [[denseSVD]] for more documentation
-   */
-  def compute(matrix: RDD[Array[Double]]):
-  (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
-    denseSVD(matrix)
-  }
-
-  /**
-   * See the full parameter documentation of sparseSVD for more details.
-   *
-   * @param matrix sparse matrix to factorize
-   * @return Three sparse matrices: U, S, V such that A = USV^T
-   */
-  def compute(matrix: SparseMatrix): MatrixSVD = {
-    sparseSVD(matrix)
-  }
-
-  /**
-   * Singular Value Decomposition for Tall and Skinny matrices.
-   * Given an m x n matrix A, this will compute matrices U, S, V such that
-   * A = U * S * V'
-   *
-   * There is no restriction on m, but we require n^2 doubles to fit in memory.
-   * Further, n should be less than m.
-   *
-   * The decomposition is computed by first computing A'A = V S^2 V',
-   * computing svd locally on that (since n x n is small),
-   * from which we recover S and V.
-   * Then we compute U via easy matrix multiplication
-   * as U =  A * V * S^-1
-   *
-   * Only the k largest singular values and associated vectors are found.
-   * If there are k such values, then the dimensions of the return will be:
-   *
-   * S is k x k and diagonal, holding the singular values on diagonal
-   * U is m x k and satisfies U'U = eye(k)
-   * V is n x k and satisfies V'V = eye(k)
-   *
-   * @param matrix dense matrix to factorize
-   * @return See [[TallSkinnyMatrixSVD]] for the output matrices and arrays
-   */
-  private def denseSVD(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
-    val m = matrix.m
-    val n = matrix.n
-
-    if (m < n || m <= 0 || n <= 0) {
-      throw new IllegalArgumentException("Expecting a tall and skinny matrix m=$m n=$n")
-    }
-
-    if (k < 1 || k > n) {
-      throw new IllegalArgumentException("Request up to n singular values n=$n k=$k")
-    }
-
-    val rowIndices = matrix.rows.map(_.i)
-
-    // compute SVD
-    val (u, sigma, v) = denseSVD(matrix.rows.map(_.data))
-
-    if (computeU) {
-      // prep u for returning
-      val retU = TallSkinnyDenseMatrix(
-        u.zip(rowIndices).map {
-          case (row, i) => MatrixRow(i, row)
-        },
-        m,
-        k)
-
-      TallSkinnyMatrixSVD(retU, sigma, v)
-    } else {
-      TallSkinnyMatrixSVD(null, sigma, v)
-    }
-  }
-
-  /**
-   * Singular Value Decomposition for Tall and Skinny matrices.
-   * Given an m x n matrix A, this will compute matrices U, S, V such that
-   * A = U * S * V'
-   *
-   * There is no restriction on m, but we require n^2 doubles to fit in memory.
-   * Further, n should be less than m.
-   *
-   * The decomposition is computed by first computing A'A = V S^2 V',
-   * computing svd locally on that (since n x n is small),
-   * from which we recover S and V.
-   * Then we compute U via easy matrix multiplication
-   * as U =  A * V * S^-1
-   *
-   * Only the k largest singular values and associated vectors are found.
-   * If there are k such values, then the dimensions of the return will be:
-   *
-   * S is k x k and diagonal, holding the singular values on diagonal
-   * U is m x k and satisfies U'U = eye(k)
-   * V is n x k and satisfies V'V = eye(k)
-   *
-   * The return values are as lean as possible: an RDD of rows for U,
-   * a simple array for sigma, and a dense 2d matrix array for V
-   *
-   * @param matrix dense matrix to factorize
-   * @return Three matrices: U, S, V such that A = USV^T
-   */
-  private def denseSVD(matrix: RDD[Array[Double]]):
-  (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
-    val n = matrix.first.size
-
-    if (k < 1 || k > n) {
-      throw new IllegalArgumentException(
-        "Request up to n singular values k=$k n=$n")
-    }
-
-    // Compute A^T A
-    val fullata = matrix.mapPartitions {
-      iter =>
-        val localATA = Array.ofDim[Double](n, n)
-        while (iter.hasNext) {
-          val row = iter.next()
-          var i = 0
-          while (i < n) {
-            var j = 0
-            while (j < n) {
-              localATA(i)(j) += row(i) * row(j)
-              j += 1
-            }
-            i += 1
-          }
-        }
-        Iterator(localATA)
-    }.fold(Array.ofDim[Double](n, n)) {
-      (a, b) =>
-        var i = 0
-        while (i < n) {
-          var j = 0
-          while (j < n) {
-            a(i)(j) += b(i)(j)
-            j += 1
-          }
-          i += 1
-        }
-        a
-    }
-
-    // Construct jblas A^T A locally
-    val ata = new DoubleMatrix(fullata)
-
-    // Since A^T A is small, we can compute its SVD directly
-    val svd = Singular.sparseSVD(ata)
-    val V = svd(0)
-    val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x / svd(1).get(0) > rCond)
-
-    val sk = Math.min(k, sigmas.size)
-    val sigma = sigmas.take(sk)
-
-    // prepare V for returning
-    val retV = Array.tabulate(n, sk)((i, j) => V.get(i, j))
-
-    if (computeU) {
-      // Compute U as U = A V S^-1
-      // Compute VS^-1
-      val vsinv = new DoubleMatrix(Array.tabulate(n, sk)((i, j) => V.get(i, j) / sigma(j)))
-      val retU = matrix.map {
-        x =>
-          val v = new DoubleMatrix(Array(x))
-          v.mmul(vsinv).data
-      }
-      (retU, sigma, retV)
-    } else {
-      (null, sigma, retV)
-    }
-  }
-
-  /**
-   * Singular Value Decomposition for Tall and Skinny sparse matrices.
-   * Given an m x n matrix A, this will compute matrices U, S, V such that
-   * A = U * S * V'
-   *
-   * There is no restriction on m, but we require O(n^2) doubles to fit in memory.
-   * Further, n should be less than m.
-   *
-   * The decomposition is computed by first computing A'A = V S^2 V',
-   * computing svd locally on that (since n x n is small),
-   * from which we recover S and V.
-   * Then we compute U via easy matrix multiplication
-   * as U =  A * V * S^-1
-   *
-   * Only the k largest singular values and associated vectors are found.
-   * If there are k such values, then the dimensions of the return will be:
-   *
-   * S is k x k and diagonal, holding the singular values on diagonal
-   * U is m x k and satisfies U'U = eye(k)
-   * V is n x k and satisfies V'V = eye(k)
-   *
-   * All input and output is expected in sparse matrix format: 0-indexed
-   * entries of the form ((i, j), value), stored in RDDs using the
-   * SparseMatrix class.
-   *
-   * @param matrix sparse matrix to factorize
-   * @return Three sparse matrices: U, S, V such that A = USV^T
-   */
-  private def sparseSVD(matrix: SparseMatrix): MatrixSVD = {
-    val data = matrix.data
-    val m = matrix.m
-    val n = matrix.n
-
-    if (m < n || m <= 0 || n <= 0) {
-      throw new IllegalArgumentException("Expecting a tall and skinny matrix")
-    }
-
-    if (k < 1 || k > n) {
-      throw new IllegalArgumentException("Must request up to n singular values")
-    }
-
-    // Compute A^T A, assuming rows are sparse enough to fit in memory
-    val rows = data.map(entry =>
-      (entry.i, (entry.j, entry.mval))).groupByKey()
-    val emits = rows.flatMap {
-      case (rowind, cols) =>
-        cols.flatMap {
-          case (colind1, mval1) =>
-            cols.map {
-              case (colind2, mval2) =>
-                ((colind1, colind2), mval1 * mval2)
-            }
-        }
-    }.reduceByKey(_ + _)
-
-    // Construct jblas A^T A locally
-    val ata = DoubleMatrix.zeros(n, n)
-    for (entry <- emits.collect()) {
-      ata.put(entry._1._1, entry._1._2, entry._2)
-    }
-
-    // Since A^T A is small, we can compute its SVD directly
-    val svd = Singular.sparseSVD(ata)
-    val V = svd(0)
-    // TODO: filter by rCond instead of this hard-coded threshold.
-    val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
-
-    if (sigmas.size < k) {
-      throw new Exception("Not enough singular values to return k=" + k + " s=" + sigmas.size)
-    }
-
-    val sigma = sigmas.take(k)
-
-    val sc = data.sparkContext
-
-    // prepare V for returning
-    val retVdata = sc.makeRDD(
-      Array.tabulate(V.rows, sigma.length) {
-        (i, j) =>
-          MatrixEntry(i, j, V.get(i, j))
-      }.flatten)
-    val retV = SparseMatrix(retVdata, V.rows, sigma.length)
-
-    val retSdata = sc.makeRDD(Array.tabulate(sigma.length) {
-      x => MatrixEntry(x, x, sigma(x))
-    })
-
-    val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
-
-    // Compute U as U = A V S^-1
-    // turn V S^-1 into an RDD as a sparse matrix
-    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) {
-      (i, j) => ((i, j), V.get(i, j) / sigma(j))
-    }.flatten)
-
-    if (computeU) {
-      // Multiply A by VS^-1
-      val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
-      val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-      val retUdata = aCols.join(bRows).map {
-        case (key, ((rowInd, rowVal), (colInd, colVal))) =>
-          ((rowInd, colInd), rowVal * colVal)
-      }.reduceByKey(_ + _).map {
-        case ((row, col), mval) => MatrixEntry(row, col, mval)
-      }
-
-      val retU = SparseMatrix(retUdata, m, sigma.length)
-      MatrixSVD(retU, retS, retV)
-    } else {
-      MatrixSVD(null, retS, retV)
-    }
-  }
-}
-
-/**
- * Top-level methods for calling sparse Singular Value Decomposition
- * NOTE: All matrices are 0-indexed
- */
-object SVD {
-  def main(args: Array[String]) {
-    if (args.length < 8) {
-      println("Usage: SVD <master> <matrix_file> <m> <n> " +
-        "<k> <output_U_file> <output_S_file> <output_V_file>")
-      System.exit(1)
-    }
-
-    val (master, inputFile, m, n, k, output_u, output_s, output_v) =
-      (args(0), args(1), args(2).toInt, args(3).toInt,
-        args(4).toInt, args(5), args(6), args(7))
-
-    val sc = new SparkContext(master, "SVD")
-
-    val rawData = sc.textFile(inputFile)
-    val data = rawData.map {
-      line =>
-        val parts = line.split(',')
-        MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
-    }
-
-    val decomposed = new SVD().setK(k).compute(SparseMatrix(data, m, n))
-    val u = decomposed.U.data
-    val s = decomposed.S.data
-    val v = decomposed.V.data
-
-    println("Computed " + s.collect().length + " singular values and vectors")
-    u.saveAsTextFile(output_u)
-    s.saveAsTextFile(output_s)
-    v.saveAsTextFile(output_v)
-    System.exit(0)
-  }
-}
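
Spelled out, the identity that both the deleted code above and the new
RowMatrix code below rely on: assuming A = U S V^T with U^T U = V^T V = I
and S diagonal,

    $$ A^\top A = (U S V^\top)^\top (U S V^\top)
                = V S \, (U^\top U) \, S \, V^\top
                = V S^2 V^\top, \qquad U = A V S^{-1}. $$

So S and V come out of a local factorization of the small n x n Gramian,
and U needs only one distributed matrix multiply.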

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
new file mode 100644
index 0000000..46b1054
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/** Represents singular value decomposition (SVD) factors. */
+case class SingularValueDecomposition[UType, VType](U: UType, s: Vector, V: VType)
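
A minimal consumption sketch for the new factor holder, assuming a
RowMatrix `mat` already built from an RDD[Vector] (computeSVD is added
later in this commit):

    import org.apache.spark.mllib.linalg.{Matrix, SingularValueDecomposition}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // `mat: RowMatrix` is assumed to exist.
    val svd: SingularValueDecomposition[RowMatrix, Matrix] =
      mat.computeSVD(3, computeU = true)
    val SingularValueDecomposition(u, s, v) = svd  // u: RowMatrix, s: Vector, v: Matrix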

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
deleted file mode 100644
index cbd1a2a..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import org.apache.spark.rdd.RDD
-
-
-/**
- * Class that represents a sparse matrix
- *
- * @param data RDD of nonzero entries
- * @param m number of rows
- * @param n number of columns
- */
-case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
deleted file mode 100644
index e4ef3c5..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import org.apache.spark.rdd.RDD
-
-
-/**
- * Class that represents a dense matrix
- *
- * @param rows RDD of rows
- * @param m number of rows
- * @param n number of columns
- */
-case class TallSkinnyDenseMatrix(val rows: RDD[MatrixRow], val m: Int, val n: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
deleted file mode 100644
index b3a450e..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-/**
- * Class that represents the singular value decomposition of a matrix
- *
- * @param U such that A = USV^T is a TallSkinnyDenseMatrix
- * @param S such that A = USV^T is a simple double array
- * @param V such that A = USV^T; V is a 2d array that holds the
- *          singular vectors in its columns, indexed so that
- *          V(i)(j) is V_{ij} in standard math notation
- */
-case class TallSkinnyMatrixSVD(val U: TallSkinnyDenseMatrix,
-                               val S: Array[Double],
-                               val V: Array[Array[Double]])

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
new file mode 100644
index 0000000..9194f65
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * Represents an entry in a distributed matrix.
+ * @param i row index
+ * @param j column index
+ * @param value value of the entry
+ */
+case class MatrixEntry(i: Long, j: Long, value: Double)
+
+/**
+ * Represents a matrix in coordinate format.
+ *
+ * @param entries matrix entries
+ * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
+ *              be determined by the max row index plus one.
+ * @param nCols number of columns. A non-positive value means unknown, and then the number of
+ *              columns will be determined by the max column index plus one.
+ */
+class CoordinateMatrix(
+    val entries: RDD[MatrixEntry],
+    private var nRows: Long,
+    private var nCols: Long) extends DistributedMatrix {
+
+  /** Alternative constructor leaving matrix dimensions to be determined automatically. */
+  def this(entries: RDD[MatrixEntry]) = this(entries, 0L, 0L)
+
+  /** Gets or computes the number of columns. */
+  override def numCols(): Long = {
+    if (nCols <= 0L) {
+      computeSize()
+    }
+    nCols
+  }
+
+  /** Gets or computes the number of rows. */
+  override def numRows(): Long = {
+    if (nRows <= 0L) {
+      computeSize()
+    }
+    nRows
+  }
+
+  /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
+  def toIndexedRowMatrix(): IndexedRowMatrix = {
+    val nl = numCols()
+    if (nl > Int.MaxValue) {
+      sys.error(s"Cannot convert to a row-oriented format because the number of columns $nl is " +
+        "too large.")
+    }
+    val n = nl.toInt
+    val indexedRows = entries.map(entry => (entry.i, (entry.j.toInt, entry.value)))
+      .groupByKey()
+      .map { case (i, vectorEntries) =>
+        IndexedRow(i, Vectors.sparse(n, vectorEntries.toSeq))
+      }
+    new IndexedRowMatrix(indexedRows, numRows(), n)
+  }
+
+  /**
+   * Converts to RowMatrix, dropping row indices after grouping by row index.
+   * The number of columns must be within the integer range.
+   */
+  def toRowMatrix(): RowMatrix = {
+    toIndexedRowMatrix().toRowMatrix()
+  }
+
+  /** Determines the size by computing the max row/column index. */
+  private def computeSize() {
+    // Reduce will throw an exception if `entries` is empty.
+    val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) =>
+      (math.max(i1, i2), math.max(j1, j2))
+    }
+    // There may be empty columns at the very right and empty rows at the very bottom.
+    nRows = math.max(nRows, m1 + 1L)
+    nCols = math.max(nCols, n1 + 1L)
+  }
+
+  /** Collects data and assembles a local matrix. */
+  private[mllib] override def toBreeze(): BDM[Double] = {
+    val m = numRows().toInt
+    val n = numCols().toInt
+    val mat = BDM.zeros[Double](m, n)
+    entries.collect().foreach { case MatrixEntry(i, j, value) =>
+      mat(i.toInt, j.toInt) = value
+    }
+    mat
+  }
+}
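
A small usage sketch for the new coordinate format, assuming an existing
SparkContext `sc`; the dimensions are left to be inferred from the max
indices:

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    val entries = sc.parallelize(Seq(
      MatrixEntry(0L, 0L, 1.0),
      MatrixEntry(1L, 2L, 3.0),
      MatrixEntry(3L, 1L, 2.0)))
    val coordMat = new CoordinateMatrix(entries)
    coordMat.numRows()  // 4, the max row index plus one
    coordMat.numCols()  // 3, the max column index plus one
    val rowMat = coordMat.toIndexedRowMatrix()  // entries grouped into sparse rows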

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala
new file mode 100644
index 0000000..13f72a3
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.Matrix
+
+/**
+ * Represents a distributively stored matrix backed by one or more RDDs.
+ */
+trait DistributedMatrix extends Serializable {
+
+  /** Gets or computes the number of rows. */
+  def numRows(): Long
+
+  /** Gets or computes the number of columns. */
+  def numCols(): Long
+
+  /** Collects data and assembles a local dense breeze matrix (for test only). */
+  private[mllib] def toBreeze(): BDM[Double]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
new file mode 100644
index 0000000..e110f07
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.SingularValueDecomposition
+
+/** Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]]. */
+case class IndexedRow(index: Long, vector: Vector)
+
+/**
+ * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
+ * indexed rows.
+ *
+ * @param rows indexed rows of this matrix
+ * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
+ *              be determined by the max row index plus one.
+ * @param nCols number of columns. A non-positive value means unknown, and then the number of
+ *              columns will be determined by the size of the first row.
+ */
+class IndexedRowMatrix(
+    val rows: RDD[IndexedRow],
+    private var nRows: Long,
+    private var nCols: Int) extends DistributedMatrix {
+
+  /** Alternative constructor leaving matrix dimensions to be determined automatically. */
+  def this(rows: RDD[IndexedRow]) = this(rows, 0L, 0)
+
+  override def numCols(): Long = {
+    if (nCols <= 0) {
+      // Calling `first` will throw an exception if `rows` is empty.
+      nCols = rows.first().vector.size
+    }
+    nCols
+  }
+
+  override def numRows(): Long = {
+    if (nRows <= 0L) {
+      // Reduce will throw an exception if `rows` is empty.
+      nRows = rows.map(_.index).reduce(math.max) + 1L
+    }
+    nRows
+  }
+
+  /**
+   * Drops row indices and converts this matrix to a
+   * [[org.apache.spark.mllib.linalg.distributed.RowMatrix]].
+   */
+  def toRowMatrix(): RowMatrix = {
+    new RowMatrix(rows.map(_.vector), 0L, nCols)
+  }
+
+  /**
+   * Computes the singular value decomposition of this matrix.
+   * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+   *
+   * There is no restriction on m, but we require `n^2` doubles to fit in memory.
+   * Further, n should be less than m.
+   *
+   * The decomposition is computed by first computing A'A = V S^2 V',
+   * computing svd locally on that (since n x n is small), from which we recover S and V.
+   * Then we compute U via easy matrix multiplication as U =  A * (V * S^-1).
+   * Note that this approach requires `O(n^3)` time on the master node.
+   *
+   * At most k largest non-zero singular values and associated vectors are returned.
+   * If there are k such values, then the dimensions of the return will be:
+   *
+   * U is an [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]] of size m x k that
+   * satisfies U'U = eye(k),
+   * s is a Vector of size k, holding the singular values in descending order,
+   * and V is a local Matrix of size n x k that satisfies V'V = eye(k).
+   *
+   * @param k number of singular values to keep. We might return less than k if there are
+   *          numerically zero singular values. See rCond.
+   * @param computeU whether to compute U
+   * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
+   *              are treated as zero, where sigma(0) is the largest singular value.
+   * @return SingularValueDecomposition(U, s, V)
+   */
+  def computeSVD(
+      k: Int,
+      computeU: Boolean = false,
+      rCond: Double = 1e-9): SingularValueDecomposition[IndexedRowMatrix, Matrix] = {
+    val indices = rows.map(_.index)
+    val svd = toRowMatrix().computeSVD(k, computeU, rCond)
+    val U = if (computeU) {
+      val indexedRows = indices.zip(svd.U.rows).map { case (i, v) =>
+        IndexedRow(i, v)
+      }
+      new IndexedRowMatrix(indexedRows, nRows, nCols)
+    } else {
+      null
+    }
+    SingularValueDecomposition(U, svd.s, svd.V)
+  }
+
+  /**
+   * Multiply this matrix by a local matrix on the right.
+   *
+   * @param B a local matrix whose number of rows must match the number of columns of this matrix
+   * @return an IndexedRowMatrix representing the product, which preserves partitioning
+   */
+  def multiply(B: Matrix): IndexedRowMatrix = {
+    val mat = toRowMatrix().multiply(B)
+    val indexedRows = rows.map(_.index).zip(mat.rows).map { case (i, v) =>
+      IndexedRow(i, v)
+    }
+    new IndexedRowMatrix(indexedRows, nRows, nCols)
+  }
+
+  /**
+   * Computes the Gramian matrix `A^T A`.
+   */
+  def computeGramianMatrix(): Matrix = {
+    toRowMatrix().computeGramianMatrix()
+  }
+
+  private[mllib] override def toBreeze(): BDM[Double] = {
+    val m = numRows().toInt
+    val n = numCols().toInt
+    val mat = BDM.zeros[Double](m, n)
+    rows.collect().foreach { case IndexedRow(rowIndex, vector) =>
+      val i = rowIndex.toInt
+      vector.toBreeze.activeIterator.foreach { case (j, v) =>
+        mat(i, j) = v
+      }
+    }
+    mat
+  }
+}
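
A sketch for the indexed-row format, again assuming an existing
SparkContext `sc`; note that computeSVD carries the original row indices
over to U:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

    val rows = sc.parallelize(Seq(
      IndexedRow(0L, Vectors.dense(1.0, 0.0, 0.0)),
      IndexedRow(1L, Vectors.dense(0.0, 2.0, 0.0)),
      IndexedRow(3L, Vectors.dense(0.0, 0.0, 3.0))))
    val mat = new IndexedRowMatrix(rows)  // numRows = 4 (max index + 1), numCols = 3
    val svd = mat.computeSVD(2, computeU = true)
    // svd.U.rows pairs the original indices with the rows of U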

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
new file mode 100644
index 0000000..f59811f
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import java.util
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
+import breeze.numerics.{sqrt => brzSqrt}
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
+
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+
+/**
+ * Represents a row-oriented distributed Matrix with no meaningful row indices.
+ *
+ * @param rows rows stored as an RDD[Vector]
+ * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
+ *              be determined by the number of records in the RDD `rows`.
+ * @param nCols number of columns. A non-positive value means unknown, and then the number of
+ *              columns will be determined by the size of the first row.
+ */
+class RowMatrix(
+    val rows: RDD[Vector],
+    private var nRows: Long,
+    private var nCols: Int) extends DistributedMatrix with Logging {
+
+  /** Alternative constructor leaving matrix dimensions to be determined automatically. */
+  def this(rows: RDD[Vector]) = this(rows, 0L, 0)
+
+  /** Gets or computes the number of columns. */
+  override def numCols(): Long = {
+    if (nCols <= 0) {
+      // Calling `first` will throw an exception if `rows` is empty.
+      nCols = rows.first().size
+    }
+    nCols
+  }
+
+  /** Gets or computes the number of rows. */
+  override def numRows(): Long = {
+    if (nRows <= 0L) {
+      nRows = rows.count()
+      if (nRows == 0L) {
+        sys.error("Cannot determine the number of rows because it is not specified in the " +
+          "constructor and the rows RDD is empty.")
+      }
+    }
+    nRows
+  }
+
+  /**
+   * Computes the Gramian matrix `A^T A`.
+   */
+  def computeGramianMatrix(): Matrix = {
+    val n = numCols().toInt
+    val nt: Int = n * (n + 1) / 2
+
+    // Compute the upper triangular part of the gram matrix.
+    val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))(
+      seqOp = (U, v) => {
+        RowMatrix.dspr(1.0, v, U.data)
+        U
+      },
+      combOp = (U1, U2) => U1 += U2
+    )
+
+    RowMatrix.triuToFull(n, GU.data)
+  }
+
+  /**
+   * Computes the singular value decomposition of this matrix.
+   * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+   *
+   * There is no restriction on m, but we require `n^2` doubles to fit in memory.
+   * Further, n should be less than m.
+   *
+   * The decomposition is computed by first computing A'A = V S^2 V',
+   * computing svd locally on that (since n x n is small), from which we recover S and V.
+   * Then we compute U via easy matrix multiplication as U =  A * (V * S^-1).
+   * Note that this approach requires `O(n^3)` time on the master node.
+   *
+   * At most k largest non-zero singular values and associated vectors are returned.
+   * If there are k such values, then the dimensions of the return will be:
+   *
+   * U is a RowMatrix of size m x k that satisfies U'U = eye(k),
+   * s is a Vector of size k, holding the singular values in descending order,
+   * and V is a Matrix of size n x k that satisfies V'V = eye(k).
+   *
+   * @param k number of singular values to keep. We might return less than k if there are
+   *          numerically zero singular values. See rCond.
+   * @param computeU whether to compute U
+   * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
+   *              are treated as zero, where sigma(0) is the largest singular value.
+   * @return SingularValueDecomposition(U, s, V)
+   */
+  def computeSVD(
+      k: Int,
+      computeU: Boolean = false,
+      rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
+    val n = numCols().toInt
+    require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
+
+    val G = computeGramianMatrix()
+
+    // TODO: Use sparse SVD instead.
+    val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
+      brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
+    val sigmas: BDV[Double] = brzSqrt(sigmaSquares)
+
+    // Determine effective rank.
+    val sigma0 = sigmas(0)
+    val threshold = rCond * sigma0
+    var i = 0
+    while (i < k && sigmas(i) >= threshold) {
+      i += 1
+    }
+    val sk = i
+
+    if (sk < k) {
+      logWarning(s"Requested $k singular values but only found $sk nonzeros.")
+    }
+
+    val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk))
+    val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+
+    if (computeU) {
+      // N = Vk * Sk^{-1}
+      val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+      var i = 0
+      var j = 0
+      while (j < sk) {
+        i = 0
+        val sigma = sigmas(j)
+        while (i < n) {
+          N(i, j) /= sigma
+          i += 1
+        }
+        j += 1
+      }
+      val U = this.multiply(Matrices.fromBreeze(N))
+      SingularValueDecomposition(U, s, V)
+    } else {
+      SingularValueDecomposition(null, s, V)
+    }
+  }
+
+  /**
+   * Computes the covariance matrix, treating each row as an observation.
+   * @return a local dense matrix of size n x n
+   */
+  def computeCovariance(): Matrix = {
+    val n = numCols().toInt
+
+    if (n > 10000) {
+      val mem = n.toLong * n * java.lang.Double.SIZE / java.lang.Byte.SIZE // Long avoids Int overflow
+      logWarning(s"The number of columns $n is greater than 10000! " +
+        s"We need at least $mem bytes of memory.")
+    }
+
+    val (m, mean) = rows.aggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))(
+      seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze),
+      combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2)
+    )
+
+    // Update nRows if it is not set, or verify its value.
+    if (nRows <= 0L) {
+      nRows = m
+    } else {
+      require(nRows == m,
+        s"The number of rows $m is different from what specified or previously computed: ${nRows}.")
+    }
+
+    mean :/= m.toDouble
+
+    // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
+    // large but Cov(X, Y) is small, but it is good for sparse computation.
+    // TODO: find a fast and stable way for sparse data.
+
+    val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+
+    var i = 0
+    var j = 0
+    val m1 = m - 1.0
+    var alpha = 0.0
+    while (i < n) {
+      alpha = m / m1 * mean(i)
+      j = 0
+      while (j < n) {
+        G(i, j) = G(i, j) / m1 - alpha * mean(j)
+        j += 1
+      }
+      i += 1
+    }
+
+    Matrices.fromBreeze(G)
+  }
+
+  /**
+   * Computes the top k principal components.
+   * Rows correspond to observations and columns correspond to variables.
+   * The principal components are stored in a local matrix of size n-by-k.
+   * Each column corresponds to one principal component,
+   * and the columns are in descending order of component variance.
+   *
+   * @param k number of top principal components.
+   * @return a matrix of size n-by-k, whose columns are principal components
+   */
+  def computePrincipalComponents(k: Int): Matrix = {
+    val n = numCols().toInt
+    require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
+
+    val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]
+
+    val (u: BDM[Double], _, _) = brzSvd(Cov)
+
+    if (k == n) {
+      Matrices.dense(n, k, u.data)
+    } else {
+      Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
+    }
+  }
+
+  /**
+   * Multiply this matrix by a local matrix on the right.
+   *
+   * @param B a local matrix whose number of rows must match the number of columns of this matrix
+   * @return a [[org.apache.spark.mllib.linalg.distributed.RowMatrix]] representing the product,
+   *         which preserves partitioning
+   */
+  def multiply(B: Matrix): RowMatrix = {
+    val n = numCols().toInt
+    require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")
+
+    require(B.isInstanceOf[DenseMatrix],
+      s"Only support dense matrix at this time but found ${B.getClass.getName}.")
+
+    val Bb = rows.context.broadcast(B)
+    val AB = rows.mapPartitions({ iter =>
+      val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]]
+      iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze))
+    }, preservesPartitioning = true)
+
+    new RowMatrix(AB, nRows, B.numCols)
+  }
+
+  private[mllib] override def toBreeze(): BDM[Double] = {
+    val m = numRows().toInt
+    val n = numCols().toInt
+    val mat = BDM.zeros[Double](m, n)
+    var i = 0
+    rows.collect().foreach { v =>
+      v.toBreeze.activeIterator.foreach { case (j, value) =>
+        mat(i, j) = value
+      }
+      i += 1
+    }
+    mat
+  }
+}
+
+object RowMatrix {
+
+  /**
+   * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's DSPR.
+   *
+   * @param U the upper triangular part of the matrix packed in an array (column major)
+   */
+  private def dspr(alpha: Double, v: Vector, U: Array[Double]): Unit = {
+    // TODO: Find a better home (breeze?) for this method.
+    val n = v.size
+    v match {
+      case dv: DenseVector =>
+        blas.dspr("U", n, 1.0, dv.values, 1, U)
+      case sv: SparseVector =>
+        val indices = sv.indices
+        val values = sv.values
+        val nnz = indices.length
+        var colStartIdx = 0
+        var prevCol = 0
+        var col = 0
+        var j = 0
+        var i = 0
+        var av = 0.0
+        while (j < nnz) {
+          col = indices(j)
+          // Skip empty columns.
+          colStartIdx += (col - prevCol) * (col + prevCol + 1) / 2
+          av = alpha * values(j)
+          i = 0
+          while (i <= j) {
+            U(colStartIdx + indices(i)) += av * values(i)
+            i += 1
+          }
+          j += 1
+          prevCol = col
+        }
+    }
+  }
+
+  /**
+   * Fills a full square matrix from its upper triangular part.
+   */
+  private def triuToFull(n: Int, U: Array[Double]): Matrix = {
+    val G = new BDM[Double](n, n)
+
+    var row = 0
+    var col = 0
+    var idx = 0
+    var value = 0.0
+    while (col < n) {
+      row = 0
+      while (row < col) {
+        value = U(idx)
+        G(row, col) = value
+        G(col, row) = value
+        idx += 1
+        row += 1
+      }
+      G(col, col) = U(idx)
+      idx += 1
+      col += 1
+    }
+
+    Matrices.dense(n, n, G.data)
+  }
+}
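
Tying the pieces together, a minimal end-to-end sketch on RowMatrix,
assuming an existing SparkContext `sc` (the 4 x 3 example has rank 2, so
at most two nonzero singular values come back):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val mat = new RowMatrix(sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0, 3.0),
      Vectors.dense(4.0, 5.0, 6.0),
      Vectors.dense(7.0, 8.0, 9.0),
      Vectors.dense(10.0, 11.0, 12.0))))

    val svd = mat.computeSVD(2, computeU = true)  // singular values in descending order
    val cov = mat.computeCovariance()             // 3 x 3 local matrix
    val pc  = mat.computePrincipalComponents(1)   // 3 x 1 local matrix
    val proj = mat.multiply(pc)                   // rows projected onto the top component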

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
deleted file mode 100644
index 87aac34..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.util
-
-import org.apache.spark.SparkContext._
-
-import org.apache.spark.mllib.linalg._
-
-/**
- * Helper methods for linear algebra
- */
-object LAUtils {
-  /**
-   * Convert a SparseMatrix into a TallSkinnyDenseMatrix
-   *
-   * @param sp Sparse matrix to be converted
-   * @return dense version of the input
-   */
-  def sparseToTallSkinnyDense(sp: SparseMatrix): TallSkinnyDenseMatrix = {
-    val m = sp.m
-    val n = sp.n
-    val rows = sp.data.map(x => (x.i, (x.j, x.mval))).groupByKey().map {
-      case (i, cols) =>
-        val rowArray = Array.ofDim[Double](n)
-        var j = 0
-        val colsItr = cols.iterator
-        while (colsItr.hasNext) {
-          val element = colsItr.next
-          rowArray(element._1) = element._2
-          j += 1
-        }
-        MatrixRow(i, rowArray)
-    }
-    TallSkinnyDenseMatrix(rows, m, n)
-  }
-
-  /**
-   * Convert a TallSkinnyDenseMatrix to a SparseMatrix
-   *
-   * @param a matrix to be converted
-   * @return sparse version of the input
-   */
-  def denseToSparse(a: TallSkinnyDenseMatrix): SparseMatrix = {
-    val m = a.m
-    val n = a.n
-    val data = a.rows.flatMap {
-      mrow => Array.tabulate(n)(j => MatrixEntry(mrow.i, j, mrow.data(j)))
-        .filter(x => x.mval != 0)
-    }
-    SparseMatrix(data, m, n)
-  }
-}
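
The deleted converters have no direct replacement in this commit; under
the new API the same round trip can be sketched as follows (hypothetical
helper code, assuming an existing SparkContext `sc`):

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    // sparse -> row-oriented, roughly what sparseToTallSkinnyDense did:
    val entries = sc.parallelize(Seq(MatrixEntry(0L, 1L, 2.0), MatrixEntry(2L, 0L, 5.0)))
    val rowMat = new CoordinateMatrix(entries, 3L, 2L).toIndexedRowMatrix()

    // row-oriented -> sparse, roughly what denseToSparse did:
    val entriesBack = rowMat.rows.flatMap { row =>
      row.vector.toArray.zipWithIndex.collect {
        case (value, j) if value != 0.0 => MatrixEntry(row.index, j.toLong, value)
      }
    }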

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala
new file mode 100644
index 0000000..82d49c7
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+class BreezeMatrixConversionSuite extends FunSuite {
+  test("dense matrix to breeze") {
+    val mat = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+    val breeze = mat.toBreeze.asInstanceOf[BDM[Double]]
+    assert(breeze.rows === mat.numRows)
+    assert(breeze.cols === mat.numCols)
+    assert(breeze.data.eq(mat.asInstanceOf[DenseMatrix].values), "should not copy data")
+  }
+
+  test("dense breeze matrix to matrix") {
+    val breeze = new BDM[Double](3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+    val mat = Matrices.fromBreeze(breeze).asInstanceOf[DenseMatrix]
+    assert(mat.numRows === breeze.rows)
+    assert(mat.numCols === breeze.cols)
+    assert(mat.values.eq(breeze.data), "should not copy data")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
new file mode 100644
index 0000000..9c66b4d
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.scalatest.FunSuite
+
+class MatricesSuite extends FunSuite {
+  test("dense matrix construction") {
+    val m = 3
+    val n = 2
+    val values = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
+    val mat = Matrices.dense(m, n, values).asInstanceOf[DenseMatrix]
+    assert(mat.numRows === m)
+    assert(mat.numCols === n)
+    assert(mat.values.eq(values), "should not copy data")
+    assert(mat.toArray.eq(values), "toArray should not copy data")
+  }
+
+  test("dense matrix construction with wrong dimension") {
+    intercept[RuntimeException] {
+      Matrices.dense(3, 2, Array(0.0, 1.0, 2.0))
+    }
+  }
+}
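
The column-major layout these tests pin down, spelled out as a tiny
sketch using only the toArray view:

    import org.apache.spark.mllib.linalg.Matrices

    // Values fill column 0 first, then column 1; entry (i, j) sits at
    // index i + j * numRows in the backing array.
    val mat = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
    assert(mat.toArray(2 + 1 * 3) == 5.0)  // row 2, column 1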

http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
deleted file mode 100644
index 5e5086b..0000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import scala.util.Random
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.mllib.util._
-
-import org.jblas._
-
-class PCASuite extends FunSuite with BeforeAndAfterAll {
-  @transient private var sc: SparkContext = _
-
-  override def beforeAll() {
-    sc = new SparkContext("local", "test")
-  }
-
-  override def afterAll() {
-    sc.stop()
-    System.clearProperty("spark.driver.port")
-  }
-
-  val EPSILON = 1e-3
-
-  // Return jblas matrix from sparse matrix RDD
-  def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
-    val data = matrix.data
-    val ret = DoubleMatrix.zeros(matrix.m, matrix.n)
-    matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval))
-    ret
-  }
-
-  def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
-    assert(a.rows == b.rows && a.columns == b.columns,
-      "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns")
-    for (i <- 0 until a.columns) {
-      val aCol = a.getColumn(i)
-      val bCol = b.getColumn(i)
-      val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
-      assert(diff < EPSILON, "matrix mismatch: " + diff)
-    }
-  }
-
-  test("full rank matrix pca") {
-    val m = 5
-    val n = 3
-    val dataArr = Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
-    val data = sc.makeRDD(dataArr, 3) 
-    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
-    val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
-                        (1,0,-0.1448), (1,1,0.7483),  (1,2,0.6474),
-                        (2,0,0.9553),  (2,1,-0.0649),  (2,2,0.2886))
-    val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)), 3)
-
-    val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
-
-    assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)  
-  }
-
-  test("sparse matrix full rank matrix pca") {
-    val m = 5
-    val n = 3
-    // the entry that gets dropped is zero to test sparse support
-    val dataArr = Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten.drop(1)
-    val data = sc.makeRDD(dataArr, 3)
-    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
-    val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
-                        (1,0,-0.1448), (1,1,0.7483),  (1,2,0.6474),
-                        (2,0,0.9553),  (2,1,-0.0649),  (2,2,0.2886))
-    val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
-
-    val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
-
-    assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
-  }
-
-  test("truncated matrix pca") {
-    val m = 5
-    val n = 3
-    val dataArr = Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
-    
-    val data = sc.makeRDD(dataArr, 3)
-    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
-    val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602),
-                        (1,0,-0.1448), (1,1,0.7483),
-                        (2,0,0.9553),  (2,1,-0.0649))
-    val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
-
-    val k = 2
-    val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a))
-
-    assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,k)), coeffs)
-  }
-}
-
-