You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2014/07/14 22:19:45 UTC
git commit: MAHOUT-1529 (d): moving core engine-independent tests
logic to math-scala,
spark module running them. Source: dlyubimov/MAHOUT-1529d This closes
apache/mahout#28.
Repository: mahout
Updated Branches:
refs/heads/master e4ba7887f -> 25a6fc096
MAHOUT-1529 (d): moving core engine-independent tests logic to math-scala, spark module running them.
Source: dlyubimov/MAHOUT-1529d
This closes apache/mahout#28.
Squashed commit of the following:
commit 8d693b02e474ca1380c0e1cfa262ec4a47afeeae
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jul 14 13:03:13 2014 -0700
minor style
commit f568736a35a8c76c6eb70694ce9feee09ff845dc
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jul 14 12:57:19 2014 -0700
+ license
commit d47e2dca693c147e6978a6c5e1fe42c1ab73ece3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jul 14 12:55:36 2014 -0700
+ license
commit 61e6268d261c875781d430ca7861e0b345f485c3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri Jul 11 11:31:44 2014 -0700
style
commit c2406654c3fc7326b87848a23933cd4031a2c9a9
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri Jul 11 11:27:53 2014 -0700
Initial test rearrangement. Pushing engine-independent logic to math-scala. Some renames.
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/25a6fc09
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/25a6fc09
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/25a6fc09
Branch: refs/heads/master
Commit: 25a6fc0967357e6ba4aafcaf11bf3f7faec752fd
Parents: e4ba788
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Mon Jul 14 13:16:41 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Mon Jul 14 13:18:33 2014 -0700
----------------------------------------------------------------------
.../DistributedDecompositionsSuiteBase.scala | 219 +++++++++
.../mahout/math/drm/DrmLikeOpsSuiteBase.scala | 93 ++++
.../mahout/math/drm/DrmLikeSuiteBase.scala | 98 ++++
.../mahout/math/drm/RLikeDrmOpsSuiteBase.scala | 483 +++++++++++++++++++
.../mahout/test/DistributedMahoutSuite.scala | 28 ++
.../mahout/cf/CooccurrenceAnalysisSuite.scala | 4 +-
.../drivers/ItemSimilarityDriverSuite.scala | 4 +-
.../DistributedDecompositionsSuite.scala | 34 ++
.../mahout/math/decompositions/MathSuite.scala | 212 --------
.../mahout/sparkbindings/blas/ABtSuite.scala | 4 +-
.../mahout/sparkbindings/blas/AewBSuite.scala | 4 +-
.../mahout/sparkbindings/blas/AtASuite.scala | 4 +-
.../mahout/sparkbindings/blas/AtSuite.scala | 4 +-
.../sparkbindings/drm/DrmLikeOpsSuite.scala | 68 +--
.../mahout/sparkbindings/drm/DrmLikeSuite.scala | 78 +--
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 483 +------------------
.../test/DistributedSparkSuite.scala | 57 +++
.../test/LoggerConfiguration.scala | 17 +
.../sparkbindings/test/MahoutLocalContext.scala | 40 --
19 files changed, 1051 insertions(+), 883 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
new file mode 100644
index 0000000..bc2ee4b
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.common.RandomUtils
+import math._
+
+/**
+ * ==Common distributed code to run against each distributed engine support.==
+ *
+ * Each distributed engine's decompositions package should have a suite that includes this feature
+ * as part of its distributed test suite.
+ *
+ */
+trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Matchers { this:FunSuite =>
+
+
+ test("thin distributed qr") {
+
+ val inCoreA = dense(
+ (1, 2, 3, 4),
+ (2, 3, 4, 5),
+ (3, -4, 5, 6),
+ (4, 5, 6, 7),
+ (8, 6, 7, 8)
+ )
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val (drmQ, inCoreR) = dqrThin(A, checkRankDeficiency = false)
+
+ // Assert optimizer still knows Q and A are identically partitioned
+ drmQ.partitioningTag should equal(A.partitioningTag)
+
+// drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
+//
+// // Should also be zippable
+// drmQ.rdd.zip(other = A.rdd)
+
+ val inCoreQ = drmQ.collect
+
+ printf("A=\n%s\n", inCoreA)
+ printf("Q=\n%s\n", inCoreQ)
+ printf("R=\n%s\n", inCoreR)
+
+ val (qControl, rControl) = qr(inCoreA)
+ printf("qControl=\n%s\n", qControl)
+ printf("rControl=\n%s\n", rControl)
+
+ // Validate with Cholesky
+ val ch = chol(inCoreA.t %*% inCoreA)
+ printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
+ printf("L:\n%s\n", ch.getL)
+
+ val rControl2 = (ch.getL cloned).t
+ val qControl2 = ch.solveRight(inCoreA)
+ printf("qControl2=\n%s\n", qControl2)
+ printf("rControl2=\n%s\n", rControl2)
+
+ // Householder approach seems to be a little bit more stable
+ (rControl - inCoreR).norm should be < 1E-5
+ (qControl - inCoreQ).norm should be < 1E-5
+
+ // Assert identity with the in-core Cholesky-based result -- this should be tighter.
+ (rControl2 - inCoreR).norm should be < 1E-10
+ (qControl2 - inCoreQ).norm should be < 1E-10
+
+ // Assert orthogonality:
+ // (a) Q[,j] dot Q[,j] == 1.0 for all j
+ // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
+ for (col <- 0 until inCoreQ.ncol)
+ ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
+ for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
+ (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
+
+
+ }
+
+ test("dssvd - the naive-est - q=0") {
+ dssvdNaive(q = 0)
+ }
+
+ test("dssvd - naive - q=1") {
+ dssvdNaive(q = 1)
+ }
+
+ test("dssvd - naive - q=2") {
+ dssvdNaive(q = 2)
+ }
+
+
+ def dssvdNaive(q: Int) {
+ val inCoreA = dense(
+ (1, 2, 3, 4),
+ (2, 3, 4, 5),
+ (3, -4, 5, 6),
+ (4, 5, 6, 7),
+ (8, 6, 7, 8)
+ )
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
+ val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
+
+ printf("U:\n%s\n", inCoreU)
+ printf("V:\n%s\n", inCoreV)
+ printf("Sigma:\n%s\n", s)
+
+ (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
+ }
+
+ test("dspca") {
+
+ val rnd = RandomUtils.getRandom
+
+ // Number of points
+ val m = 500
+ // Length of actual spectrum
+ val spectrumLen = 40
+
+ val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+ printf("spectrum:%s\n", spectrum)
+
+ val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+ ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+ // PCA Rotation matrix -- should also be orthonormal.
+ val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+ val input = (u %*%: diagv(spectrum)) %*% tr.t
+ val drmInput = drmParallelize(m = input, numPartitions = 2)
+
+ // Calculate just first 10 principal factors and reduce dimensionality.
+ // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
+ // ensure to zero stochastic error and assert only functional correctness of the method's pca-
+ // specific additions.
+ val k = 10
+
+ // Calculate just first 10 principal factors and reduce dimensionality.
+ var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
+ // Un-normalized pca data:
+ drmPCA = drmPCA %*% diagv(s)
+
+ val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+
+ // Of course, once we calculated the pca, the spectrum is going to be different since our originally
+ // generated input was not centered. So here, we'd just brute-solve pca to verify
+ val xi = input.colMeans()
+ for (r <- 0 until input.nrow) input(r, ::) -= xi
+ var (pcaControl, _, sControl) = svd(m = input)
+ pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+ printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+ printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+ (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+ }
+
+ test("dals") {
+
+ val rnd = RandomUtils.getRandom
+
+ // Number of points
+ val m = 500
+ val n = 500
+
+ // Length of actual spectrum
+ val spectrumLen = 40
+
+ // Create singular values with decay
+ val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+ printf("spectrum:%s\n", spectrum)
+
+ // Create A as an ideal input
+ val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
+ qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ // Decompose using ALS
+ val (drmU, drmV, rmse) = als(drmInput = drmA, k = 20).toTuple
+ val inCoreU = drmU.collect
+ val inCoreV = drmV.collect
+
+ val predict = inCoreU %*% inCoreV.t
+
+ printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
+ printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))
+
+ val err = (inCoreA - predict).norm
+ printf("norm of residuals %f\n", err)
+ printf("train iteration rmses: %s\n", rmse)
+
+ err should be < 1e-2
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
new file mode 100644
index 0000000..849db68
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+
+/** Common tests for DrmLike operators to be executed by all distributed engines. */
+trait DrmLikeOpsSuiteBase extends DistributedMahoutSuite with Matchers {
+ this: FunSuite =>
+
+ test("mapBlock") {
+
+ val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
+ val B = A.mapBlock(/* Inherit width */) {
+ case (keys, block) => keys -> (block += 1.0)
+ }
+
+ val inCoreB = B.collect
+ val inCoreBControl = inCoreA + 1.0
+
+ println(inCoreB)
+
+ // Assert they are the same
+ (inCoreB - inCoreBControl).norm should be < 1E-10
+
+ }
+
+ test("col range") {
+ val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
+ val B = A(::, 1 to 2)
+ val inCoreB = B.collect
+ val inCoreBControl = inCoreA(::, 1 to 2)
+
+ println(inCoreB)
+
+ // Assert they are the same
+ (inCoreB - inCoreBControl).norm should be < 1E-10
+
+ }
+
+ test("row range") {
+
+ val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
+ val B = A(1 to 2, ::)
+ val inCoreB = B.collect
+ val inCoreBControl = inCoreA(1 to 2, ::)
+
+ println(inCoreB)
+
+ // Assert they are the same
+ (inCoreB - inCoreBControl).norm should be < 1E-10
+
+ }
+
+ test("col, row range") {
+
+ val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
+ val B = A(1 to 2, 1 to 2)
+ val inCoreB = B.collect
+ val inCoreBControl = inCoreA(1 to 2, 1 to 2)
+
+ println(inCoreB)
+
+ // Assert they are the same
+ (inCoreB - inCoreBControl).norm should be < 1E-10
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
new file mode 100644
index 0000000..651c611
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+
+/** Common DRM tests to be run by all distributed engines. */
+trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
+ this: FunSuite =>
+
+ test("DRM DFS i/o (local)") {
+
+ val uploadPath = "UploadedDRM"
+
+ val inCoreA = dense((1, 2, 3), (3, 4, 5))
+ val drmA = drmParallelize(inCoreA)
+
+ drmA.writeDRM(path = uploadPath)
+
+ println(inCoreA)
+
+ // Load back from hdfs
+ val drmB = drmFromHDFS(path = uploadPath)
+
+ // Collect back into in-core
+ val inCoreB = drmB.collect
+
+ // Print out to see what it is we collected:
+ println(inCoreB)
+
+ }
+
+ test("DRM blockify dense") {
+
+ val inCoreA = dense((1, 2, 3), (3, 4, 5))
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ (inCoreA - drmA.mapBlock() {
+ case (keys, block) =>
+ if (!block.isInstanceOf[DenseMatrix])
+ throw new AssertionError("Block must be dense.")
+ keys -> block
+ }).norm should be < 1e-4
+ }
+
+ test("DRM blockify sparse -> SRM") {
+
+ val inCoreA = sparse(
+ (1, 2, 3),
+ 0 -> 3 :: 2 -> 5 :: Nil
+ )
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ (inCoreA - drmA.mapBlock() {
+ case (keys, block) =>
+ if (!block.isInstanceOf[SparseRowMatrix])
+ throw new AssertionError("Block must be sparse.")
+ keys -> block
+ }).norm should be < 1e-4
+ }
+
+ test("DRM parallelizeEmpty") {
+
+ val drmEmpty = drmParallelizeEmpty(100, 50)
+
+ // collect back into in-core
+ val inCoreEmpty = drmEmpty.collect
+
+ //print out to see what it is we collected:
+ println(inCoreEmpty)
+ printf("drm nrow:%d, ncol:%d\n", drmEmpty.nrow, drmEmpty.ncol)
+ printf("in core nrow:%d, ncol:%d\n", inCoreEmpty.nrow, inCoreEmpty.ncol)
+
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
new file mode 100644
index 0000000..71dc640
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import decompositions._
+import org.apache.mahout.math.drm.logical.{OpAtB, OpAtA, OpAtx}
+
+/** Common engine tests for distributed R-like DRM operations */
+trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers {
+ this: FunSuite =>
+
+ val epsilon = 1E-5
+
+ test("A.t") {
+
+ val inCoreA = dense((1, 2, 3), (3, 4, 5))
+
+ val A = drmParallelize(inCoreA)
+
+ val inCoreAt = A.t.collect
+
+ // Assert first norm of difference is less than error margin.
+ (inCoreAt - inCoreA.t).norm should be < epsilon
+
+ }
+
+ test("C = A %*% B") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+
+ // Actual
+ val inCoreCControl = inCoreA %*% inCoreB
+
+ // Distributed operation
+ val C = A %*% B
+ val inCoreC = C.collect
+ println(inCoreC)
+
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+
+ // We also should be able to collect via implicit checkpoint
+ val inCoreC2 = C.collect
+ println(inCoreC2)
+
+ (inCoreC2 - inCoreCControl).norm should be < 1E-10
+
+ }
+
+ test("C = A %*% B mapBlock {}") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+ val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
+
+ // Actual
+ val inCoreCControl = inCoreA %*% inCoreB
+
+ A.colSums()
+ B.colSums()
+
+
+ val x = drmBroadcast(dvec(0, 0))
+ val x2 = drmBroadcast(dvec(0, 0))
+ // Distributed operation
+ val C = (B.t %*% A.t).t.mapBlock() {
+ case (keys, block) =>
+ for (row <- 0 until block.nrow) block(row, ::) += x.value + x2
+ keys -> block
+ }
+
+ val inCoreC = C checkpoint CacheHint.NONE collect;
+ println(inCoreC)
+
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+
+ // We also should be able to collect via implicit checkpoint
+ val inCoreC2 = C.collect
+ println(inCoreC2)
+
+ (inCoreC2 - inCoreCControl).norm should be < 1E-10
+
+ val inCoreQ = dqrThin(C)._1.collect
+
+ printf("Q=\n%s\n", inCoreQ)
+
+ // Assert unit-orthogonality
+ ((inCoreQ(::, 0) dot inCoreQ(::, 0)) - 1.0).abs should be < 1e-10
+ (inCoreQ(::, 0) dot inCoreQ(::, 1)).abs should be < 1e-10
+
+ }
+
+ test("C = A %*% B incompatible B keys") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+ // Re-key B into DrmLike[String] instead of [Int]
+ .mapBlock()({
+ case (keys, block) => keys.map(_.toString) -> block
+ })
+
+ val C = A %*% B
+
+ intercept[IllegalArgumentException] {
+ // This plan must not compile
+ C.checkpoint()
+ }
+ }
+
+ test("Spark-specific C = At %*% B , join") {
+
+ val inCoreA = dense((1, 2), (3, 4), (-3, -5))
+ val inCoreB = dense((3, 5), (4, 6), (0, 1))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+
+ val C = A.t %*% B
+
+ mahoutCtx.optimizerRewrite(C) should equal(OpAtB[Int](A, B))
+
+ val inCoreC = C.collect
+ val inCoreControlC = inCoreA.t %*% inCoreB
+
+ (inCoreC - inCoreControlC).norm should be < 1E-10
+
+ }
+
+
+ test("C = At %*% B , join, String-keyed") {
+
+ val inCoreA = dense((1, 2), (3, 4), (-3, -5))
+ val inCoreB = dense((3, 5), (4, 6), (0, 1))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ .mapBlock()({
+ case (keys, block) => keys.map(_.toString) -> block
+ })
+
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+ .mapBlock()({
+ case (keys, block) => keys.map(_.toString) -> block
+ })
+
+ val C = A.t %*% B
+
+ mahoutCtx.optimizerRewrite(C) should equal(OpAtB[String](A, B))
+
+ val inCoreC = C.collect
+ val inCoreControlC = inCoreA.t %*% inCoreB
+
+ (inCoreC - inCoreControlC).norm should be < 1E-10
+
+ }
+
+ test("C = At %*% B , zippable, String-keyed") {
+
+ val inCoreA = dense((1, 2), (3, 4), (-3, -5))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ .mapBlock()({
+ case (keys, block) => keys.map(_.toString) -> block
+ })
+
+ val B = A + 1.0
+
+ val C = A.t %*% B
+
+ mahoutCtx.optimizerRewrite(C) should equal(OpAtB[String](A, B))
+
+ val inCoreC = C.collect
+ val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
+
+ (inCoreC - inCoreControlC).norm should be < 1E-10
+
+ }
+
+ test("C = A %*% inCoreB") {
+
+ val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+ val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val C = A %*% inCoreB
+
+ val inCoreC = C.collect
+ val inCoreCControl = inCoreA %*% inCoreB
+
+ println(inCoreC)
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+
+ }
+
+ test("C = inCoreA %*%: B") {
+
+ val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+ val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
+
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+ val C = inCoreA %*%: B
+
+ val inCoreC = C.collect
+ val inCoreCControl = inCoreA %*% inCoreB
+
+ println(inCoreC)
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+
+ }
+
+ test("C = A.t %*% A") {
+ val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+ val AtA = A.t %*% A
+
+ // Assert optimizer detects square
+ mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A))
+
+ val inCoreAtA = AtA.collect
+ val inCoreAtAControl = inCoreA.t %*% inCoreA
+
+ (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
+ }
+
+ test("C = A.t %*% A fat non-graph") {
+ // Hack the max in-mem size for this test
+ System.setProperty("mahout.math.AtA.maxInMemNCol", "540")
+
+ val inCoreA = Matrices.uniformView(400, 550, 1234)
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+ val AtA = A.t %*% A
+
+ // Assert optimizer detects square
+ mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A))
+
+ val inCoreAtA = AtA.collect
+ val inCoreAtAControl = inCoreA.t %*% inCoreA
+
+ (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
+ }
+
+ test("C = A.t %*% A non-int key") {
+ val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+ val AintKeyd = drmParallelize(m = inCoreA, numPartitions = 2)
+ val A = AintKeyd.mapBlock() {
+ case (keys, block) => keys.map(_.toString) -> block
+ }
+
+ val AtA = A.t %*% A
+
+ // Assert optimizer detects square
+ mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A))
+
+ val inCoreAtA = AtA.collect
+ val inCoreAtAControl = inCoreA.t %*% inCoreA
+
+ (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
+ }
+
+ test("C = A + B") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+
+ val C = A + B
+ val inCoreC = C.collect
+
+ // Actual
+ val inCoreCControl = inCoreA + inCoreB
+
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+ }
+
+ test("C = A + B, identically partitioned") {
+
+ val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+
+// printf("A.nrow=%d.\n", A.rdd.count())
+
+ // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
+ val B = A.mapBlock() {
+ case (keys, block) =>
+ val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
+ keys -> bBlock
+ }
+ // Prevent repeated computation non-determinism
+ .checkpoint()
+
+ val inCoreB = B.collect
+
+ printf("A=\n%s\n", inCoreA)
+ printf("B=\n%s\n", inCoreB)
+
+ val C = A + B
+
+ val inCoreC = C.collect
+
+ printf("C=\n%s\n", inCoreC)
+
+ // Actual
+ val inCoreCControl = inCoreA + inCoreB
+
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+ }
+
+
+ test("C = A + B side test 1") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+
+ val C = A + B
+ val inCoreC = C.collect
+
+ val inCoreD = (A + B).collect
+
+ // Actual
+ val inCoreCControl = inCoreA + inCoreB
+
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+ (inCoreD - inCoreCControl).norm should be < 1E-10
+ }
+
+ test("C = A + B side test 2") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+
+ val C = A + B
+ val inCoreC = C.collect
+
+ val inCoreD = (A + B).collect
+
+ // Actual
+ val inCoreCControl = inCoreA + inCoreB
+
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+ (inCoreD - inCoreCControl).norm should be < 1E-10
+ }
+
+ test("C = A + B side test 3") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+
+ val B = drmParallelize(inCoreB, numPartitions = 2)
+ // val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
+ val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY)
+
+ val C = A + B
+ val inCoreC = C.collect
+
+ val inCoreD = (A + B).collect
+
+ // Actual
+ val inCoreCControl = inCoreA + inCoreB * 2.0
+
+ (inCoreC - inCoreCControl).norm should be < 1E-10
+ (inCoreD - inCoreCControl).norm should be < 1E-10
+ }
+
+ test("Ax") {
+ val inCoreA = dense(
+ (1, 2),
+ (3, 4),
+ (20, 30)
+ )
+ val x = dvec(10, 3)
+
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ val ax = (drmA %*% x).collect(::, 0)
+
+ ax should equal(inCoreA %*% x)
+ }
+
+ test("A'x") {
+ val inCoreA = dense(
+ (1, 2),
+ (3, 4),
+ (20, 30)
+ )
+ val x = dvec(10, 3, 4)
+
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ mahoutCtx.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x))
+
+ val atx = (drmA.t %*% x).collect(::, 0)
+
+ atx should equal(inCoreA.t %*% x)
+ }
+
+ test("colSums, colMeans") {
+ val inCoreA = dense(
+ (1, 2),
+ (3, 4),
+ (20, 30)
+ )
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ drmA.colSums() should equal(inCoreA.colSums())
+ drmA.colMeans() should equal(inCoreA.colMeans())
+ }
+
+ test("numNonZeroElementsPerColumn") {
+ val inCoreA = dense(
+ (0, 2),
+ (3, 0),
+ (0, -30)
+
+ )
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn())
+ }
+
+ test("C = A cbind B, cogroup") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+ val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+ val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
+
+ (A.cbind(B) -: controlC).norm should be < 1e-10
+
+ }
+
+ test("C = A cbind B, zip") {
+
+ val inCoreA = dense((1, 2), (3, 4))
+ val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5))
+
+ val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+
+ (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala b/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala
new file mode 100644
index 0000000..3538991
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/test/DistributedMahoutSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.test
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.scalatest.{Suite, FunSuite, Matchers}
+
+/**
+ * Unit tests that use a distributed context to run
+ */
+trait DistributedMahoutSuite extends MahoutSuite { this: Suite =>
+ protected implicit var mahoutCtx: DistributedContext
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 065f2f8..938dc33 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -19,7 +19,7 @@ package org.apache.mahout.cf
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.{MatrixOps, _}
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.apache.mahout.test.MahoutSuite
import org.scalatest.FunSuite
@@ -37,7 +37,7 @@ B =
1 1 0 1 0
*/
-class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
+class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
// correct cooccurrence with LLR
final val matrixLLRCoocAtAControl = dense(
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 2db830c..ca92fcf 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.scalatest.FunSuite
import org.apache.mahout.sparkbindings._
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.apache.mahout.test.MahoutSuite
-class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
+class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
/*
// correct self-cooccurrence with LLR
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala b/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala
new file mode 100644
index 0000000..0a0c1af
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.RandomUtils
+import scala.math._
+import org.scalatest.{Matchers, FunSuite}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+
+class DistributedDecompositionsSuite extends FunSuite with DistributedSparkSuite with DistributedDecompositionsSuiteBase {
+
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala b/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala
deleted file mode 100644
index 03c7190..0000000
--- a/spark/src/test/scala/org/apache/mahout/math/decompositions/MathSuite.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import org.apache.mahout.math._
-import drm._
-import scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-import org.apache.mahout.sparkbindings._
-import org.apache.mahout.common.RandomUtils
-import scala.math._
-import org.scalatest.{Matchers, FunSuite}
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-
-class MathSuite extends FunSuite with Matchers with MahoutLocalContext {
-
- test("thin distributed qr") {
-
- val inCoreA = dense(
- (1, 2, 3, 4),
- (2, 3, 4, 5),
- (3, -4, 5, 6),
- (4, 5, 6, 7),
- (8, 6, 7, 8)
- )
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- val (drmQ, inCoreR) = dqrThin(A, checkRankDeficiency = false)
-
- // Assert optimizer still knows Q and A are identically partitioned
- drmQ.partitioningTag should equal(A.partitioningTag)
-
- drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
-
- // Should also be zippable
- drmQ.rdd.zip(other = A.rdd)
-
- val inCoreQ = drmQ.collect
-
- printf("A=\n%s\n", inCoreA)
- printf("Q=\n%s\n", inCoreQ)
- printf("R=\n%s\n", inCoreR)
-
- val (qControl, rControl) = qr(inCoreA)
- printf("qControl=\n%s\n", qControl)
- printf("rControl=\n%s\n", rControl)
-
- // Validate with Cholesky
- val ch = chol(inCoreA.t %*% inCoreA)
- printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
- printf("L:\n%s\n", ch.getL)
-
- val rControl2 = (ch.getL cloned).t
- val qControl2 = ch.solveRight(inCoreA)
- printf("qControl2=\n%s\n", qControl2)
- printf("rControl2=\n%s\n", rControl2)
-
- // Householder approach seems to be a little bit more stable
- (rControl - inCoreR).norm should be < 1E-5
- (qControl - inCoreQ).norm should be < 1E-5
-
- // Assert identity with in-core Cholesky-based -- this should be tighter.
- (rControl2 - inCoreR).norm should be < 1E-10
- (qControl2 - inCoreQ).norm should be < 1E-10
-
- // Assert orthogonality:
- // (a) Q[,j] dot Q[,j] == 1.0 for all j
- // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
- for (col <- 0 until inCoreQ.ncol)
- ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
- for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
- (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
-
-
- }
-
- test("dssvd - the naive-est - q=0") {
- dssvdNaive(q = 0)
- }
-
- test("ddsvd - naive - q=1") {
- dssvdNaive(q = 1)
- }
-
- test("ddsvd - naive - q=2") {
- dssvdNaive(q = 2)
- }
-
-
- def dssvdNaive(q: Int) {
- val inCoreA = dense(
- (1, 2, 3, 4),
- (2, 3, 4, 5),
- (3, -4, 5, 6),
- (4, 5, 6, 7),
- (8, 6, 7, 8)
- )
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
- val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
-
- printf("U:\n%s\n", inCoreU)
- printf("V:\n%s\n", inCoreV)
- printf("Sigma:\n%s\n", s)
-
- (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
- }
-
- test("dspca") {
-
- val rnd = RandomUtils.getRandom
-
- // Number of points
- val m = 500
- // Length of actual spectrum
- val spectrumLen = 40
-
- val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
- printf("spectrum:%s\n", spectrum)
-
- val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
- ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
- // PCA Rotation matrix -- should also be orthonormal.
- val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
- val input = (u %*%: diagv(spectrum)) %*% tr.t
- val drmInput = drmParallelize(m = input, numPartitions = 2)
-
- // Calculate just first 10 principal factors and reduce dimensionality.
- // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
- // ensure to zero stochastic error and assert only functional correctness of the method's pca-
- // specific additions.
- val k = 10
-
- // Calculate just first 10 principal factors and reduce dimensionality.
- var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
- // Un-normalized pca data:
- drmPCA = drmPCA %*% diagv(s)
-
- val pca = drmPCA.checkpoint(CacheHint.NONE).collect
-
- // Of course, once we calculated the pca, the spectrum is going to be different since our originally
- // generated input was not centered. So here, we'd just brute-solve pca to verify
- val xi = input.colMeans()
- for (r <- 0 until input.nrow) input(r, ::) -= xi
- var (pcaControl, _, sControl) = svd(m = input)
- pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
-
- printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
- printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
- (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
- }
-
- test("dals") {
-
- val rnd = RandomUtils.getRandom
-
- // Number of points
- val m = 500
- val n = 500
-
- // Length of actual spectrum
- val spectrumLen = 40
-
- // Create singular values with decay
- val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
- printf("spectrum:%s\n", spectrum)
-
- // Create A as an ideal input
- val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
- qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- // Decompose using ALS
- val (drmU, drmV, rmse) = als(drmInput = drmA, k = 20).toTuple
- val inCoreU = drmU.collect
- val inCoreV = drmV.collect
-
- val predict = inCoreU %*% inCoreV.t
-
- printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
- printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))
-
- val err = (inCoreA - predict).norm
- printf("norm of residuals %f\n", err)
- printf("train iteration rmses: %s\n", rmse)
-
- err should be < 1e-2
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
index 52e2b35..69dbcbf 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
@@ -17,7 +17,7 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.scalatest.FunSuite
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
@@ -28,7 +28,7 @@ import org.apache.spark.SparkContext._
import org.apache.mahout.math.drm.logical.OpABt
/** Tests for AB' operator algorithms */
-class ABtSuite extends FunSuite with MahoutLocalContext {
+class ABtSuite extends FunSuite with DistributedSparkSuite {
test("ABt") {
val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
index 389ef65..661e2fe 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.sparkbindings.blas
import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm._
@@ -28,7 +28,7 @@ import org.apache.mahout.math.drm.logical.OpAewB
import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
/** Elementwise matrix operation tests */
-class AewBSuite extends FunSuite with MahoutLocalContext {
+class AewBSuite extends FunSuite with DistributedSparkSuite {
test("A * B Hadamard") {
val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
index 8734b70..49b3f46 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.sparkbindings.blas
import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm._
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext._
import org.apache.mahout.math.drm.logical.OpAtA
/** Tests for {@link XtX} */
-class AtASuite extends FunSuite with MahoutLocalContext {
+class AtASuite extends FunSuite with DistributedSparkSuite {
test("AtA slim") {
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
index a53501d..0123b78 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.sparkbindings.blas
import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm._
@@ -26,7 +26,7 @@ import org.apache.mahout.math.drm.logical.OpAt
import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
/** Tests for A' algorithms */
-class AtSuite extends FunSuite with MahoutLocalContext {
+class AtSuite extends FunSuite with DistributedSparkSuite {
test("At") {
val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
index 81ffccf..42026ae 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
@@ -1,5 +1,3 @@
-package org.apache.mahout.sparkbindings.drm
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,6 +15,7 @@ package org.apache.mahout.sparkbindings.drm
* limitations under the License.
*/
+package org.apache.mahout.sparkbindings.drm
import org.apache.mahout.math._
import scalabindings._
@@ -25,72 +24,11 @@ import RLikeOps._
import RLikeDrmOps._
import org.apache.mahout.sparkbindings._
import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
/** Tests for DrmLikeOps */
-class DrmLikeOpsSuite extends FunSuite with MahoutLocalContext {
-
- test("mapBlock") {
-
- val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
- val A = drmParallelize(m = inCoreA, numPartitions = 2)
- val B = A.mapBlock(/* Inherit width */) {
- case (keys, block) => keys -> (block += 1.0)
- }
-
- val inCoreB = B.collect
- val inCoreBControl = inCoreA + 1.0
-
- println(inCoreB)
-
- // Assert they are the same
- (inCoreB - inCoreBControl).norm should be < 1E-10
-
- }
-
- test("col range") {
- val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
- val A = drmParallelize(m = inCoreA, numPartitions = 2)
- val B = A(::, 1 to 2)
- val inCoreB = B.collect
- val inCoreBControl = inCoreA(::, 1 to 2)
-
- println(inCoreB)
-
- // Assert they are the same
- (inCoreB - inCoreBControl).norm should be < 1E-10
+class DrmLikeOpsSuite extends FunSuite with DistributedSparkSuite with DrmLikeOpsSuiteBase {
- }
-
- test("row range") {
-
- val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
- val A = drmParallelize(m = inCoreA, numPartitions = 2)
- val B = A(1 to 2, ::)
- val inCoreB = B.collect
- val inCoreBControl = inCoreA(1 to 2, ::)
-
- println(inCoreB)
-
- // Assert they are the same
- (inCoreB - inCoreBControl).norm should be < 1E-10
-
- }
-
- test("col, row range") {
-
- val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
- val A = drmParallelize(m = inCoreA, numPartitions = 2)
- val B = A(1 to 2, 1 to 2)
- val inCoreB = B.collect
- val inCoreBControl = inCoreA(1 to 2, 1 to 2)
-
- println(inCoreB)
-
- // Assert they are the same
- (inCoreB - inCoreBControl).norm should be < 1E-10
-
- }
test("exact, min and auto ||") {
val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index 3c7e7f9..e6a9055 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -23,80 +23,8 @@ import scalabindings._
import drm._
import RLikeOps._
import RLikeDrmOps._
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-/**
- * DRMLike tests
- */
-class DrmLikeSuite extends FunSuite with MahoutLocalContext {
-
-
- test("DRM DFS i/o (local)") {
-
- val uploadPath = "UploadedDRM"
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5))
- val drmA = drmParallelize(inCoreA)
-
- drmA.writeDRM(path = uploadPath)
-
- println(inCoreA)
-
- // Load back from hdfs
- val drmB = drmFromHDFS(path = uploadPath)
-
- // Collect back into in-core
- val inCoreB = drmB.collect
-
- // Print out to see what it is we collected:
- println(inCoreB)
-
- }
-
- test("DRM blockify dense") {
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5))
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- (inCoreA - drmA.mapBlock() {
- case (keys, block) =>
- if (!block.isInstanceOf[DenseMatrix])
- throw new AssertionError("Block must be dense.")
- keys -> block
- }).norm should be < 1e-4
- }
-
- test("DRM blockify sparse -> SRM") {
-
- val inCoreA = sparse(
- (1, 2, 3),
- 0 -> 3 :: 2 -> 5 :: Nil
- )
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- (inCoreA - drmA.mapBlock() {
- case (keys, block) =>
- if (!block.isInstanceOf[SparseRowMatrix])
- throw new AssertionError("Block must be dense.")
- keys -> block
- }).norm should be < 1e-4
- }
-
- test("DRM parallelizeEmpty") {
-
- val drmEmpty = drmParallelizeEmpty(100, 50)
-
- // collect back into in-core
- val inCoreEmpty = drmEmpty.collect
-
- //print out to see what it is we collected:
- println(inCoreEmpty)
- printf("drm nrow:%d, ncol:%d\n", drmEmpty.nrow, drmEmpty.ncol)
- printf("in core nrow:%d, ncol:%d\n", inCoreEmpty.nrow, inCoreEmpty.ncol)
-
-
- }
-
-
-}
+/** DRMLike tests -- just run common DRM tests in Spark. */
+class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuiteBase
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 50f8978..b15c72c 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -17,486 +17,11 @@
package org.apache.mahout.sparkbindings.drm
-import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.FunSuite
import org.apache.mahout.math._
-import decompositions._
-import scalabindings._
import drm._
-import RLikeOps._
-import RLikeDrmOps._
import org.apache.mahout.sparkbindings._
-import test.MahoutLocalContext
-import scala.collection.mutable.ArrayBuffer
-import org.apache.mahout.math.Matrices
-import org.apache.mahout.sparkbindings.{SparkEngine, blas}
-import org.apache.spark.storage.StorageLevel
-import org.apache.mahout.math.drm.logical.{OpAtx, OpAtB, OpAtA}
-import scala.util.Random
+import test.DistributedSparkSuite
-/** R-like DRM DSL operation tests */
-class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
-
- import RLikeOps._
-
- val epsilon = 1E-5
-
- test("A.t") {
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5))
-
- val A = drmParallelize(inCoreA)
-
- val inCoreAt = A.t.collect
-
- // Assert first norm of difference is less than error margin.
- (inCoreAt - inCoreA.t).norm should be < epsilon
-
- }
-
- test("C = A %*% B") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- val B = drmParallelize(inCoreB, numPartitions = 2)
-
- // Actual
- val inCoreCControl = inCoreA %*% inCoreB
-
- // Distributed operation
- val C = A %*% B
- val inCoreC = C.collect
- println(inCoreC)
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
-
- // We also should be able to collect via implicit checkpoint
- val inCoreC2 = C.collect
- println(inCoreC2)
-
- (inCoreC2 - inCoreCControl).norm should be < 1E-10
-
- }
-
- test("C = A %*% B mapBlock {}") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
-
- val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
- val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
-
- // Actual
- val inCoreCControl = inCoreA %*% inCoreB
-
- A.colSums()
- B.colSums()
-
-
- val x = drmBroadcast(dvec(0, 0))
- val x2 = drmBroadcast(dvec(0, 0))
- // Distributed operation
- val C = (B.t %*% A.t).t.mapBlock() {
- case (keys, block) =>
- for (row <- 0 until block.nrow) block(row, ::) += x.value + x2
- keys -> block
- }
-
- val inCoreC = C checkpoint CacheHint.NONE collect;
- println(inCoreC)
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
-
- // We also should be able to collect via implicit checkpoint
- val inCoreC2 = C.collect
- println(inCoreC2)
-
- (inCoreC2 - inCoreCControl).norm should be < 1E-10
-
- val inCoreQ = dqrThin(C)._1.collect
-
- printf("Q=\n%s\n", inCoreQ)
-
- // Assert unit-orthogonality
- ((inCoreQ(::, 0) dot inCoreQ(::, 0)) - 1.0).abs should be < 1e-10
- (inCoreQ(::, 0) dot inCoreQ(::, 1)).abs should be < 1e-10
-
- }
-
- test("C = A %*% B incompatible B keys") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- val B = drmParallelize(inCoreB, numPartitions = 2)
- // Re-key B into DrmLike[String] instead of [Int]
- .mapBlock()({
- case (keys, block) => keys.map(_.toString) -> block
- })
-
- val C = A %*% B
-
- intercept[IllegalArgumentException] {
- // This plan must not compile
- C.checkpoint()
- }
- }
-
- test("C = At %*% B , join") {
-
- val inCoreA = dense((1, 2), (3, 4), (-3, -5))
- val inCoreB = dense((3, 5), (4, 6), (0, 1))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- val B = drmParallelize(inCoreB, numPartitions = 2)
-
- val C = A.t %*% B
-
- SparkEngine.optimizerRewrite(C) should equal(OpAtB[Int](A, B))
-
- val inCoreC = C.collect
- val inCoreControlC = inCoreA.t %*% inCoreB
-
- (inCoreC - inCoreControlC).norm should be < 1E-10
-
- }
-
- test("C = At %*% B , join, String-keyed") {
-
- val inCoreA = dense((1, 2), (3, 4), (-3, -5))
- val inCoreB = dense((3, 5), (4, 6), (0, 1))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- .mapBlock()({
- case (keys, block) => keys.map(_.toString) -> block
- })
-
- val B = drmParallelize(inCoreB, numPartitions = 2)
- .mapBlock()({
- case (keys, block) => keys.map(_.toString) -> block
- })
-
- val C = A.t %*% B
-
- SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
-
- val inCoreC = C.collect
- val inCoreControlC = inCoreA.t %*% inCoreB
-
- (inCoreC - inCoreControlC).norm should be < 1E-10
-
- }
-
- test("C = At %*% B , zippable, String-keyed") {
-
- val inCoreA = dense((1, 2), (3, 4), (-3, -5))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- .mapBlock()({
- case (keys, block) => keys.map(_.toString) -> block
- })
-
- val B = A + 1.0
-
- val C = A.t %*% B
-
- SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
-
- val inCoreC = C.collect
- val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
-
- (inCoreC - inCoreControlC).norm should be < 1E-10
-
- }
-
- test("C = A %*% inCoreB") {
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
- val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- val C = A %*% inCoreB
-
- val inCoreC = C.collect
- val inCoreCControl = inCoreA %*% inCoreB
-
- println(inCoreC)
- (inCoreC - inCoreCControl).norm should be < 1E-10
-
- }
-
- test("C = inCoreA %*%: B") {
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
- val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
-
- val B = drmParallelize(inCoreB, numPartitions = 2)
- val C = inCoreA %*%: B
-
- val inCoreC = C.collect
- val inCoreCControl = inCoreA %*% inCoreB
-
- println(inCoreC)
- (inCoreC - inCoreCControl).norm should be < 1E-10
-
- }
-
- test("C = A.t %*% A") {
- val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
- val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
- val AtA = A.t %*% A
-
- // Assert optimizer detects square
- SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
-
- val inCoreAtA = AtA.collect
- val inCoreAtAControl = inCoreA.t %*% inCoreA
-
- (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
- }
-
- test("C = A.t %*% A fat non-graph") {
- // Hack the max in-mem size for this test
- System.setProperty(blas.AtA.PROPERTY_ATA_MAXINMEMNCOL, "540")
-
- val inCoreA = Matrices.uniformView(400, 550, 1234)
- val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
- val AtA = A.t %*% A
-
- // Assert optimizer detects square
- SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
-
- val inCoreAtA = AtA.collect
- val inCoreAtAControl = inCoreA.t %*% inCoreA
-
- (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
- log.debug("test done.")
- }
-
-
- test("C = A.t %*% A non-int key") {
- val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
- val AintKeyd = drmParallelize(m = inCoreA, numPartitions = 2)
- val A = AintKeyd.mapBlock() {
- case (keys, block) => keys.map(_.toString) -> block
- }
-
- val AtA = A.t %*% A
-
- // Assert optimizer detects square
- SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
-
- val inCoreAtA = AtA.collect
- val inCoreAtAControl = inCoreA.t %*% inCoreA
-
- (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
- }
-
- test("C = A + B") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- val B = drmParallelize(inCoreB, numPartitions = 2)
-
- val C = A + B
- val inCoreC = C.collect
-
- // Actual
- val inCoreCControl = inCoreA + inCoreB
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
- }
-
- test("C = A + B, identically partitioned") {
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
-
- printf("A.nrow=%d.\n", A.rdd.count())
-
- // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
- val B = A.mapBlock() {
- case (keys, block) =>
- val bBlock = block.like() := ((r, c, v) => util.Random.nextDouble())
- keys -> bBlock
- }
- // Prevent repeated computation non-determinism
- .checkpoint()
-
- val inCoreB = B.collect
-
- printf("A=\n%s\n", inCoreA)
- printf("B=\n%s\n", inCoreB)
-
- val C = A + B
-
- val inCoreC = C.collect
-
- printf("C=\n%s\n", inCoreC)
-
- // Actual
- val inCoreCControl = inCoreA + inCoreB
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
- }
-
-
- test("C = A + B side test 1") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
- val B = drmParallelize(inCoreB, numPartitions = 2)
-
- val C = A + B
- val inCoreC = C.collect
-
- val inCoreD = (A + B).collect
-
- // Actual
- val inCoreCControl = inCoreA + inCoreB
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
- (inCoreD - inCoreCControl).norm should be < 1E-10
- }
-
- test("C = A + B side test 2") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
-
- val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
- val B = drmParallelize(inCoreB, numPartitions = 2)
-
- val C = A + B
- val inCoreC = C.collect
-
- val inCoreD = (A + B).collect
-
- // Actual
- val inCoreCControl = inCoreA + inCoreB
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
- (inCoreD - inCoreCControl).norm should be < 1E-10
- }
-
- test("C = A + B side test 3") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
-
- val B = drmParallelize(inCoreB, numPartitions = 2)
- // val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
- val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY)
-
- val C = A + B
- val inCoreC = C.collect
-
- val inCoreD = (A + B).collect
-
- // Actual
- val inCoreCControl = inCoreA + inCoreB * 2.0
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
- (inCoreD - inCoreCControl).norm should be < 1E-10
- }
-
- test("general side") {
- val sc = implicitly[DistributedContext]
- val k1 = sc.parallelize(Seq(ArrayBuffer(0, 1, 2, 3)))
- // .persist(StorageLevel.MEMORY_ONLY) // -- this will demonstrate immutability side effect!
- .persist(StorageLevel.MEMORY_ONLY_SER)
-
- println(k1.map(_ += 4).collect.head)
- println(k1.map(_ += 4).collect.head)
- }
-
- test("Ax") {
- val inCoreA = dense(
- (1, 2),
- (3, 4),
- (20, 30)
- )
- val x = dvec(10, 3)
-
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- val ax = (drmA %*% x).collect(::, 0)
-
- ax should equal(inCoreA %*% x)
- }
-
- test("A'x") {
- val inCoreA = dense(
- (1, 2),
- (3, 4),
- (20, 30)
- )
- val x = dvec(10, 3, 4)
-
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- SparkEngine.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x))
-
- val atx = (drmA.t %*% x).collect(::, 0)
-
- atx should equal(inCoreA.t %*% x)
- }
-
- test("colSums, colMeans") {
- val inCoreA = dense(
- (1, 2),
- (3, 4),
- (20, 30)
- )
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- drmA.colSums() should equal(inCoreA.colSums())
- drmA.colMeans() should equal(inCoreA.colMeans())
- }
-
- test("numNonZeroElementsPerColumn") {
- val inCoreA = dense(
- (0, 2),
- (3, 0),
- (0, -30)
-
- )
- val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
- drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn())
- }
-
- test("C = A cbind B, cogroup") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val inCoreB = dense((3, 5), (4, 6))
- val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6))
-
- val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
- val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
-
- (A.cbind(B) -: controlC).norm should be < 1e-10
-
- }
-
- test("C = A cbind B, zip") {
-
- val inCoreA = dense((1, 2), (3, 4))
- val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5))
-
- val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
-
- (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10
-
- }
-
-}
+/** ==R-like DRM DSL operation tests -- Spark== */
+class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
new file mode 100644
index 0000000..a0136e0
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.test
+
+import org.scalatest.Suite
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.test.{DistributedMahoutSuite, MahoutSuite}
+import org.apache.mahout.math.drm.DistributedContext
+
+trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfiguration {
+ this: Suite =>
+
+ protected implicit var mahoutCtx: DistributedContext = _
+ protected var masterUrl = null.asInstanceOf[String]
+
+ override protected def beforeEach() {
+ super.beforeEach()
+
+ masterUrl = "local[2]"
+ mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
+ appName = "MahoutLocalContext",
+ // Do not run MAHOUT_HOME jars in unit tests.
+ addMahoutJars = false,
+ sparkConf = new SparkConf()
+ .set("spark.kryoserializer.buffer.mb", "15")
+ .set("spark.akka.frameSize", "30")
+ .set("spark.default.parallelism", "10")
+ )
+ }
+
+ override protected def afterEach() {
+ if (mahoutCtx != null) {
+ try {
+ mahoutCtx.close()
+ } finally {
+ mahoutCtx = null
+ }
+ }
+ super.afterEach()
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
index a80f42b..d5d16a8 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.mahout.sparkbindings.test
import org.scalatest.Suite
http://git-wip-us.apache.org/repos/asf/mahout/blob/25a6fc09/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
deleted file mode 100644
index fb97f68..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.mahout.sparkbindings.test
-
-import org.scalatest.Suite
-import org.apache.spark.SparkConf
-import org.apache.mahout.sparkbindings._
-import org.apache.mahout.test.MahoutSuite
-import org.apache.mahout.math.drm.DistributedContext
-
-trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
- this: Suite =>
-
- protected implicit var mahoutCtx: DistributedContext = _
- protected var masterUrl = null.asInstanceOf[String]
-
- override protected def beforeEach() {
- super.beforeEach()
-
- masterUrl = "local[2]"
- mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
- appName = "MahoutLocalContext",
- // Do not run MAHOUT_HOME jars in unit tests.
- addMahoutJars = false,
- sparkConf = new SparkConf()
- .set("spark.kryoserializer.buffer.mb", "15")
- .set("spark.akka.frameSize", "30")
- .set("spark.default.parallelism", "10")
- )
- }
-
- override protected def afterEach() {
- if (mahoutCtx != null) {
- try {
- mahoutCtx.close()
- } finally {
- mahoutCtx = null
- }
- }
- super.afterEach()
- }
-}