Posted to commits@mahout.apache.org by dl...@apache.org on 2014/07/14 22:19:45 UTC

git commit: MAHOUT-1529 (d): moving core engine-independent test logic to math-scala, with the spark module running it. Source: dlyubimov/MAHOUT-1529d This closes apache/mahout#28.

Repository: mahout
Updated Branches:
  refs/heads/master e4ba7887f -> 25a6fc096


MAHOUT-1529 (d): moving core engine-independent test logic to math-scala, with the spark module running it.
Source: dlyubimov/MAHOUT-1529d
This closes apache/mahout#28.
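
In outline, the pattern this change introduces: engine-independent test bodies
move into *SuiteBase traits in math-scala, written against only the implicit
DistributedContext (mahoutCtx), and each engine module runs them through a
one-line concrete class that mixes in its context-providing suite. A minimal
sketch of the shape (ExampleSuiteBase and ExampleSuite are hypothetical names
used for illustration; the real traits and classes appear in the diff below):

    // math-scala module: engine-independent test logic, no engine imports.
    import org.apache.mahout.test.DistributedMahoutSuite
    import org.scalatest.{FunSuite, Matchers}

    trait ExampleSuiteBase extends DistributedMahoutSuite with Matchers {
      this: FunSuite =>

      test("shared across engines") {
        // Bodies use only engine-agnostic ops (drmParallelize, %*%, collect),
        // which resolve the engine through the implicit mahoutCtx.
      }
    }

    // spark module: bind the shared tests to a concrete Spark context.
    import org.apache.mahout.sparkbindings.test.DistributedSparkSuite

    class ExampleSuite extends FunSuite
        with DistributedSparkSuite
        with ExampleSuiteBase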

Squashed commit of the following:

commit 8d693b02e474ca1380c0e1cfa262ec4a47afeeae
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jul 14 13:03:13 2014 -0700

    minor style

commit f568736a35a8c76c6eb70694ce9feee09ff845dc
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jul 14 12:57:19 2014 -0700

    + license

commit d47e2dca693c147e6978a6c5e1fe42c1ab73ece3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jul 14 12:55:36 2014 -0700

    + license

commit 61e6268d261c875781d430ca7861e0b345f485c3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri Jul 11 11:31:44 2014 -0700

    style

commit c2406654c3fc7326b87848a23933cd4031a2c9a9
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri Jul 11 11:27:53 2014 -0700

    Initial test rearrangement. Pushing engine-independent logic to math-scala. Some renames.


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/25a6fc09
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/25a6fc09
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/25a6fc09

Branch: refs/heads/master
Commit: 25a6fc0967357e6ba4aafcaf11bf3f7faec752fd
Parents: e4ba788
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Mon Jul 14 13:16:41 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Mon Jul 14 13:18:33 2014 -0700

----------------------------------------------------------------------
 .../DistributedDecompositionsSuiteBase.scala    | 219 +++++++++
 .../mahout/math/drm/DrmLikeOpsSuiteBase.scala   |  93 ++++
 .../mahout/math/drm/DrmLikeSuiteBase.scala      |  98 ++++
 .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala  | 483 +++++++++++++++++++
 .../mahout/test/DistributedMahoutSuite.scala    |  28 ++
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   |   4 +-
 .../drivers/ItemSimilarityDriverSuite.scala     |   4 +-
 .../DistributedDecompositionsSuite.scala        |  34 ++
 .../mahout/math/decompositions/MathSuite.scala  | 212 --------
 .../mahout/sparkbindings/blas/ABtSuite.scala    |   4 +-
 .../mahout/sparkbindings/blas/AewBSuite.scala   |   4 +-
 .../mahout/sparkbindings/blas/AtASuite.scala    |   4 +-
 .../mahout/sparkbindings/blas/AtSuite.scala     |   4 +-
 .../sparkbindings/drm/DrmLikeOpsSuite.scala     |  68 +--
 .../mahout/sparkbindings/drm/DrmLikeSuite.scala |  78 +--
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    | 483 +------------------
 .../test/DistributedSparkSuite.scala            |  57 +++
 .../test/LoggerConfiguration.scala              |  17 +
 .../sparkbindings/test/MahoutLocalContext.scala |  40 --
 19 files changed, 1051 insertions(+), 883 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
new file mode 100644
index 0000000..bc2ee4b
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.common.RandomUtils
+import math._
+
+/**
+ * ==Common distributed tests to run against each supported distributed engine.==
+ *
+ * Each distributed engine's decompositions package should have a suite that mixes in this trait
+ * as part of its distributed test suite.
+ *
+ */
+trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Matchers { this: FunSuite =>
+
+
+  test("thin distributed qr") {
+
+    val inCoreA = dense(
+      (1, 2, 3, 4),
+      (2, 3, 4, 5),
+      (3, -4, 5, 6),
+      (4, 5, 6, 7),
+      (8, 6, 7, 8)
+    )
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val (drmQ, inCoreR) = dqrThin(A, checkRankDeficiency = false)
+
+    // Assert optimizer still knows Q and A are identically partitioned
+    drmQ.partitioningTag should equal(A.partitioningTag)
+
+//    drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
+//
+//    // Should also be zippable
+//    drmQ.rdd.zip(other = A.rdd)
+
+    val inCoreQ = drmQ.collect
+
+    printf("A=\n%s\n", inCoreA)
+    printf("Q=\n%s\n", inCoreQ)
+    printf("R=\n%s\n", inCoreR)
+
+    val (qControl, rControl) = qr(inCoreA)
+    printf("qControl=\n%s\n", qControl)
+    printf("rControl=\n%s\n", rControl)
+
+    // Validate with Cholesky
+    val ch = chol(inCoreA.t %*% inCoreA)
+    printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
+    printf("L:\n%s\n", ch.getL)
+
+    val rControl2 = (ch.getL cloned).t
+    val qControl2 = ch.solveRight(inCoreA)
+    printf("qControl2=\n%s\n", qControl2)
+    printf("rControl2=\n%s\n", rControl2)
+
+    // The Householder approach seems to be a little bit more stable
+    (rControl - inCoreR).norm should be < 1E-5
+    (qControl - inCoreQ).norm should be < 1E-5
+
+    // Assert agreement with the in-core Cholesky-based result -- this should be tighter.
+    (rControl2 - inCoreR).norm should be < 1E-10
+    (qControl2 - inCoreQ).norm should be < 1E-10
+
+    // Assert orthogonality:
+    // (a) Q[,j] dot Q[,j] == 1.0 for all j
+    // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
+    for (col <- 0 until inCoreQ.ncol)
+      ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
+    for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
+      (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
+
+
+  }
+
+  test("dssvd - the naive-est - q=0") {
+    dssvdNaive(q = 0)
+  }
+
+  test("ddsvd - naive - q=1") {
+    dssvdNaive(q = 1)
+  }
+
+  test("ddsvd - naive - q=2") {
+    dssvdNaive(q = 2)
+  }
+
+
+  def dssvdNaive(q: Int) {
+    val inCoreA = dense(
+      (1, 2, 3, 4),
+      (2, 3, 4, 5),
+      (3, -4, 5, 6),
+      (4, 5, 6, 7),
+      (8, 6, 7, 8)
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
+    val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
+
+    printf("U:\n%s\n", inCoreU)
+    printf("V:\n%s\n", inCoreV)
+    printf("Sigma:\n%s\n", s)
+
+    (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
+  }
+
+  test("dspca") {
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m = 500
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+    // PCA Rotation matrix -- should also be orthonormal.
+    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+    val input = (u %*%: diagv(spectrum)) %*% tr.t
+    val drmInput = drmParallelize(m = input, numPartitions = 2)
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    // Since we assert only the validity of the s-pca, not its stochastic error, we bump the
+    // p parameter to drive the stochastic error to zero and assert only functional correctness
+    // of the method's pca-specific additions.
+    val k = 10
+
+    // Compute the distributed s-pca of the input.
+    var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
+    // Un-normalized pca data:
+    drmPCA = drmPCA %*% diagv(s)
+
+    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+
+    // Once the pca is calculated, the spectrum will differ because the originally generated
+    // input was not centered. So here we just brute-force solve the pca to verify.
+    val xi = input.colMeans()
+    for (r <- 0 until input.nrow) input(r, ::) -= xi
+    var (pcaControl, _, sControl) = svd(m = input)
+    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+  }
+
+  test("dals") {
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m = 500
+    val n = 500
+
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    // Create singular values with decay
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    // Create A as an ideal input
+    val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
+        qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    // Decompose using ALS
+    val (drmU, drmV, rmse) = als(drmInput = drmA, k = 20).toTuple
+    val inCoreU = drmU.collect
+    val inCoreV = drmV.collect
+
+    val predict = inCoreU %*% inCoreV.t
+
+    printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
+    printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))
+
+    val err = (inCoreA - predict).norm
+    printf("norm of residuals %f\n", err)
+    printf("train iteration rmses: %s\n", rmse)
+
+    err should be < 1e-2
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
new file mode 100644
index 0000000..849db68
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+
+/** Common tests for DrmLike operators to be executed by all distributed engines. */
+trait DrmLikeOpsSuiteBase extends DistributedMahoutSuite with Matchers {
+  this: FunSuite =>
+
+  test("mapBlock") {
+
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    val B = A.mapBlock(/* Inherit width */) {
+      case (keys, block) => keys -> (block += 1.0)
+    }
+
+    val inCoreB = B.collect
+    val inCoreBControl = inCoreA + 1.0
+
+    println(inCoreB)
+
+    // Assert they are the same
+    (inCoreB - inCoreBControl).norm should be < 1E-10
+
+  }
+
+  test("col range") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    val B = A(::, 1 to 2)
+    val inCoreB = B.collect
+    val inCoreBControl = inCoreA(::, 1 to 2)
+
+    println(inCoreB)
+
+    // Assert they are the same
+    (inCoreB - inCoreBControl).norm should be < 1E-10
+
+  }
+
+  test("row range") {
+
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    val B = A(1 to 2, ::)
+    val inCoreB = B.collect
+    val inCoreBControl = inCoreA(1 to 2, ::)
+
+    println(inCoreB)
+
+    // Assert they are the same
+    (inCoreB - inCoreBControl).norm should be < 1E-10
+
+  }
+
+  test("col, row range") {
+
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    val B = A(1 to 2, 1 to 2)
+    val inCoreB = B.collect
+    val inCoreBControl = inCoreA(1 to 2, 1 to 2)
+
+    println(inCoreB)
+
+    // Assert they are the same
+    (inCoreB - inCoreBControl).norm should be < 1E-10
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
new file mode 100644
index 0000000..651c611
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+
+/** Common DRM tests to be run by all distributed engines. */
+trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
+  this: FunSuite =>
+
+  test("DRM DFS i/o (local)") {
+
+    val uploadPath = "UploadedDRM"
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5))
+    val drmA = drmParallelize(inCoreA)
+
+    drmA.writeDRM(path = uploadPath)
+
+    println(inCoreA)
+
+    // Load back from hdfs
+    val drmB = drmFromHDFS(path = uploadPath)
+
+    // Collect back into in-core
+    val inCoreB = drmB.collect
+
+    // Print out to see what it is we collected:
+    println(inCoreB)
+
+  }
+
+  test("DRM blockify dense") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5))
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    (inCoreA - drmA.mapBlock() {
+      case (keys, block) =>
+        if (!block.isInstanceOf[DenseMatrix])
+          throw new AssertionError("Block must be dense.")
+        keys -> block
+    }).norm should be < 1e-4
+  }
+
+  test("DRM blockify sparse -> SRM") {
+
+    val inCoreA = sparse(
+      (1, 2, 3),
+      0 -> 3 :: 2 -> 5 :: Nil
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    (inCoreA - drmA.mapBlock() {
+      case (keys, block) =>
+        if (!block.isInstanceOf[SparseRowMatrix])
+          throw new AssertionError("Block must be sparse.")
+        keys -> block
+    }).norm should be < 1e-4
+  }
+
+  test("DRM parallelizeEmpty") {
+
+    val drmEmpty = drmParallelizeEmpty(100, 50)
+
+    // collect back into in-core
+    val inCoreEmpty = drmEmpty.collect
+
+    // Print out to see what it is we collected:
+    println(inCoreEmpty)
+    printf("drm nrow:%d, ncol:%d\n", drmEmpty.nrow, drmEmpty.ncol)
+    printf("in core nrow:%d, ncol:%d\n", inCoreEmpty.nrow, inCoreEmpty.ncol)
+
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
new file mode 100644
index 0000000..71dc640
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import decompositions._
+import org.apache.mahout.math.drm.logical.{OpAtB, OpAtA, OpAtx}
+
+/** Common engine tests for distributed R-like DRM operations */
+trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers {
+  this: FunSuite =>
+
+  val epsilon = 1E-5
+
+  test("A.t") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5))
+
+    val A = drmParallelize(inCoreA)
+
+    val inCoreAt = A.t.collect
+
+    // Assert the norm of the difference is less than the error margin.
+    (inCoreAt - inCoreA.t).norm should be < epsilon
+
+  }
+
+  test("C = A %*% B") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+
+    // Actual
+    val inCoreCControl = inCoreA %*% inCoreB
+
+    // Distributed operation
+    val C = A %*% B
+    val inCoreC = C.collect
+    println(inCoreC)
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+
+    // We also should be able to collect via implicit checkpoint
+    val inCoreC2 = C.collect
+    println(inCoreC2)
+
+    (inCoreC2 - inCoreCControl).norm should be < 1E-10
+
+  }
+
+  test("C = A %*% B mapBlock {}") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+    val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
+
+    // Actual
+    val inCoreCControl = inCoreA %*% inCoreB
+
+    A.colSums()
+    B.colSums()
+
+
+    val x = drmBroadcast(dvec(0, 0))
+    val x2 = drmBroadcast(dvec(0, 0))
+    // Distributed operation
+    val C = (B.t %*% A.t).t.mapBlock() {
+      case (keys, block) =>
+        for (row <- 0 until block.nrow) block(row, ::) += x.value + x2
+        keys -> block
+    }
+
+    val inCoreC = C checkpoint CacheHint.NONE collect;
+    println(inCoreC)
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+
+    // We also should be able to collect via implicit checkpoint
+    val inCoreC2 = C.collect
+    println(inCoreC2)
+
+    (inCoreC2 - inCoreCControl).norm should be < 1E-10
+
+    val inCoreQ = dqrThin(C)._1.collect
+
+    printf("Q=\n%s\n", inCoreQ)
+
+    // Assert unit-orthogonality
+    ((inCoreQ(::, 0) dot inCoreQ(::, 0)) - 1.0).abs should be < 1e-10
+    (inCoreQ(::, 0) dot inCoreQ(::, 1)).abs should be < 1e-10
+
+  }
+
+  test("C = A %*% B incompatible B keys") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+        // Re-key B into DrmLike[String] instead of [Int]
+        .mapBlock()({
+      case (keys, block) => keys.map(_.toString) -> block
+    })
+
+    val C = A %*% B
+
+    intercept[IllegalArgumentException] {
+      // This plan must not compile
+      C.checkpoint()
+    }
+  }
+
+  test("Spark-specific C = At %*% B , join") {
+
+    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
+    val inCoreB = dense((3, 5), (4, 6), (0, 1))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+
+    val C = A.t %*% B
+
+    mahoutCtx.optimizerRewrite(C) should equal(OpAtB[Int](A, B))
+
+    val inCoreC = C.collect
+    val inCoreControlC = inCoreA.t %*% inCoreB
+
+    (inCoreC - inCoreControlC).norm should be < 1E-10
+
+  }
+
+
+  test("C = At %*% B , join, String-keyed") {
+
+    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
+    val inCoreB = dense((3, 5), (4, 6), (0, 1))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+        .mapBlock()({
+      case (keys, block) => keys.map(_.toString) -> block
+    })
+
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+        .mapBlock()({
+      case (keys, block) => keys.map(_.toString) -> block
+    })
+
+    val C = A.t %*% B
+
+    mahoutCtx.optimizerRewrite(C) should equal(OpAtB[String](A, B))
+
+    val inCoreC = C.collect
+    val inCoreControlC = inCoreA.t %*% inCoreB
+
+    (inCoreC - inCoreControlC).norm should be < 1E-10
+
+  }
+
+  test("C = At %*% B , zippable, String-keyed") {
+
+    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+        .mapBlock()({
+      case (keys, block) => keys.map(_.toString) -> block
+    })
+
+    val B = A + 1.0
+
+    val C = A.t %*% B
+
+    mahoutCtx.optimizerRewrite(C) should equal(OpAtB[String](A, B))
+
+    val inCoreC = C.collect
+    val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
+
+    (inCoreC - inCoreControlC).norm should be < 1E-10
+
+  }
+
+  test("C = A %*% inCoreB") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+    val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val C = A %*% inCoreB
+
+    val inCoreC = C.collect
+    val inCoreCControl = inCoreA %*% inCoreB
+
+    println(inCoreC)
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+
+  }
+
+  test("C = inCoreA %*%: B") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+    val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
+
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+    val C = inCoreA %*%: B
+
+    val inCoreC = C.collect
+    val inCoreCControl = inCoreA %*% inCoreB
+
+    println(inCoreC)
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+
+  }
+
+  test("C = A.t %*% A") {
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    val AtA = A.t %*% A
+
+    // Assert optimizer detects square
+    mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A))
+
+    val inCoreAtA = AtA.collect
+    val inCoreAtAControl = inCoreA.t %*% inCoreA
+
+    (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
+  }
+
+  test("C = A.t %*% A fat non-graph") {
+    // Hack the max in-mem size for this test
+    System.setProperty("mahout.math.AtA.maxInMemNCol", "540")
+
+    val inCoreA = Matrices.uniformView(400, 550, 1234)
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    val AtA = A.t %*% A
+
+    // Assert optimizer detects square
+    mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A))
+
+    val inCoreAtA = AtA.collect
+    val inCoreAtAControl = inCoreA.t %*% inCoreA
+
+    (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
+  }
+
+  test("C = A.t %*% A non-int key") {
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+    val AintKeyd = drmParallelize(m = inCoreA, numPartitions = 2)
+    val A = AintKeyd.mapBlock() {
+      case (keys, block) => keys.map(_.toString) -> block
+    }
+
+    val AtA = A.t %*% A
+
+    // Assert optimizer detects square
+    mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A))
+
+    val inCoreAtA = AtA.collect
+    val inCoreAtAControl = inCoreA.t %*% inCoreA
+
+    (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
+  }
+
+  test("C = A + B") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+
+    val C = A + B
+    val inCoreC = C.collect
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+  }
+
+  test("C = A + B, identically partitioned") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+
+//    printf("A.nrow=%d.\n", A.rdd.count())
+
+    // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
+    val B = A.mapBlock() {
+      case (keys, block) =>
+        val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
+        keys -> bBlock
+    }
+        // Checkpoint to prevent non-determinism from recomputation
+        .checkpoint()
+
+    val inCoreB = B.collect
+
+    printf("A=\n%s\n", inCoreA)
+    printf("B=\n%s\n", inCoreB)
+
+    val C = A + B
+
+    val inCoreC = C.collect
+
+    printf("C=\n%s\n", inCoreC)
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+  }
+
+
+  test("C = A + B side test 1") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+
+    val C = A + B
+    val inCoreC = C.collect
+
+    val inCoreD = (A + B).collect
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+    (inCoreD - inCoreCControl).norm should be < 1E-10
+  }
+
+  test("C = A + B side test 2") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+
+    val C = A + B
+    val inCoreC = C.collect
+
+    val inCoreD = (A + B).collect
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+    (inCoreD - inCoreCControl).norm should be < 1E-10
+  }
+
+  test("C = A + B side test 3") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+
+    val B = drmParallelize(inCoreB, numPartitions = 2)
+    //    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
+    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY)
+
+    val C = A + B
+    val inCoreC = C.collect
+
+    val inCoreD = (A + B).collect
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB * 2.0
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+    (inCoreD - inCoreCControl).norm should be < 1E-10
+  }
+
+  test("Ax") {
+    val inCoreA = dense(
+      (1, 2),
+      (3, 4),
+      (20, 30)
+    )
+    val x = dvec(10, 3)
+
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    val ax = (drmA %*% x).collect(::, 0)
+
+    ax should equal(inCoreA %*% x)
+  }
+
+  test("A'x") {
+    val inCoreA = dense(
+      (1, 2),
+      (3, 4),
+      (20, 30)
+    )
+    val x = dvec(10, 3, 4)
+
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    mahoutCtx.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x))
+
+    val atx = (drmA.t %*% x).collect(::, 0)
+
+    atx should equal(inCoreA.t %*% x)
+  }
+
+  test("colSums, colMeans") {
+    val inCoreA = dense(
+      (1, 2),
+      (3, 4),
+      (20, 30)
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    drmA.colSums() should equal(inCoreA.colSums())
+    drmA.colMeans() should equal(inCoreA.colMeans())
+  }
+
+  test("numNonZeroElementsPerColumn") {
+    val inCoreA = dense(
+      (0, 2),
+      (3, 0),
+      (0, -30)
+
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn())
+  }
+
+  test("C = A cbind B, cogroup") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+    val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+    val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
+
+    (A.cbind(B) -: controlC).norm should be < 1e-10
+
+  }
+
+  test("C = A cbind B, zip") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+
+    (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala b/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala
new file mode 100644
index 0000000..3538991
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.test
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.scalatest.{Suite, FunSuite, Matchers}
+
+/**
+ * Base trait for unit tests that run against a distributed context.
+ */
+trait DistributedMahoutSuite extends MahoutSuite  { this: Suite =>
+  protected implicit var mahoutCtx: DistributedContext
+}
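
The mahoutCtx member above is the contract each engine's test support must
fill in; in this commit the Spark module does so with DistributedSparkSuite
(its diff is not shown in this excerpt). Purely as a hypothetical sketch of
what such a provider can look like, assuming the sparkbindings
mahoutSparkContext factory with a (masterUrl, appName) signature:

    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.sparkbindings._
    import org.apache.mahout.test.DistributedMahoutSuite
    import org.scalatest.{BeforeAndAfterEach, Suite}

    trait LocalSparkProvider extends DistributedMahoutSuite with BeforeAndAfterEach {
      this: Suite =>

      // Concrete definition of the abstract mahoutCtx declared above.
      protected implicit var mahoutCtx: DistributedContext = _

      override protected def beforeEach() {
        super.beforeEach()
        // Hypothetical: bind the implicit context to a local Spark master.
        mahoutCtx = mahoutSparkContext(masterUrl = "local[2]", appName = "MahoutLocalTests")
      }

      override protected def afterEach() {
        // Close the context so suites do not leak Spark contexts.
        if (mahoutCtx != null) mahoutCtx.close()
        super.afterEach()
      }
    }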

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 065f2f8..938dc33 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -19,7 +19,7 @@ package org.apache.mahout.cf
 
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings.{MatrixOps, _}
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.test.MahoutSuite
 import org.scalatest.FunSuite
 
@@ -37,7 +37,7 @@ B =
 1	1	0	1	0
  */
 
-class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
+class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
 
   // correct cooccurrence with LLR
   final val matrixLLRCoocAtAControl = dense(

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 2db830c..ca92fcf 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings._
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.test.MahoutSuite
 
-class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with MahoutLocalContext  {
+class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with DistributedSparkSuite  {
 
 /*
   // correct self-cooccurrence with LLR

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala b/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala
new file mode 100644
index 0000000..0a0c1af
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.RandomUtils
+import scala.math._
+import org.scalatest.{Matchers, FunSuite}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+
+class DistributedDecompositionsSuite extends FunSuite with DistributedSparkSuite with DistributedDecompositionsSuiteBase {
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala b/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala
deleted file mode 100644
index 03c7190..0000000
--- a/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import org.apache.mahout.math._
-import drm._
-import scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-import org.apache.mahout.sparkbindings._
-import org.apache.mahout.common.RandomUtils
-import scala.math._
-import org.scalatest.{Matchers, FunSuite}
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-
-class MathSuite extends FunSuite with Matchers with MahoutLocalContext {
-
-  test("thin distributed qr") {
-
-    val inCoreA = dense(
-      (1, 2, 3, 4),
-      (2, 3, 4, 5),
-      (3, -4, 5, 6),
-      (4, 5, 6, 7),
-      (8, 6, 7, 8)
-    )
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val (drmQ, inCoreR) = dqrThin(A, checkRankDeficiency = false)
-
-    // Assert optimizer still knows Q and A are identically partitioned
-    drmQ.partitioningTag should equal(A.partitioningTag)
-
-    drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
-
-    // Should also be zippable
-    drmQ.rdd.zip(other = A.rdd)
-
-    val inCoreQ = drmQ.collect
-
-    printf("A=\n%s\n", inCoreA)
-    printf("Q=\n%s\n", inCoreQ)
-    printf("R=\n%s\n", inCoreR)
-
-    val (qControl, rControl) = qr(inCoreA)
-    printf("qControl=\n%s\n", qControl)
-    printf("rControl=\n%s\n", rControl)
-
-    // Validate with Cholesky
-    val ch = chol(inCoreA.t %*% inCoreA)
-    printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
-    printf("L:\n%s\n", ch.getL)
-
-    val rControl2 = (ch.getL cloned).t
-    val qControl2 = ch.solveRight(inCoreA)
-    printf("qControl2=\n%s\n", qControl2)
-    printf("rControl2=\n%s\n", rControl2)
-
-    // Housholder approach seems to be a little bit more stable
-    (rControl - inCoreR).norm should be < 1E-5
-    (qControl - inCoreQ).norm should be < 1E-5
-
-    // Assert identicity with in-core Cholesky-based -- this should be tighter.
-    (rControl2 - inCoreR).norm should be < 1E-10
-    (qControl2 - inCoreQ).norm should be < 1E-10
-
-    // Assert orhtogonality:
-    // (a) Q[,j] dot Q[,j] == 1.0 for all j
-    // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
-    for (col <- 0 until inCoreQ.ncol)
-      ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
-    for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
-      (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
-
-
-  }
-
-  test("dssvd - the naive-est - q=0") {
-    dssvdNaive(q = 0)
-  }
-
-  test("ddsvd - naive - q=1") {
-    dssvdNaive(q = 1)
-  }
-
-  test("ddsvd - naive - q=2") {
-    dssvdNaive(q = 2)
-  }
-
-
-  def dssvdNaive(q: Int) {
-    val inCoreA = dense(
-      (1, 2, 3, 4),
-      (2, 3, 4, 5),
-      (3, -4, 5, 6),
-      (4, 5, 6, 7),
-      (8, 6, 7, 8)
-    )
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
-    val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
-
-    printf("U:\n%s\n", inCoreU)
-    printf("V:\n%s\n", inCoreV)
-    printf("Sigma:\n%s\n", s)
-
-    (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
-  }
-
-  test("dspca") {
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m = 500
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
-        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
-    // PCA Rotation matrix -- should also be orthonormal.
-    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
-    val input = (u %*%: diagv(spectrum)) %*% tr.t
-    val drmInput = drmParallelize(m = input, numPartitions = 2)
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
-    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
-    // specific additions.
-    val k = 10
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
-    // Un-normalized pca data:
-    drmPCA = drmPCA %*% diagv(s)
-
-    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
-
-    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
-    // generated input was not centered. So here, we'd just brute-solve pca to verify
-    val xi = input.colMeans()
-    for (r <- 0 until input.nrow) input(r, ::) -= xi
-    var (pcaControl, _, sControl) = svd(m = input)
-    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
-
-    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
-    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
-    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
-  }
-
-  test("dals") {
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m = 500
-    val n = 500
-
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    // Create singluar values with decay
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    // Create A as an ideal input
-    val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
-        qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    // Decompose using ALS
-    val (drmU, drmV, rmse) = als(drmInput = drmA, k = 20).toTuple
-    val inCoreU = drmU.collect
-    val inCoreV = drmV.collect
-
-    val predict = inCoreU %*% inCoreV.t
-
-    printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
-    printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))
-
-    val err = (inCoreA - predict).norm
-    printf("norm of residuals %f\n", err)
-    printf("train iteration rmses: %s\n", rmse)
-
-    err should be < 1e-2
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
index 52e2b35..69dbcbf 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.scalatest.FunSuite
 import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.drm._
@@ -28,7 +28,7 @@ import org.apache.spark.SparkContext._
 import org.apache.mahout.math.drm.logical.OpABt
 
 /** Tests for AB' operator algorithms */
-class ABtSuite extends FunSuite with MahoutLocalContext {
+class ABtSuite extends FunSuite with DistributedSparkSuite {
 
   test("ABt") {
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
index 389ef65..661e2fe 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.sparkbindings.blas
 
 import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm._
@@ -28,7 +28,7 @@ import org.apache.mahout.math.drm.logical.OpAewB
 import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
 
 /** Elementwise matrix operation tests */
-class AewBSuite extends FunSuite with MahoutLocalContext {
+class AewBSuite extends FunSuite with DistributedSparkSuite {
 
   test("A * B Hadamard") {
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
index 8734b70..49b3f46 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.sparkbindings.blas
 
 import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm._
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext._
 import org.apache.mahout.math.drm.logical.OpAtA
 
 /** Tests for {@link XtX} */
-class AtASuite extends FunSuite with MahoutLocalContext {
+class AtASuite extends FunSuite with DistributedSparkSuite {
 
   test("AtA slim") {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
index a53501d..0123b78 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.sparkbindings.blas
 
 import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm._
@@ -26,7 +26,7 @@ import org.apache.mahout.math.drm.logical.OpAt
 import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
 
 /** Tests for A' algorithms */
-class AtSuite extends FunSuite with MahoutLocalContext {
+class AtSuite extends FunSuite with DistributedSparkSuite {
 
   test("At") {
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
index 81ffccf..42026ae 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
@@ -1,5 +1,3 @@
-package org.apache.mahout.sparkbindings.drm
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,6 +15,7 @@ package org.apache.mahout.sparkbindings.drm
  * limitations under the License.
  */
 
+package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math._
 import scalabindings._
@@ -25,72 +24,11 @@ import RLikeOps._
 import RLikeDrmOps._
 import org.apache.mahout.sparkbindings._
 import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 
 /** Tests for DrmLikeOps */
-class DrmLikeOpsSuite extends FunSuite with MahoutLocalContext {
-
-  test("mapBlock") {
-
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = A.mapBlock(/* Inherit width */) {
-      case (keys, block) => keys -> (block += 1.0)
-    }
-
-    val inCoreB = B.collect
-    val inCoreBControl = inCoreA + 1.0
-
-    println(inCoreB)
-
-    // Assert they are the same
-    (inCoreB - inCoreBControl).norm should be < 1E-10
-
-  }
-
-  test("col range") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = A(::, 1 to 2)
-    val inCoreB = B.collect
-    val inCoreBControl = inCoreA(::, 1 to 2)
-
-    println(inCoreB)
-
-    // Assert they are the same
-    (inCoreB - inCoreBControl).norm should be < 1E-10
+class DrmLikeOpsSuite extends FunSuite with DistributedSparkSuite with DrmLikeOpsSuiteBase {
 
-  }
-
-  test("row range") {
-
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = A(1 to 2, ::)
-    val inCoreB = B.collect
-    val inCoreBControl = inCoreA(1 to 2, ::)
-
-    println(inCoreB)
-
-    // Assert they are the same
-    (inCoreB - inCoreBControl).norm should be < 1E-10
-
-  }
-
-  test("col, row range") {
-
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = A(1 to 2, 1 to 2)
-    val inCoreB = B.collect
-    val inCoreBControl = inCoreA(1 to 2, 1 to 2)
-
-    println(inCoreB)
-
-    // Assert they are the same
-    (inCoreB - inCoreBControl).norm should be < 1E-10
-
-  }
 
   test("exact, min and auto ||") {
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index 3c7e7f9..e6a9055 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -23,80 +23,8 @@ import scalabindings._
 import drm._
 import RLikeOps._
 import RLikeDrmOps._
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 
 
-/**
- * DRMLike tests
- */
-class DrmLikeSuite extends FunSuite with MahoutLocalContext {
-
-
-  test("DRM DFS i/o (local)") {
-
-    val uploadPath = "UploadedDRM"
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5))
-    val drmA = drmParallelize(inCoreA)
-
-    drmA.writeDRM(path = uploadPath)
-
-    println(inCoreA)
-
-    // Load back from hdfs
-    val drmB = drmFromHDFS(path = uploadPath)
-
-    // Collect back into in-core
-    val inCoreB = drmB.collect
-
-    // Print out to see what it is we collected:
-    println(inCoreB)
-
-  }
-  
-  test("DRM blockify dense") {
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5))
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    (inCoreA - drmA.mapBlock() {
-      case (keys, block) =>
-        if (!block.isInstanceOf[DenseMatrix])
-          throw new AssertionError("Block must be dense.")
-        keys -> block
-    }).norm should be < 1e-4
-  }
-
-  test("DRM blockify sparse -> SRM") {
-
-    val inCoreA = sparse(
-      (1, 2, 3),
-      0 -> 3 :: 2 -> 5 :: Nil
-    )
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    (inCoreA - drmA.mapBlock() {
-      case (keys, block) =>
-        if (!block.isInstanceOf[SparseRowMatrix])
-          throw new AssertionError("Block must be dense.")
-        keys -> block
-    }).norm should be < 1e-4
-  }
-
-  test("DRM parallelizeEmpty") {
-
-    val drmEmpty = drmParallelizeEmpty(100, 50)
-
-    // collect back into in-core
-    val inCoreEmpty = drmEmpty.collect
-
-    //print out to see what it is we collected:
-    println(inCoreEmpty)
-    printf("drm nrow:%d, ncol:%d\n", drmEmpty.nrow, drmEmpty.ncol)
-    printf("in core nrow:%d, ncol:%d\n", inCoreEmpty.nrow, inCoreEmpty.ncol)
-
-
-  }
-
-
-}
+/** DRMLike tests -- just run common DRM tests in Spark. */
+class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuiteBase

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 50f8978..b15c72c 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -17,486 +17,11 @@
 
 package org.apache.mahout.sparkbindings.drm
 
-import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.FunSuite
 import org.apache.mahout.math._
-import decompositions._
-import scalabindings._
 import drm._
-import RLikeOps._
-import RLikeDrmOps._
 import org.apache.mahout.sparkbindings._
-import test.MahoutLocalContext
-import scala.collection.mutable.ArrayBuffer
-import org.apache.mahout.math.Matrices
-import org.apache.mahout.sparkbindings.{SparkEngine, blas}
-import org.apache.spark.storage.StorageLevel
-import org.apache.mahout.math.drm.logical.{OpAtx, OpAtB, OpAtA}
-import scala.util.Random
+import test.DistributedSparkSuite
 
-/** R-like DRM DSL operation tests */
-class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
-
-  import RLikeOps._
-
-  val epsilon = 1E-5
-
-  test("A.t") {
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5))
-
-    val A = drmParallelize(inCoreA)
-
-    val inCoreAt = A.t.collect
-
-    // Assert that the norm of the difference is below the error margin.
-    (inCoreAt - inCoreA.t).norm should be < epsilon
-
-  }
-
-  test("C = A %*% B") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-
-    // Actual
-    val inCoreCControl = inCoreA %*% inCoreB
-
-    // Distributed operation
-    val C = A %*% B
-    val inCoreC = C.collect
-    println(inCoreC)
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-
-    // We also should be able to collect via implicit checkpoint
-    val inCoreC2 = C.collect
-    println(inCoreC2)
-
-    (inCoreC2 - inCoreCControl).norm should be < 1E-10
-
-  }
-
-  test("C = A %*% B mapBlock {}") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
-    val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
-
-    // Actual
-    val inCoreCControl = inCoreA %*% inCoreB
-
-    A.colSums()
-    B.colSums()
-
-
-    val x = drmBroadcast(dvec(0, 0))
-    val x2 = drmBroadcast(dvec(0, 0))
-    // Distributed operation
-    val C = (B.t %*% A.t).t.mapBlock() {
-      case (keys, block) =>
-        for (row <- 0 until block.nrow) block(row, ::) += x.value + x2
-        keys -> block
-    }
-
-    val inCoreC = C checkpoint CacheHint.NONE collect;
-    println(inCoreC)
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-
-    // We also should be able to collect via implicit checkpoint
-    val inCoreC2 = C.collect
-    println(inCoreC2)
-
-    (inCoreC2 - inCoreCControl).norm should be < 1E-10
-
-    val inCoreQ = dqrThin(C)._1.collect
-
-    printf("Q=\n%s\n", inCoreQ)
-
-    // Assert unit-orthogonality
-    ((inCoreQ(::, 0) dot inCoreQ(::, 0)) - 1.0).abs should be < 1e-10
-    (inCoreQ(::, 0) dot inCoreQ(::, 1)).abs should be < 1e-10
-
-  }
-
-  test("C = A %*% B incompatible B keys") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-        // Re-key B into DrmLike[String] instead of [Int]
-        .mapBlock()({
-      case (keys, block) => keys.map(_.toString) -> block
-    })
-
-    val C = A %*% B
-
-    intercept[IllegalArgumentException] {
-      // This plan must not compile
-      C.checkpoint()
-    }
-  }
-
-  test("C = At %*% B , join") {
-
-    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
-    val inCoreB = dense((3, 5), (4, 6), (0, 1))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-
-    val C = A.t %*% B
-
-    SparkEngine.optimizerRewrite(C) should equal(OpAtB[Int](A, B))
-
-    val inCoreC = C.collect
-    val inCoreControlC = inCoreA.t %*% inCoreB
-
-    (inCoreC - inCoreControlC).norm should be < 1E-10
-
-  }
-
-  test("C = At %*% B , join, String-keyed") {
-
-    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
-    val inCoreB = dense((3, 5), (4, 6), (0, 1))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-        .mapBlock()({
-      case (keys, block) => keys.map(_.toString) -> block
-    })
-
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-        .mapBlock()({
-      case (keys, block) => keys.map(_.toString) -> block
-    })
-
-    val C = A.t %*% B
-
-    SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
-
-    val inCoreC = C.collect
-    val inCoreControlC = inCoreA.t %*% inCoreB
-
-    (inCoreC - inCoreControlC).norm should be < 1E-10
-
-  }
-
-  test("C = At %*% B , zippable, String-keyed") {
-
-    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-        .mapBlock()({
-      case (keys, block) => keys.map(_.toString) -> block
-    })
-
-    val B = A + 1.0
-
-    val C = A.t %*% B
-
-    SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
-
-    val inCoreC = C.collect
-    val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
-
-    (inCoreC - inCoreControlC).norm should be < 1E-10
-
-  }
-
-  test("C = A %*% inCoreB") {
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
-    val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val C = A %*% inCoreB
-
-    val inCoreC = C.collect
-    val inCoreCControl = inCoreA %*% inCoreB
-
-    println(inCoreC)
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-
-  }
-
-  test("C = inCoreA %*%: B") {
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
-    val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
-
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-    val C = inCoreA %*%: B
-
-    val inCoreC = C.collect
-    val inCoreCControl = inCoreA %*% inCoreB
-
-    println(inCoreC)
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-
-  }
-
-  test("C = A.t %*% A") {
-    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val AtA = A.t %*% A
-
-    // Assert optimizer detects square
-    SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
-
-    val inCoreAtA = AtA.collect
-    val inCoreAtAControl = inCoreA.t %*% inCoreA
-
-    (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
-  }
-
-  test("C = A.t %*% A fat non-graph") {
-    // Hack the max in-mem size for this test
-    System.setProperty(blas.AtA.PROPERTY_ATA_MAXINMEMNCOL, "540")
-
-    val inCoreA = Matrices.uniformView(400, 550, 1234)
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val AtA = A.t %*% A
-
-    // Assert optimizer detects square
-    SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
-
-    val inCoreAtA = AtA.collect
-    val inCoreAtAControl = inCoreA.t %*% inCoreA
-
-    (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
-    log.debug("test done.")
-  }
-
-
-  test("C = A.t %*% A non-int key") {
-    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
-    val AintKeyd = drmParallelize(m = inCoreA, numPartitions = 2)
-    val A = AintKeyd.mapBlock() {
-      case (keys, block) => keys.map(_.toString) -> block
-    }
-
-    val AtA = A.t %*% A
-
-    // Assert optimizer detects square
-    SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
-
-    val inCoreAtA = AtA.collect
-    val inCoreAtAControl = inCoreA.t %*% inCoreA
-
-    (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
-  }
-
-  test("C = A + B") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-
-    val C = A + B
-    val inCoreC = C.collect
-
-    // Actual
-    val inCoreCControl = inCoreA + inCoreB
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-  }
-
-  test("C = A + B, identically partitioned") {
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-
-    printf("A.nrow=%d.\n", A.rdd.count())
-
-    // Create B identically partitioned with A; mapBlock() preserves partitioning by default.
-    val B = A.mapBlock() {
-      case (keys, block) =>
-        val bBlock = block.like() := ((r, c, v) => util.Random.nextDouble())
-        keys -> bBlock
-    }
-        // Checkpoint to prevent non-determinism from recomputing the random B.
-        .checkpoint()
-
-    val inCoreB = B.collect
-
-    printf("A=\n%s\n", inCoreA)
-    printf("B=\n%s\n", inCoreB)
-
-    val C = A + B
-
-    val inCoreC = C.collect
-
-    printf("C=\n%s\n", inCoreC)
-
-    // Actual
-    val inCoreCControl = inCoreA + inCoreB
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-  }
-
-
-  test("C = A + B side test 1") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-
-    val C = A + B
-    val inCoreC = C.collect
-
-    val inCoreD = (A + B).collect
-
-    // Actual
-    val inCoreCControl = inCoreA + inCoreB
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-    (inCoreD - inCoreCControl).norm should be < 1E-10
-  }
-
-  test("C = A + B side test 2") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-
-    val C = A + B
-    val inCoreC = C.collect
-
-    val inCoreD = (A + B).collect
-
-    // Actual
-    val inCoreCControl = inCoreA + inCoreB
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-    (inCoreD - inCoreCControl).norm should be < 1E-10
-  }
-
-  test("C = A + B side test 3") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-
-    val B = drmParallelize(inCoreB, numPartitions = 2)
-    //    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
-    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY)
-
-    val C = A + B
-    val inCoreC = C.collect
-
-    val inCoreD = (A + B).collect
-
-    // Actual
-    val inCoreCControl = inCoreA + inCoreB * 2.0
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-    (inCoreD - inCoreCControl).norm should be < 1E-10
-  }
-
-  test("general side") {
-    val sc = implicitly[DistributedContext]
-    val k1 = sc.parallelize(Seq(ArrayBuffer(0, 1, 2, 3)))
-        //      .persist(StorageLevel.MEMORY_ONLY)   // -- deserialized caching would expose the mutation side effect!
-        .persist(StorageLevel.MEMORY_ONLY_SER)
-
-    println(k1.map(_ += 4).collect.head)
-    println(k1.map(_ += 4).collect.head)
-  }
-
-  test("Ax") {
-    val inCoreA = dense(
-      (1, 2),
-      (3, 4),
-      (20, 30)
-    )
-    val x = dvec(10, 3)
-
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    val ax = (drmA %*% x).collect(::, 0)
-
-    ax should equal(inCoreA %*% x)
-  }
-
-  test("A'x") {
-    val inCoreA = dense(
-      (1, 2),
-      (3, 4),
-      (20, 30)
-    )
-    val x = dvec(10, 3, 4)
-
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    SparkEngine.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x))
-
-    val atx = (drmA.t %*% x).collect(::, 0)
-
-    atx should equal(inCoreA.t %*% x)
-  }
-
-  test("colSums, colMeans") {
-    val inCoreA = dense(
-      (1, 2),
-      (3, 4),
-      (20, 30)
-    )
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    drmA.colSums() should equal(inCoreA.colSums())
-    drmA.colMeans() should equal(inCoreA.colMeans())
-  }
-
-  test("numNonZeroElementsPerColumn") {
-    val inCoreA = dense(
-      (0, 2),
-      (3, 0),
-      (0, -30)
-
-    )
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn())
-  }
-
-  test("C = A cbind B, cogroup") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val inCoreB = dense((3, 5), (4, 6))
-    val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
-    val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
-
-    (A.cbind(B) -: controlC).norm should be < 1e-10
-
-  }
-
-  test("C = A cbind B, zip") {
-
-    val inCoreA = dense((1, 2), (3, 4))
-    val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
-
-    (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10
-
-  }
-
-}
+/** ==R-like DRM DSL operation tests -- Spark== */
+class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase
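
A note on the checkpoint() in the deleted "C = A + B, identically partitioned" test above, since its terse comment deserves unpacking: B is filled with random values inside mapBlock(), so without the checkpoint each action would recompute B with fresh random draws, and the collected inCoreB would no longer match the B that participates in C = A + B. A condensed sketch of the hazard, reusing the deleted test's own idioms (suite plumbing and imports as in the surrounding files are assumed):

    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
    val A = drmParallelize(inCoreA, numPartitions = 2)

    // B inherits A's partitioning; mapBlock() preserves it by default.
    // Without .checkpoint(), B would be re-evaluated per action below,
    // each time with different random values.
    val B = A.mapBlock() { case (keys, block) =>
      keys -> (block.like() := ((r, c, v) => util.Random.nextDouble()))
    }.checkpoint()

    val inCoreB = B.collect            // first action over B
    val inCoreC = (A + B).collect      // second action must see the same B
    (inCoreC - (inCoreA + inCoreB)).norm should be < 1e-10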

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
new file mode 100644
index 0000000..a0136e0
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.test
+
+import org.scalatest.Suite
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.test.{DistributedMahoutSuite, MahoutSuite}
+import org.apache.mahout.math.drm.DistributedContext
+
+trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfiguration {
+  this: Suite =>
+
+  protected implicit var mahoutCtx: DistributedContext = _
+  protected var masterUrl: String = _
+
+  override protected def beforeEach() {
+    super.beforeEach()
+
+    masterUrl = "local[2]"
+    mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
+      appName = "MahoutLocalContext",
+      // Do not run MAHOUT_HOME jars in unit tests.
+      addMahoutJars = false,
+      sparkConf = new SparkConf()
+          .set("spark.kryoserializer.buffer.mb", "15")
+          .set("spark.akka.frameSize", "30")
+          .set("spark.default.parallelism", "10")
+    )
+  }
+
+  override protected def afterEach() {
+    if (mahoutCtx != null) {
+      try {
+        mahoutCtx.close()
+      } finally {
+        mahoutCtx = null
+      }
+    }
+    super.afterEach()
+  }
+}
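
A usage sketch for the trait above: mixing DistributedSparkSuite into a ScalaTest suite gives every test a fresh local[2] Spark-backed DistributedContext via beforeEach(), and afterEach() closes it. The suite name and test body below are hypothetical, for illustration only:

    import org.scalatest.{FunSuite, Matchers}
    import org.apache.mahout.math._
    import scalabindings._
    import drm._
    import RLikeOps._
    import org.apache.mahout.sparkbindings.test.DistributedSparkSuite

    class ExampleContextSuite extends FunSuite with Matchers with DistributedSparkSuite {
      test("drmParallelize picks up the suite-provided implicit context") {
        val inCoreA = dense((1, 2), (3, 4))
        val drmA = drmParallelize(inCoreA)  // uses the implicit mahoutCtx
        (drmA.collect - inCoreA).norm should be < 1e-10
      }
    }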

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
index a80f42b..d5d16a8 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.mahout.sparkbindings.test
 
 import org.scalatest.Suite

http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
deleted file mode 100644
index fb97f68..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.mahout.sparkbindings.test
-
-import org.scalatest.Suite
-import org.apache.spark.SparkConf
-import org.apache.mahout.sparkbindings._
-import org.apache.mahout.test.MahoutSuite
-import org.apache.mahout.math.drm.DistributedContext
-
-trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
-  this: Suite =>
-
-  protected implicit var mahoutCtx: DistributedContext = _
-  protected var masterUrl = null.asInstanceOf[String]
-
-  override protected def beforeEach() {
-    super.beforeEach()
-
-    masterUrl = "local[2]"
-    mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
-      appName = "MahoutLocalContext",
-      // Do not run MAHOUT_HOME jars in unit tests.
-      addMahoutJars = false,
-      sparkConf = new SparkConf()
-          .set("spark.kryoserializer.buffer.mb", "15")
-          .set("spark.akka.frameSize", "30")
-          .set("spark.default.parallelism", "10")
-    )
-  }
-
-  override protected def afterEach() {
-    if (mahoutCtx != null) {
-      try {
-        mahoutCtx.close()
-      } finally {
-        mahoutCtx = null
-      }
-    }
-    super.afterEach()
-  }
-}