Posted to commits@mahout.apache.org by dl...@apache.org on 2014/06/04 23:34:04 UTC

[2/2] git commit: MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes.

MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes.

This closes apache/mahout#6. (Re-commit).

Squashed commit of the following:

commit 32d5183543ee032770bfcebe0a5b2e22b3a388e3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Jun 4 12:31:57 2014 -0700

    removing commented-out section

commit 8e14bf0cf44a5ee86b0c137e8fbb0a8e75bf6ca8
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Tue Jun 3 13:44:01 2014 -0700

    parameter doc

commit cfa207b1bf98fd941b4d0d48baab4ac35b0deb03
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Tue Jun 3 13:42:12 2014 -0700

    +License on DQR

commit d9249968cd444ab90d92fa5197a5893f32e8c5a8
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Tue Jun 3 11:42:18 2014 -0700

    Adding regularization parameter. Refactoring decompositions and their tests into single package (for both distributed and non-distributed stuff).

commit f8ef52fe33dcf6259156ed913ac86dbad6d1a12f
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Tue Jun 3 11:05:52 2014 -0700

    flipping elementwise operators back to allowing in-place computation by default (back to where it was), for now. We'll see if the problem surfaces frequently enough to switch to cloned computations.

commit dce371fbdbd17b9c1a3c79260dc8e1180e6073ee
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jun 2 18:48:33 2014 -0700

    Switching elementwise operators to non-in-place execution by default. This will create GC overhead, but it is more benign with respect to side effects.

commit c92a6cca9eea1c9d15c5fee9c29c79ef9ed3c77f
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jun 2 17:53:41 2014 -0700

    Fixing bug. Non-idempotent in-place elementwise computations are still an issue

commit 775490da0f28887350c8725909d4af24e8f37310
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jun 2 15:17:11 2014 -0700

    WIP; bug in zipping -- it makes several passes over partitions, causing non-idempotent side effects (see the sketch after this commit list).

commit 565ecb0ca2bb6b1dda30af2c81bdeb1be7fae9fe
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri May 30 18:11:42 2014 -0700

    stopping computation on negative convergence too (I guess on ideal data it converges too fast, and rounding errors could push RMSE back up).

commit 2c9ba249e7dc312c2919a60d43e781eae67c51a9
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri May 30 17:50:36 2014 -0700

    license

commit 4b525e5011c002bdb4eb37e27699dc1f89f63d63
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri May 30 17:43:31 2014 -0700

    print out iteration rmses in the test

commit 84949a533270cb2bfe735f85321706a424a950cf
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri May 30 17:36:00 2014 -0700

    Bug fixes, test ok

commit c47995e74dce76e74bc4a58dbd473120da407dc0
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Thu May 29 21:32:23 2014 -0700

    ALS
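
A minimal sketch of the in-place hazard described in the commits above (illustrative only, not
code from this commit), assuming drmA is a checkpointed DRM and using the mapBlock API from
DrmLikeOps in this diff. A block function that mutates the block it is handed is not idempotent,
so an engine that re-evaluates a partition (e.g. while zipping) applies the mutation twice:

    // Non-idempotent: mutates the engine-owned block in place. A recomputed
    // partition gets shifted by 1.0 twice, silently corrupting the result.
    val drmBad = drmA.mapBlock() {
      case (keys, block) =>
        block := ((r, c, v) => v + 1.0)
        keys -> block
    }

    // Benign variant: clone before mutating, at the cost of extra GC pressure.
    val drmOk = drmA.mapBlock() {
      case (keys, block) =>
        val b = block.cloned
        b := ((r, c, v) => v + 1.0)
        keys -> b
    }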


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9cf90546
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9cf90546
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9cf90546

Branch: refs/heads/master
Commit: 9cf90546dfa8bf11c99089a12d70b0272e7da211
Parents: 26d3db6
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Wed Jun 4 14:29:34 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Wed Jun 4 14:29:34 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../apache/mahout/math/decompositions/ALS.scala | 134 ++++++++++++
 .../apache/mahout/math/decompositions/DQR.scala |  74 +++++++
 .../mahout/math/decompositions/DSPCA.scala      | 153 +++++++++++++
 .../mahout/math/decompositions/DSSVD.scala      |  82 +++++++
 .../mahout/math/decompositions/SSVD.scala       | 165 +++++++++++++++
 .../mahout/math/decompositions/package.scala    | 127 +++++++++++
 .../mahout/math/drm/CheckpointedOps.scala       |   3 +-
 .../mahout/math/drm/DistributedEngine.scala     |  11 +-
 .../org/apache/mahout/math/drm/DrmLike.scala    |   2 +-
 .../org/apache/mahout/math/drm/DrmLikeOps.scala |   9 +-
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |   9 +-
 .../mahout/math/drm/decompositions/DQR.scala    |  57 -----
 .../mahout/math/drm/decompositions/DSPCA.scala  | 153 -------------
 .../mahout/math/drm/decompositions/DSSVD.scala  |  83 --------
 .../apache/mahout/math/drm/logical/OpAewB.scala |   9 +-
 .../mahout/math/drm/logical/OpMapBlock.scala    |   8 +-
 .../org/apache/mahout/math/drm/package.scala    |  79 +++----
 .../mahout/math/scalabindings/MatrixOps.scala   |   2 +
 .../math/scalabindings/RLikeMatrixOps.scala     |  12 +-
 .../scalabindings/decompositions/SSVD.scala     | 165 ---------------
 .../mahout/math/scalabindings/package.scala     |  21 --
 .../decompositions/DecompositionsSuite.scala    | 113 ++++++++++
 .../mahout/math/scalabindings/MathSuite.scala   |  76 -------
 .../mahout/sparkbindings/SparkEngine.scala      |  35 +--
 .../apache/mahout/sparkbindings/blas/ABt.scala  |  35 ++-
 .../apache/mahout/sparkbindings/blas/AewB.scala | 159 ++++++++++----
 .../drm/CheckpointedDrmSpark.scala              |   8 +-
 .../mahout/sparkbindings/drm/package.scala      |  28 ++-
 .../apache/mahout/sparkbindings/package.scala   |   4 +-
 .../mahout/math/decompositions/MathSuite.scala  | 212 +++++++++++++++++++
 .../mahout/sparkbindings/blas/ABtSuite.scala    |   6 +-
 .../mahout/sparkbindings/blas/AewBSuite.scala   |  16 +-
 .../decompositions/MathSuite.scala              | 176 ---------------
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    |  37 ++++
 .../sparkbindings/test/MahoutLocalContext.scala |   2 +-
 36 files changed, 1380 insertions(+), 887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bea124a..3b2e61b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes (dlyubimov)
+
   MAHOUT-1537: Minor fixes to spark-shell (Anand Avati via dlyubimov)
 
   MAHOUT-1529: Finalize abstraction of distributed logical plans from backend operations (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
new file mode 100644
index 0000000..eaefe0e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeDrmOps._
+import RLikeOps._
+import scala.util.Random
+import org.apache.log4j.Logger
+import math._
+
+/** Simple ALS factorization algorithm. To run it, use the als() method. */
+object ALS {
+
+  private val log = Logger.getLogger(ALS.getClass)
+
+  /**
+   * ALS training result. <P>
+   *
+   * <code>drmU %*% drmV.t</code> is supposed to approximate the input.
+   *
+   * @param drmU U matrix
+   * @param drmV V matrix
+   * @param iterationsRMSE RMSE value after each iteration performed
+   */
+  class Result[K: ClassTag](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) {
+    def toTuple = (drmU, drmV, iterationsRMSE)
+  }
+
+  /**
+   * Run ALS.
+   * <P>
+   *
+   * Example:
+   *
+   * <pre>
+   * val (u,v,errors) = als(input, k).toTuple
+   * </pre>
+   *
+   * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
+   * whichever comes first.
+   * <P>
+   *
+   * @param drmInput The input matrix
+   * @param k required rank of decomposition (number of cols in U and V results; 100 is probably
+   *          more than enough)
+   * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
+   *                             value. If <= 0, RMSE is not computed and the convergence test is
+   *                             skipped.
+   * @param lambda regularization rate
+   * @param maxIterations maximum iterations to run regardless of convergence
+   * @tparam K row key type of the input
+   * @return {@link org.apache.mahout.math.decompositions.ALS.Result}
+   */
+  def als[K: ClassTag](
+      drmInput: DrmLike[K],
+      k: Int = 50,
+      lambda: Double = 0.0,
+      maxIterations: Int = 10,
+      convergenceThreshold: Double = 0.10
+      ): Result[K] = {
+
+    assert(convergenceThreshold < 1.0, "convergenceThreshold")
+    assert(maxIterations >= 1, "maxIterations")
+
+    val drmA = drmInput
+    val drmAt = drmInput.t
+
+    // Initialize U and V so that they are identically distributed to A or A'
+    var drmU = drmA.mapBlock(ncol = k) {
+      case (keys, block) =>
+        val uBlock = Matrices.symmetricUniformView(block.nrow, k, Random.nextInt()) * 0.01
+        keys -> uBlock
+    }
+
+    var drmV: DrmLike[Int] = null
+    var rmseIterations: List[Double] = Nil
+
+    // ALS iterator
+    var stop = false
+    var i = 0
+    while (!stop && i < maxIterations) {
+
+      // Alternate. This is really what ALS is.
+      if ( drmV != null) drmV.uncache()
+      drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint()
+
+      drmU.uncache()
+      drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint()
+
+      // Check if we are requested to do a convergence test; and do it if yes.
+      if (convergenceThreshold > 0) {
+
+        val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow)
+
+        if (i > 0) {
+          val rmsePrev = rmseIterations.last
+          val convergence = (rmsePrev - rmse) / rmsePrev
+
+          if (convergence < 0) {
+            log.warn("RMSE increase of %f. Should not happen.".format(convergence))
+            // I guess error growth can happen in ideal data case?
+            stop = true
+          } else if (convergence < convergenceThreshold) {
+            stop = true
+          }
+        }
+        rmseIterations :+= rmse
+      }
+
+      i += 1
+    }
+
+    new Result(drmU, drmV, rmseIterations)
+  }
+
+
+}
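
A minimal end-to-end sketch of the method above (assuming drmParallelize and dense from the
drm/scalabindings packages and an implicit DistributedContext in scope; sizes and values are
illustrative only):

    import org.apache.mahout.math.decompositions._

    // Tiny synthetic ratings matrix.
    val inCoreA = dense((5, 3, 0), (4, 0, 1), (1, 1, 5))
    val drmA = drmParallelize(inCoreA, numPartitions = 2)

    // Rank-2 factorization with light regularization.
    val (drmU, drmV, rmses) = als(drmA, k = 2, lambda = 0.01).toTuple

    // drmU %*% drmV.t approximates drmA; rmses traces RMSE per iteration.
    val err = (drmA - drmU %*% drmV.t).norm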

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
new file mode 100644
index 0000000..4ca99b1
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+  private val log = Logger.getLogger(DQR.getClass)
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    if (A.ncol > 5000)
+      log.warn("A is too fat. A'A must fit in memory and be easily broadcast.")
+
+    implicit val ctx = A.context
+
+    val AtA = (A.t %*% A).checkpoint()
+    val inCoreAtA = AtA.collect
+
+    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+    val ch = chol(inCoreAtA)
+    val inCoreR = (ch.getL cloned) t
+
+    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+    if (checkRankDeficiency && !ch.isPositiveDefinite)
+      throw new IllegalArgumentException("R is rank-deficient.")
+
+    val bcastAtA = drmBroadcast(inCoreAtA)
+
+    // Unfortunately, I don't think the Cholesky decomposition is serializable to the backend, so
+    // we re-decompose A'A in the backend again.
+
+    // Compute Q = A*inv(L') -- we can do it blockwise.
+    val Q = A.mapBlock() {
+      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+    }
+
+    Q -> inCoreR
+  }
+
+}
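
A small in-core sketch of the identity dqrThin relies on (assuming dense from scalabindings):
with the Cholesky decomposition A'A = LL', take R = L' and Q = A * inv(L'), which the distributed
version computes blockwise via solveRight:

    val a = dense((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))

    val ch = chol(a.t %*% a)          // A'A = L L'
    val inCoreR = ch.getL.cloned.t    // R = L'
    val q = ch.solveRight(a)          // Q = A * inv(L')

    // q %*% inCoreR should reproduce a (up to rounding), and q.t %*% q
    // should be close to the identity.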

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
new file mode 100644
index 0000000..37c218a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use
+   *         them, e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+    implicit val ctx = A.context
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than the smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // Dataset mean
+    val xi = drmA.colMeans
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+    // This is done up front in a single-threaded fashion for now. Even though it doesn't require
+    // any memory beyond what is needed to keep xi around, it still might be parallelized to the
+    // backends for significantly large n and r. TODO
+    val s_o = omega.t %*% xi
+
+    val bcastS_o = drmBroadcast(s_o)
+    val bcastXi = drmBroadcast(xi)
+
+    var drmY = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val s_o:Vector = bcastS_o
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+        keys -> blockY
+    }
+        // Checkpoint Y
+        .checkpoint()
+
+    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+    var s_q = drmQ.colSums()
+    var bcastVarS_q = drmBroadcast(s_q)
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+    var s_b = (drmBt.t %*% xi).collect(::, 0)
+    var bcastVarS_b = drmBroadcast(s_b)
+
+    for (i <- 0 until q) {
+
+      // These closures don't seem to play well with outside-scope vars, since closure attributes
+      // are not recorded correctly. So we create an additional set of vals for the broadcast vars
+      // to get proper read-only closure attributes in this very scope.
+      val bcastS_q = bcastVarS_q
+      val bcastS_b = bcastVarS_b
+      val bcastXib = bcastXi
+
+      // Fix Bt as B' -= xi cross s_q
+      drmBt = drmBt.mapBlock() {
+        case (keys, block) =>
+          val s_q: Vector = bcastS_q
+          val xi: Vector = bcastXib
+          keys.zipWithIndex.foreach {
+            case (key, idx) => block(idx, ::) -= s_q * xi(key)
+          }
+          keys -> block
+      }
+
+      drmY.uncache()
+      drmQ.uncache()
+
+      drmY = (drmA %*% drmBt)
+          // Fix Y by subtracting s_b from each row of the AB'
+          .mapBlock() {
+        case (keys, block) =>
+          val s_b: Vector = bcastS_b
+          for (row <- 0 until block.nrow) block(row, ::) -= s_b
+          keys -> block
+      }
+          // Checkpoint Y
+          .checkpoint()
+
+      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+      s_q = drmQ.colSums()
+      bcastVarS_q = drmBroadcast(s_q)
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = (drmA.t %*% drmQ).checkpoint()
+
+      s_b = (drmBt.t %*% xi).collect(::, 0)
+      bcastVarS_b = drmBroadcast(s_b)
+    }
+
+    val c = s_q cross s_b
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+        c - c.t + (s_q cross s_q) * (xi dot xi)
+    val (inCoreUHat, d) = eigen(inCoreBBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}
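
A usage sketch for dspca (assuming drmA is a checkpointed DRM and an implicit DistributedContext
is in scope); U and V stay lazy until a computational action is taken:

    val (drmU, drmV, s) = dspca(drmA, k = 10, p = 15, q = 1)

    // Nothing has been computed yet; using a result triggers computation,
    // e.g. collecting the PCA projection, which is approximately U * Sigma:
    val projection = (drmU %*% diagv(s)).collect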

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
new file mode 100644
index 0000000..a158390
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
@@ -0,0 +1,82 @@
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrix, Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use
+   *         them, e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than the smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+
+    // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
+    // instantiate the Omega random matrix view in the backend instead. That way serialized closure
+    // is much more compact.
+    var drmY = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        keys -> blockY
+    }
+
+    var drmQ = dqrThin(drmY.checkpoint())._1
+    // Checkpoint Q if last iteration
+    if (q == 0) drmQ = drmQ.checkpoint()
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = drmA.t %*% drmQ
+    // Checkpoint B' if last iteration
+    if (q == 0) drmBt = drmBt.checkpoint()
+
+    for (i <- 0 until q) {
+      drmY = drmA %*% drmBt
+      drmQ = dqrThin(drmY.checkpoint())._1
+      // Checkpoint Q if last iteration
+      if (i == q - 1) drmQ = drmQ.checkpoint()
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = drmA.t %*% drmQ
+      // Checkpoint B' if last iteration
+      if (i == q - 1) drmBt = drmBt.checkpoint()
+    }
+
+    val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}
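
A usage sketch for dssvd (same assumptions as the ALS sketch above; drmA is a checkpointed DRM).
The relative reconstruction error of the rank-k approximation can be checked with the DRM norm:

    val (drmU, drmV, s) = dssvd(drmA, k = 20, p = 15, q = 1)

    // Forces computation of U and V.
    val relErr = (drmA - drmU %*% diagv(s) %*% drmV.t).norm / drmA.norm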

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+  private val log = Logger.getLogger(SSVD.getClass)
+
+  /**
+   * In-core SSVD algorithm.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than the smaller of m, n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    var y = a %*% omega
+    var yty = y.t %*% y
+    val at = a.t
+    var ch = chol(yty)
+    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+    var bt = ch.solveRight(at %*% y)
+
+    // Power iterations
+    for (i <- 0 until q) {
+      y = a %*% bt
+      yty = y.t %*% y
+      ch = chol(yty)
+      bt = ch.solveRight(at %*% y)
+    }
+
+    val bbt = bt.t %*% bt
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = ch.solveRight(y) %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+  }
+
+  /**
+   * PCA based on SSVD that runs without forming the always-dense A - colMeans(A) input for SVD.
+   * This follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most
+   * part, supposed to save some memory on sparse inputs by avoiding direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
+   * If retaining distances and the original scaled variances is not that important, the normalized
+   * PCA space is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than the smaller of m, n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    // Dataset mean
+    val xi = a.colMeans()
+
+    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+    var y = a %*% omega
+
+    // Fixing y
+    val s_o = omega.t %*% xi
+    y := ((r,c,v) => v - s_o(c))
+
+    var yty = y.t %*% y
+    var ch = chol(yty)
+//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+    // This is implicit Q of QR(Y)
+    var qm = ch.solveRight(y)
+    var bt = a.t %*% qm
+    var s_q = qm.colSums()
+    var s_b = bt.t %*% xi
+
+    // Power iterations
+    for (i <- 0 until q) {
+
+      // Fix bt
+      bt -= xi cross s_q
+
+      y = a %*% bt
+
+      // Fix Y again.
+      y := ((r,c,v) => v - s_b(c))
+
+      yty = y.t %*% y
+      ch = chol(yty)
+      qm = ch.solveRight(y)
+      bt = a.t %*% qm
+      s_q = qm.colSums()
+      s_b = bt.t %*% xi
+    }
+
+    val c = s_q cross s_b
+
+    // BB' computation becomes
+    val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi)
+
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = qm %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+  }
+
+}
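
An in-core sketch (assuming dense from scalabindings); with a 4 x 3 input, k = 2 and p = 1 keep
the actual decomposition rank r = 3 within min(m, n):

    val a = dense((1, 2, 3), (3, 4, 5), (5, 6, 7), (7, 8, 9))

    val (u, v, s) = ssvd(a, k = 2, p = 1, q = 1)
    // a is approximately u %*% diagv(s) %*% v.t

    // spca factorizes the column-centered data without ever densifying it:
    val (uPca, vPca, sPca) = spca(a, k = 2, p = 1, q = 1)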

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
new file mode 100644
index 0000000..852a977
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * This package holds all decomposition and factorization-like methods, at least all that we have
+ * been able to make distributed-engine-independent so far.
+ */
+package object decompositions {
+
+  // ================ In-core decompositions ===================
+
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
+
+  /**
+   * PCA based on SSVD that runs without forming the always-dense A - colMeans(A) input for SVD.
+   * This follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most
+   * part, supposed to save some memory on sparse inputs by avoiding direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
+   * If retaining distances and the original scaled variances is not that important, the normalized
+   * PCA space is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
+    SSVD.spca(a = a, k = k, p = p, q = q)
+
+
+  // ============== Distributed decompositions ===================
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(A, checkRankDeficiency)
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use
+   *         them, e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use
+   *         them, e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+
+  /**
+   * Run ALS.
+   * <P>
+   *
+   * Example:
+   *
+   * <pre>
+   * val (u,v,errors) = als(input, k).toTuple
+   * </pre>
+   *
+   * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
+   * whichever comes first.
+   * <P>
+   *
+   * @param drmInput The input matrix
+   * @param k required rank of decomposition (number of cols in U and V results; 100 is probably
+   *          more than enough)
+   * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
+   *                             value. If <= 0, RMSE is not computed and the convergence test is
+   *                             skipped.
+   * @param lambda regularization rate
+   * @param maxIterations maximum iterations to run regardless of convergence
+   * @tparam K row key type of the input
+   * @return {@link org.apache.mahout.math.decompositions.ALS.Result}
+   */
+  def als[K: ClassTag](
+      drmInput: DrmLike[K],
+      k: Int = 50,
+      lambda: Double = 0.0,
+      maxIterations: Int = 10,
+      convergenceThreshold: Double = 0.10
+      ): ALS.Result[K] =
+    ALS.als(drmInput, k, lambda, maxIterations, convergenceThreshold)
+
+}
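
With this package object in place, all factorizations are reachable through a single import; a
sketch (DRM construction omitted, drmA assumed):

    import org.apache.mahout.math.decompositions._

    val (drmQ, inCoreR) = dqrThin(drmA)
    val (drmU, drmV, s) = dssvd(drmA, k = 40)
    val result = als(drmInput = drmA, k = 50, lambda = 0.01)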

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index fa1ccfd..edd0cfc 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
-import org.apache.mahout.math.Vector
+import org.apache.mahout.math._
 
 
 /**
@@ -35,5 +35,6 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
   /** Column Means */
   def colMeans(): Vector = drm.context.colMeans(drm)
 
+  /** Euclidean (Frobenius) norm of the matrix. */
+  def norm(): Double = drm.context.norm(drm)
 }
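
The new norm() is what the ALS convergence test builds on; a sketch of the per-iteration RMSE it
enables (as in ALS.scala above; drmA, drmU, drmV assumed):

    val rmse = (drmA - drmU %*% drmV.t).norm / math.sqrt(drmA.ncol * drmA.nrow)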
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 0e76d87..5ffee9d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -19,10 +19,11 @@ package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
 import logical._
-import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
 import DistributedEngine._
 import org.apache.mahout.math.scalabindings._
-import RLikeOps._
 
 /** Abstraction of optimizer/distributed engine */
 trait DistributedEngine {
@@ -43,10 +44,12 @@ trait DistributedEngine {
   def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
 
   /** Engine-specific colSums implementation based on a checkpoint. */
-  def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+  def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  /** Engine-specific Frobenius norm implementation based on a checkpoint. */
+  def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
 
   /** Broadcast support */
   def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 8e0db1e..7fbfc12 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -28,7 +28,7 @@ package org.apache.mahout.math.drm
  */
 trait DrmLike[K] {
 
-  protected[mahout] def partitioningTag:Long
+  protected[mahout] def partitioningTag: Long
 
   protected[mahout] val context:DistributedContext
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 35e28af..328805a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -36,9 +36,14 @@ class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
    * @tparam R
    * @return
    */
-  def mapBlock[R : ClassTag](ncol: Int = -1)
+  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = true)
       (bmf: BlockMapFunc[K, R]): DrmLike[R] =
-    new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
+    new OpMapBlock[K, R](
+      A = drm,
+      bmf = bmf,
+      _ncol = ncol,
+      identicallyPartitioned = identicallyPartitioned
+    )
 
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index f46d15c..7ac5577 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -25,13 +25,13 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
 
   import RLikeDrmOps._
 
-  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
+  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
 
-  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
+  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
 
-  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
+  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
 
-  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
+  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
 
   def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
 
@@ -70,7 +70,6 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
 
   def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
 
-
 }
 
 object RLikeDrmOps {
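
With op now carried as a String, elementwise plans compose as before; a sketch, assuming drmA
and drmB are identically keyed DRMs of the same geometry:

    val drmSum  = drmA + drmB   // OpAewB(A, B, "+")
    val drmDiff = drmA - drmB   // OpAewB(A, B, "-")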

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
deleted file mode 100644
index 34ae345..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.log4j.Logger
-
-object DQR {
-
-  private val log = Logger.getLogger(DQR.getClass)
-
-  /**
-   * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
-   * controlled (<5000 or so). <P>
-   *
-   * It is recommended to checkpoint A since it does two passes over it. <P>
-   *
-   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
-   * their RDD should be able to zip successfully.
-   */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
-
-    if (A.ncol > 5000)
-      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
-
-    implicit val ctx = A.context
-
-    val AtA = (A.t %*% A).checkpoint()
-    val inCoreAtA = AtA.collect
-
-    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
-
-    val ch = chol(inCoreAtA)
-    val inCoreR = (ch.getL cloned) t
-
-    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
-
-    if (checkRankDeficiency && !ch.isPositiveDefinite)
-      throw new IllegalArgumentException("R is rank-deficient.")
-
-    val bcastAtA = drmBroadcast(inCoreAtA)
-
-    // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
-    // decompose A'A in the backend again.
-
-    // Compute Q = A*inv(L') -- we can do it blockwise.
-    val Q = A.mapBlock() {
-      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
-    }
-
-    Q -> inCoreR
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
deleted file mode 100644
index 9e33416..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
-  /**
-   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
-   * document of the MAHOUT-817.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations (hint: use either 0 or 1)
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
-   */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-    implicit val ctx = A.context
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // Dataset mean
-    val xi = drmA.colMeans
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
-    // This done in front in a single-threaded fashion for now. Even though it doesn't require any
-    // memory beyond that is required to keep xi around, it still might be parallelized to backs
-    // for significantly big n and r. TODO
-    val s_o = omega.t %*% xi
-
-    val bcastS_o = drmBroadcast(s_o)
-    val bcastXi = drmBroadcast(xi)
-
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val s_o:Vector = bcastS_o
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
-        keys -> blockY
-    }
-        // Checkpoint Y
-        .checkpoint()
-
-    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-    var s_q = drmQ.colSums()
-    var bcastVarS_q = drmBroadcast(s_q)
-
-    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
-    // still be identically partitioned.
-    var drmBt = (drmA.t %*% drmQ).checkpoint()
-
-    var s_b = (drmBt.t %*% xi).collect(::, 0)
-    var bcastVarS_b = drmBroadcast(s_b)
-
-    for (i <- 0 until q) {
-
-      // These closures don't seem to live well with outside-scope vars. This doesn't record closure
-      // attributes correctly. So we create additional set of vals for broadcast vars to properly 
-      // create readonly closure attributes in this very scope.
-      val bcastS_q = bcastVarS_q
-      val bcastS_b = bcastVarS_b
-      val bcastXib = bcastXi
-
-      // Fix Bt as B' -= xi cross s_q
-      drmBt = drmBt.mapBlock() {
-        case (keys, block) =>
-          val s_q: Vector = bcastS_q
-          val xi: Vector = bcastXib
-          keys.zipWithIndex.foreach {
-            case (key, idx) => block(idx, ::) -= s_q * xi(key)
-          }
-          keys -> block
-      }
-
-      drmY.uncache()
-      drmQ.uncache()
-
-      drmY = (drmA %*% drmBt)
-          // Fix Y by subtracting s_b from each row of the AB'
-          .mapBlock() {
-        case (keys, block) =>
-          val s_b: Vector = bcastS_b
-          for (row <- 0 until block.nrow) block(row, ::) -= s_b
-          keys -> block
-      }
-          // Checkpoint Y
-          .checkpoint()
-
-      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-      s_q = drmQ.colSums()
-      bcastVarS_q = drmBroadcast(s_q)
-
-      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
-      // identically partitioned anymore.
-      drmBt = (drmA.t %*% drmQ).checkpoint()
-
-      s_b = (drmBt.t %*% xi).collect(::, 0)
-      bcastVarS_b = drmBroadcast(s_b)
-    }
-
-    val c = s_q cross s_b
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
-        c - c.t + (s_q cross s_q) * (xi dot xi)
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
-    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
deleted file mode 100644
index 0da9ec7..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSSVD {
-
-  /**
-   * Distributed Stochastic Singular Value decomposition algorithm.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
-   */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-
-    // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
-    // instantiate the Omega random matrix view in the backend instead. That way serialized closure
-    // is much more compact.
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        keys -> blockY
-    }
-
-    var drmQ = dqrThin(drmY.checkpoint())._1
-    // Checkpoint Q if last iteration
-    if (q == 0) drmQ = drmQ.checkpoint()
-
-    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
-    // still be identically partitioned.
-    var drmBt = drmA.t %*% drmQ
-    // Checkpoint B' if last iteration
-    if (q == 0) drmBt = drmBt.checkpoint()
-
-    for (i <- 0  until q) {
-      drmY = drmA %*% drmBt
-      drmQ = dqrThin(drmY.checkpoint())._1
-      // Checkpoint Q if last iteration
-      if (i == q - 1) drmQ = drmQ.checkpoint()
-
-      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
-      // identically partitioned anymore.
-      drmBt = drmA.t %*% drmQ
-      // Checkpoint B' if last iteration
-      if (i == q - 1) drmBt = drmBt.checkpoint()
-    }
-
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
-    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
index d07172a..da7c0d5 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -19,17 +19,24 @@ package org.apache.mahout.math.drm.logical
 
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
 
 /** DRM elementwise operator */
 case class OpAewB[K: ClassTag](
     override var A: DrmLike[K],
     override var B: DrmLike[K],
-    val op: Char
+    val op: String
     ) extends AbstractBinaryOp[K, K, K] {
 
+
   assert(A.ncol == B.ncol, "arguments must have same number of columns")
   assert(A.nrow == B.nrow, "arguments must have same number of rows")
 
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (A.partitioningTag == B.partitioningTag) A.partitioningTag
+    else Random.nextLong()
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
index 8e4362d..7299d9e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -21,16 +21,18 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+import scala.util.Random
 
 class OpMapBlock[S: ClassTag, R: ClassTag](
     override var A: DrmLike[S],
     val bmf: BlockMapFunc[S, R],
     val _ncol: Int = -1,
-    val _nrow: Long = -1
+    val _nrow: Long = -1,
+    identicallyPartitioned: Boolean
     ) extends AbstractUnaryOp[S, R] {
 
-
-  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (identicallyPartitioned) A.partitioningTag else Random.nextLong()
 
   /** R-like syntax for number of rows. */
   def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
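
Block functions that re-key their input should now declare it, so the resulting DRM gets a fresh
partitioning tag and downstream zips are not attempted; a sketch with a hypothetical Int-keyed
re-keying (drmA assumed to be DrmLike[Int]):

    val drmReKeyed = drmA.mapBlock(identicallyPartitioned = false) {
      case (keys, block) =>
        // shift all row keys; output keying no longer matches A's partitioning
        keys.map(_ + 1000) -> block
    }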

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 768bb1c..02e8b7a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -18,7 +18,9 @@
 package org.apache.mahout.math
 
 import scala.reflect.ClassTag
-import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
 
 package object drm {
 
@@ -70,56 +72,45 @@ package object drm {
       (implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
 
   /** Implicit broadcast -> value conversion. */
-  implicit def bcast2val[T](bcast:BCast[T]):T = bcast.value
+  implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
 
   /** Just throw all engine operations into context as well. */
-  implicit def ctx2engine(ctx:DistributedContext):DistributedEngine = ctx.engine
+  implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
 
   implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
     new CheckpointedOps[K](drm)
 
-  implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
-
-  // ============== Decompositions ===================
-
-  /**
-   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
-   * controlled (<5000 or so). <P>
-   *
-   * It is recommended to checkpoint A since it does two passes over it. <P>
-   *
-   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
-   * their RDD should be able to zip successfully.
-   */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
-    DQR.dqrThin(A, checkRankDeficiency)
-
   /**
-   * Distributed Stochastic Singular Value decomposition algorithm.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs) in order to trigger their computation.
+   * We assume that whenever a computational action is invoked without an explicit checkpoint, the
+   * user does not imply caching.
    */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
-
-  /**
-   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
-   * document of the MAHOUT-817.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations (hint: use either 0 or 1)
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs) in order to trigger their computation.
-   */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
-
+  implicit def drm2Checkpointed[K: ClassTag](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
+
+  /** Implicit conversion to in-core with NONE caching of the result. */
+  implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect
+
+  /** Vertical concatenation of a collection of blockified tuples. */
+  def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
+    assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
+    if (blocks.size == 1) {
+      // No coalescing required.
+      blocks.head
+    } else {
+      // compute total number of rows in a new block
+      val m = blocks.view.map(_._2.nrow).sum
+      val n = blocks.head._2.ncol
+      val coalescedBlock = blocks.head._2.like(m, n)
+      val coalescedKeys = new Array[K](m)
+      var row = 0
+      for (elem <- blocks.view) {
+        val block = elem._2
+        val rowEnd = row + block.nrow
+        coalescedBlock(row until rowEnd, ::) := block
+        elem._1.copyToArray(coalescedKeys, row)
+        row = rowEnd
+      }
+      coalescedKeys -> coalescedBlock
+    }
+  }
 
 }

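Two behavioral points fall out of this hunk: the implicit checkpoint now pins nothing (CacheHint.NONE), and rbind coalesces several blockified tuples into one while preserving key order. A small in-core illustration of rbind (dense and the implicits come from scalabindings/RLikeOps):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.drm._

    val b1 = (Array(0, 1), dense((1, 2), (3, 4)))
    val b2 = (Array(2), dense((5, 6)))

    // keys == Array(0, 1, 2); block is the 3 x 2 vertical concatenation.
    val (keys, block) = rbind(Seq(b1, b2))
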
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index a872240..48c2048 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -51,6 +51,8 @@ class MatrixOps(val m: Matrix) {
 
   def -(that: Matrix) = cloned -= that
 
+  def -:(that: Matrix) = that - m
+
   // m.plus(that)?
 
   def +(that: Double) = cloned += that

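Since the method name ends in a colon, Scala binds it to the right-hand operand: a -: b desugars to b.-:(a), which by the definition above evaluates a - b with b as the receiver. A quick check (dense from scalabindings assumed):

    val a = dense((5.0, 5.0))
    val b = dense((2.0, 2.0))

    val d1 = a - b    // a.-(b)   => (3, 3)
    val d2 = a -: b   // b.-:(a)  => also a - b, i.e. (3, 3)
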
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
index 23c2ac2..af53c51 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
@@ -20,7 +20,7 @@ import org.apache.mahout.math.{Vector, Matrix}
 import scala.collection.JavaConversions._
 import RLikeOps._
 
-class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
+class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
 
   /**
    * matrix-matrix multiplication
@@ -47,12 +47,14 @@ class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
 
   def *(that: Double) = cloned *= that
 
-  def /(that:Matrix) = cloned /= that
+  def /(that: Matrix) = cloned /= that
 
-  def /(that:Double) = cloned /= that
+  def /:(that: Matrix) = that / m
+
+  def /(that: Double) = cloned /= that
 
   /** 1.0 /: A is equivalent to R's 1.0/A */
-  def /:(that:Double) = that /=: cloned
+  def /:(that: Double) = that /=: cloned
 
   /**
    * in-place Hadamard product. We probably don't want to use assign
@@ -82,7 +84,7 @@ class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
   }
 
   /** 1.0 /=: A is equivalent to A = 1.0/A in R */
-  def /=:(that:Double) = {
+  def /=:(that: Double) = {
     m.foreach(that /=: _.vector())
     m
   }

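The scalar and matrix variants of /: follow the same right-binding rule, and /=: is the in-place form mentioned in the doc comment above. A short sketch:

    val m = dense((1.0, 2.0), (4.0, 8.0))

    val r = 1.0 /: m   // R's 1.0/m: elementwise reciprocal, computed on a clone
    1.0 /=: m          // same values, but m itself is mutated
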
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
deleted file mode 100644
index 80385a3..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import org.apache.mahout.common.RandomUtils
-import org.apache.log4j.Logger
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-private[math] object SSVD {
-
-  private val log = Logger.getLogger(SSVD.getClass)
-
-  /**
-   * In-core SSVD algorithm.
-   *
-   * @param a input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    var y = a %*% omega
-    var yty = y.t %*% y
-    val at = a.t
-    var ch = chol(yty)
-    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-    var bt = ch.solveRight(at %*% y)
-
-    // Power iterations
-    for (i <- 0 until q) {
-      y = a %*% bt
-      yty = y.t %*% y
-      ch = chol(yty)
-      bt = ch.solveRight(at %*% y)
-    }
-
-    val bbt = bt.t %*% bt
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = ch.solveRight(y) %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-  }
-
-  /**
-   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
-   * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part,
-   * supposed to save some memory for sparse inputs by removing direct mean subtraction.<P>
-   *
-   * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
-   * If retaining distances and original scaled variances is not that important, the normalized PCA space
-   * is just U.
-   *
-   * Important: data points are considered to be rows.
-   *
-   * @param a input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    // Dataset mean
-    val xi = a.colMeans()
-
-    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
-
-    var y = a %*% omega
-
-    // Fixing y
-    val s_o = omega.t %*% xi
-    y := ((r,c,v) => v - s_o(c))
-
-    var yty = y.t %*% y
-    var ch = chol(yty)
-//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
-    // This is implicit Q of QR(Y)
-    var qm = ch.solveRight(y)
-    var bt = a.t %*% qm
-    var s_q = qm.colSums()
-    var s_b = bt.t %*% xi
-
-    // Power iterations
-    for (i <- 0 until q) {
-
-      // Fix bt
-      bt -= xi cross s_q
-
-      y = a %*% bt
-
-      // Fix Y again.
-      y := ((r,c,v) => v - s_b(c))
-
-      yty = y.t %*% y
-      ch = chol(yty)
-      qm = ch.solveRight(y)
-      bt = a.t %*% qm
-      s_q = qm.colSums()
-      s_b = bt.t %*% xi
-    }
-
-    val c = s_q cross s_b
-
-    // BB' computation becomes
-    val bbt = bt.t %*% bt -c - c.t +  (s_q cross s_q) * (xi dot xi)
-
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = qm %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
-  }
-
-}

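The deleted in-core SSVD (the in-core variants now live under org.apache.mahout.math.decompositions, per the package declaration above) leans on a Cholesky-based thin QR: with Y'Y = LL', the matrix Q = Y(L')^-1 has orthonormal columns, which is exactly what chol(...).solveRight(y) computed. A minimal check of that identity:

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val y = dense((1, 2), (3, 4), (5, 7))

    val ch = chol(y.t %*% y)   // Y'Y = L L'
    val q = ch.solveRight(y)   // Q = Y (L')^-1

    // Q'Q should be the 2 x 2 identity up to rounding.
    println(q.t %*% q)
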
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index 4599146..2b7773b 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -281,26 +281,5 @@ package object scalabindings {
     x(::, 0)
   }
 
-  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
-
-  /**
-   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
-   * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part,
-   * supposed to save some memory for sparse inputs by removing direct mean subtraction.<P>
-   *
-   * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
-   * If retaining distances and original scaled variances is not that important, the normalized PCA space
-   * is just U.
-   *
-   * Important: data points are considered to be rows.
-   *
-   * @param a input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
-    SSVD.spca(a = a, k = k, p = p, q = q)
 
 }

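With these forwarders gone, the in-core ssvd/spca entry points live only in the relocated decompositions package. A hedged call-site sketch, assuming that package's object exposes them the way the new test suite below uses them:

    import org.apache.mahout.math.decompositions._
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val a = dense((1, 2, 3), (3, 4, 5), (-2, 6, 7), (-3, 8, 9))

    // The oversampling parameter p is clamped internally to min(m, n) - k,
    // so the default is safe even on this tiny matrix.
    val (u, v, s) = ssvd(a, k = 2, q = 1)
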
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala
new file mode 100644
index 0000000..8f5ec99
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import org.scalatest.FunSuite
+import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+
+/**
+ * This suite tests only in-core decompositions.
+ * <P>
+ *
+ * We moved distributed tests into the mahout-spark module since they require concrete
+ * distributed-engine dependencies to run.
+ * <P>
+ */
+class DecompositionsSuite extends FunSuite with MahoutSuite {
+
+  test("ssvd") {
+
+    // Very naive, full-rank input only here.
+    val a = dense(
+      (1, 2, 3),
+      (3, 4, 5),
+      (-2, 6, 7),
+      (-3, 8, 9)
+    )
+
+    val rank = 2
+    val (u, v, s) = ssvd(a, k = rank, q = 1)
+
+    val (uControl, vControl, sControl) = svd(a)
+
+    printf("U:\n%s\n", u)
+    printf("U-control:\n%s\n", uControl)
+    printf("V:\n%s\n", v)
+    printf("V-control:\n%s\n", vControl)
+    printf("Sigma:\n%s\n", s)
+    printf("Sigma-control:\n%s\n", sControl)
+
+    (s - sControl(0 until rank)).norm(2) should be < 1E-7
+
+    // Singular vectors may be equivalent only up to sign.
+    (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7
+    (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7
+  }
+
+  test("spca") {
+
+    import math._
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m = 500
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+    // PCA Rotation matrix -- should also be orthonormal.
+    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+    val input = (u %*%: diagv(spectrum)) %*% tr.t
+
+    // Calculate just the first 10 principal factors and reduce dimensionality.
+    // Since we assert only the validity of s-pca, not the stochastic error, we bump the p parameter
+    // to drive the stochastic error to zero and assert only the functional correctness of the
+    // method's pca-specific additions.
+    val k = 10
+    var (pca, _, s) = spca(a = input, k = k, p = spectrumLen, q = 1)
+    printf("Svs:%s\n", s)
+    // Un-normalized pca data:
+    pca = pca %*%: diagv(s)
+
+    // Of course, once we have calculated the pca, the spectrum is going to be different, since our
+    // originally generated input was not centered. So here we just brute-solve the pca to verify.
+    val xi = input.colMeans()
+    for (r <- 0 until input.nrow) input(r, ::) -= xi
+    var (pcaControl, _, sControl) = svd(m = input)
+
+    printf("Svs-control:%s\n", sControl)
+    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+  }
+
+}

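A note on the comparisons in this suite: singular vectors are determined only up to a per-column sign flip, so the assertions compare Frobenius norms rather than entries. A sign-proof entrywise variant would compare absolute values; a hedged sketch, reusing the ssvd test's u/uControl/rank and the functional := assignment:

    def absOf(m: Matrix): Matrix = m.cloned := ((r, c, v) => math.abs(v))

    // Equal up to column signs implies identical absolute values.
    (absOf(u) - absOf(uControl(::, 0 until rank))).norm should be < 1E-7
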
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
index d1171cc..b10cde3 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
@@ -193,82 +193,6 @@ class MathSuite extends FunSuite with MahoutSuite {
 
   }
 
-  test("ssvd") {
-    val a = dense(
-      (1, 2, 3),
-      (3, 4, 5),
-      (-2, 6, 7),
-      (-3, 8, 9)
-    )
-
-    val rank = 2
-    val (u, v, s) = ssvd(a, k = rank, q = 1)
-
-    val (uControl, vControl, sControl) = svd(a)
-
-    printf("U:\n%s\n", u)
-    printf("U-control:\n%s\n", uControl)
-    printf("V:\n%s\n", v)
-    printf("V-control:\n%s\n", vControl)
-    printf("Sigma:\n%s\n", s)
-    printf("Sigma-control:\n%s\n", sControl)
-
-    (s - sControl(0 until rank)).norm(2) should be < 1E-7
-
-    // Singular vectors may be equivalent down to a sign only.
-    (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7
-    (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7
-
-  }
-
-  test("spca") {
-
-    import math._
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m =  500
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
-        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
-    // PCA Rotation matrix -- should also be orthonormal.
-    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
-    val input = (u %*%: diagv(spectrum)) %*% tr.t
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
-    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
-    // specific additions.
-    val k = 10
-    var (pca, _, s) = spca(a = input, k = k, p=spectrumLen, q = 1)
-    printf("Svs:%s\n",s)
-    // Un-normalized pca data:
-    pca = pca %*%: diagv(s)
-
-    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
-    // generated input was not centered. So here, we'd just brute-solve pca to verify
-    val xi = input.colMeans()
-    for (r <- 0 until input.nrow) input(r, ::) -= xi
-    var (pcaControl, _, sControl) = svd(m = input)
-
-    printf("Svs-control:%s\n",sControl)
-    pcaControl = (pcaControl %*%: diagv(sControl))(::,0 until k)
-
-    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
-    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
-    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
-  }
-
   test("random uniform") {
     val omega1 = Matrices.symmetricUniformView(2, 3, 1234)
     val omega2 = Matrices.symmetricUniformView(2, 3, 1234)

http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 0c904ab..3a03e58 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,9 +17,9 @@
 
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
 import RLikeOps._
-import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
@@ -30,6 +30,8 @@ import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import scala.Some
 import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.RLikeDrmOps._
 
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
@@ -51,7 +53,17 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
+    if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+
+  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
+    drm.rdd
+        // Compute the sum of squares of each row vector
+        .map {
+          case (key, v) => v dot v
+        }
+        .reduce(_ + _)
+
 
   /**
    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
@@ -123,7 +135,10 @@ object SparkEngine extends DistributedEngine {
 
     {
       implicit def getWritable(x: Any): Writable = val2key()
-      new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
+      new CheckpointedDrmSpark(
+        rdd = rdd.map(t => (key2val(t._1), t._2)),
+        _cacheStorageLevel = StorageLevel.MEMORY_ONLY
+      )(km.asInstanceOf[ClassTag[Any]])
     }
   }
 
@@ -210,16 +225,8 @@ object SparkEngine extends DistributedEngine {
       case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
       case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
       case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
       case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
       case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
       // Custom operators, we just execute them

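The eight per-opcode cases collapse into two: the physical layer now reads the opcode off the logical node itself. In in-core terms, the dispatch amounts to a match on the operator string; a hypothetical sketch of the idea (not the actual AewB internals):

    def ewInCore(a: Matrix, b: Matrix, op: String): Matrix = op match {
      case "+" => a + b
      case "-" => a - b
      case "*" => a * b   // Hadamard product
      case "/" => a / b
      case _   => throw new IllegalArgumentException("Unsupported elementwise op: " + op)
    }
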
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 97873bd..1e3f286 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -65,16 +65,22 @@ object ABt {
     // Blockify everything.
     val blocksA = srcA.toBlockifiedDrmRdd()
 
-        // mark row-blocks with group id
+        // Mark row-blocks with group id
         .mapPartitionsWithIndex((part, iter) => {
-      val rowBlockId = part
-      val (blockKeys, block) = iter.next()
 
-      // Each partition must have exactly one block due to implementation guarantees of blockify()
-      iter.ensuring(!_.hasNext)
+      if (iter.isEmpty) {
+        Iterator.empty
+      } else {
+
+        val rowBlockId = part
+        val (blockKeys, block) = iter.next()
 
-      // the output is (row block id, array of row keys, and the matrix representing the block).
-      Iterator((rowBlockId, blockKeys, block))
+        // Each partition must have exactly one block due to implementation guarantees of blockify()
+        assert(!iter.hasNext, "Partition #%d is expected to have at most 1 block at AB'.".format(part))
+
+        // the output is (row block id, array of row keys, and the matrix representing the block).
+        Iterator((rowBlockId, blockKeys, block))
+      }
     })
 
     val blocksB = srcB.toBlockifiedDrmRdd()
@@ -96,7 +102,7 @@ object ABt {
 
 
     // The plan.
-    val blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
+    var blockifiedRdd: BlockifiedDrmRdd[K] = blocksA
 
         // Build Cartesian. It may require a bit more memory at the tasks.
         .cartesian(blocksB)
@@ -119,7 +125,7 @@ object ABt {
         // Combine -- this is probably the most efficient
         .combineByKey[(Array[K],Matrix)](
 
-          createCombiner = (t:(Array[K],Array[Int],Matrix)) => t match {
+          createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match {
             case (rowKeys, colKeys, blockProd) =>
 
               // Accumulator is a row-wise block of sparse vectors.
@@ -148,11 +154,20 @@ object ABt {
           mergeCombiners = (c1: (Array[K], Matrix), c2: (Array[K], Matrix)) => {
             c1._2 += c2._2
             c1
-          })
+          },
+
+          // Cartesian will tend to produce many more partitions than we actually need for
+          // co-grouping, and as a result we may see more empty partitions than necessary.
+          numPartitions = numProductPartitions
+        )
 
         // Combine leaves residual block key -- we don't need that.
         .map(_._2)
 
+    // This may produce more than one block per partition. Most implementations rely on the
+    // convention of having at most one block per partition.
+    blockifiedRdd = rbind(blockifiedRdd)
+
     new DrmRddInput(blockifiedSrc = Some(blockifiedRdd))
   }
 }
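The trailing rbind pass exists because combineByKey with an explicit partition count can leave several blocks in one partition, while most downstream operators assume at most one. A hedged sketch of what an RDD-level rbind plausibly does, reusing the in-core rbind added to the drm package earlier in this commit (rbindPerPartition is an illustrative name, not the actual helper):

    import scala.reflect.ClassTag

    def rbindPerPartition[K: ClassTag](rdd: BlockifiedDrmRdd[K]): BlockifiedDrmRdd[K] =
      rdd.mapPartitions { iter =>
        if (iter.isEmpty) Iterator.empty
        else Iterator(rbind(iter.toSeq))
      }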