Posted to commits@mahout.apache.org by dl...@apache.org on 2014/06/04 23:28:35 UTC
git commit: Revert "MAHOUT-1566: (Experimental) Regular ALS
factorizer with conversion tests, optimizer enhancements and bug fixes."
Repository: mahout
Updated Branches:
refs/heads/master 656878ea6 -> 26d3db6ba
Revert "MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes."
This reverts commit 656878ea69adf7663015156b55ad5c9ea3ec7e4f (the original commit somehow applied only partially).
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/26d3db6b
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/26d3db6b
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/26d3db6b
Branch: refs/heads/master
Commit: 26d3db6ba1f12870af996ef9eaea17dfc4a8b8fb
Parents: 656878e
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Wed Jun 4 14:27:37 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Wed Jun 4 14:27:37 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 -
.../mahout/math/drm/CheckpointedOps.scala | 3 +-
.../mahout/math/drm/DistributedEngine.scala | 11 +-
.../org/apache/mahout/math/drm/DrmLike.scala | 2 +-
.../org/apache/mahout/math/drm/DrmLikeOps.scala | 9 +-
.../apache/mahout/math/drm/RLikeDrmOps.scala | 9 +-
.../mahout/math/drm/decompositions/DQR.scala | 57 ++++++
.../mahout/math/drm/decompositions/DSPCA.scala | 153 ++++++++++++++++
.../mahout/math/drm/decompositions/DSSVD.scala | 83 +++++++++
.../apache/mahout/math/drm/logical/OpAewB.scala | 9 +-
.../mahout/math/drm/logical/OpMapBlock.scala | 8 +-
.../org/apache/mahout/math/drm/package.scala | 79 +++++----
.../mahout/math/scalabindings/MatrixOps.scala | 2 -
.../math/scalabindings/RLikeMatrixOps.scala | 12 +-
.../scalabindings/decompositions/SSVD.scala | 165 +++++++++++++++++
.../mahout/math/scalabindings/package.scala | 21 +++
.../mahout/math/scalabindings/MathSuite.scala | 76 ++++++++
.../mahout/sparkbindings/SparkEngine.scala | 35 ++--
.../apache/mahout/sparkbindings/blas/ABt.scala | 35 ++--
.../apache/mahout/sparkbindings/blas/AewB.scala | 159 +++++------------
.../drm/CheckpointedDrmSpark.scala | 8 +-
.../mahout/sparkbindings/drm/package.scala | 28 +--
.../apache/mahout/sparkbindings/package.scala | 4 +-
.../mahout/sparkbindings/blas/ABtSuite.scala | 6 +-
.../mahout/sparkbindings/blas/AewBSuite.scala | 16 +-
.../decompositions/MathSuite.scala | 176 +++++++++++++++++++
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 37 ----
.../sparkbindings/test/MahoutLocalContext.scala | 2 +-
28 files changed, 887 insertions(+), 320 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3b2e61b..bea124a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,8 +2,6 @@ Mahout Change Log
Release 1.0 - unreleased
- MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes (dlyubimov)
-
MAHOUT-1537: Minor fixes to spark-shell (Anand Avati via dlyubimov)
MAHOUT-1529: Finalize abstraction of distributed logical plans from backend operations (dlyubimov)
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index edd0cfc..fa1ccfd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.math.drm
import scala.reflect.ClassTag
-import org.apache.mahout.math._
+import org.apache.mahout.math.Vector
/**
@@ -35,6 +35,5 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
/** Column Means */
def colMeans(): Vector = drm.context.colMeans(drm)
- def norm():Double = drm.context.norm(drm)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 5ffee9d..0e76d87 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -19,11 +19,10 @@ package org.apache.mahout.math.drm
import scala.reflect.ClassTag
import logical._
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
+import org.apache.mahout.math.{Matrix, Vector}
import DistributedEngine._
import org.apache.mahout.math.scalabindings._
+import RLikeOps._
/** Abstraction of optimizer/distributed engine */
trait DistributedEngine {
@@ -44,12 +43,10 @@ trait DistributedEngine {
def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
/** Engine-specific colSums implementation based on a checkpoint. */
- def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+ def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
/** Engine-specific colMeans implementation based on a checkpoint. */
- def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
-
- def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
+ def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
/** Broadcast support */
def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 7fbfc12..8e0db1e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -28,7 +28,7 @@ package org.apache.mahout.math.drm
*/
trait DrmLike[K] {
- protected[mahout] def partitioningTag: Long
+ protected[mahout] def partitioningTag:Long
protected[mahout] val context:DistributedContext
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 328805a..35e28af 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -36,14 +36,9 @@ class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
* @tparam R
* @return
*/
- def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = true)
+ def mapBlock[R : ClassTag](ncol: Int = -1)
(bmf: BlockMapFunc[K, R]): DrmLike[R] =
- new OpMapBlock[K, R](
- A = drm,
- bmf = bmf,
- _ncol = ncol,
- identicallyPartitioned = identicallyPartitioned
- )
+ new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
/**
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index 7ac5577..f46d15c 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -25,13 +25,13 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
import RLikeDrmOps._
- def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
+ def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
- def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
+ def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
- def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
+ def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
- def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
+ def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
@@ -70,6 +70,7 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
+
}
object RLikeDrmOps {
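For context, a minimal usage sketch of the R-like DRM operators this hunk touches. It assumes a Spark-backed implicit DistributedContext is in scope, as in the test suites further down this diff, and is illustrative rather than part of the commit:

import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import RLikeDrmOps._

// Assumes: implicit val ctx: DistributedContext (Spark-backed).
val A = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
val B = drmParallelize(dense((5, 6), (7, 8)), numPartitions = 2)

// Each operator builds a logical OpAewB node; after this revert the
// elementwise op id is carried as a Char ('+', '-', '*', '/').
val sum  = A + B   // elementwise sum
val prod = A * B   // Hadamard (elementwise) product

// Scalar variants still carry a String op id ("+", "-", "-:", ...).
val shifted = A + 1.0

// Nothing runs until a computational action, e.g. collecting in-core.
val inCoreSum = sum.collect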
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
new file mode 100644
index 0000000..34ae345
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
@@ -0,0 +1,57 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+ private val log = Logger.getLogger(DQR.getClass)
+
+ /**
+ * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+ * controlled (<5000 or so). <P>
+ *
+ * It is recommended to checkpoint A since it does two passes over it. <P>
+ *
+ * It also guarantees that Q is partitioned exactly the same way (and in the same key order) as A, so
+ * their RDDs should be able to zip successfully.
+ */
+ def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+ if (A.ncol > 5000)
+ log.warn("A is too fat. A'A must fit in memory and be easily broadcast.")
+
+ implicit val ctx = A.context
+
+ val AtA = (A.t %*% A).checkpoint()
+ val inCoreAtA = AtA.collect
+
+ if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+ val ch = chol(inCoreAtA)
+ val inCoreR = (ch.getL cloned) t
+
+ if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+ if (checkRankDeficiency && !ch.isPositiveDefinite)
+ throw new IllegalArgumentException("R is rank-deficient.")
+
+ val bcastAtA = drmBroadcast(inCoreAtA)
+
+ // Unfortunately, I don't think Cholesky decomposition is serializable to the backend. So we re-
+ // decompose A'A in the backend again.
+
+ // Compute Q = A*inv(L') -- we can do it blockwise.
+ val Q = A.mapBlock() {
+ case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+ }
+
+ Q -> inCoreR
+ }
+
+}
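To make the algorithm above concrete, here is a minimal in-core sketch of the same Cholesky-based thin QR identity (A'A = LL', R = L', Q = A*inv(L')), using the chol/solveRight helpers this file already relies on. It assumes A has full column rank and is an illustration, not part of the commit:

import org.apache.mahout.math.scalabindings._
import RLikeOps._

val a = dense((1, 2), (3, 4), (5, 6), (7, 8))

val ata = a.t %*% a           // small n x n Gram matrix, fits in core
val ch  = chol(ata)           // A'A = L * L'
val r   = (ch.getL cloned).t  // R = L'
val q   = ch.solveRight(a)    // Q = A * inv(L'), done blockwise in the DRM case

// Sanity: Q'Q should be ~identity, and Q %*% R should reproduce A.
println(q.t %*% q)
println(q %*% r)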
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
new file mode 100644
index 0000000..9e33416
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+ /**
+ * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+ * document attached to MAHOUT-817.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations (hint: use either 0 or 1)
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to HDFS, in order to trigger their computation).
+ */
+ def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+ implicit val ctx = A.context
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than the smaller of m, n.")
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // Dataset mean
+ val xi = drmA.colMeans
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+ val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+ // This is done up front in a single-threaded fashion for now. Even though it doesn't require any
+ // memory beyond what is required to keep xi around, it still might be parallelized to the backend
+ // for significantly large n and r. TODO
+ val s_o = omega.t %*% xi
+
+ val bcastS_o = drmBroadcast(s_o)
+ val bcastXi = drmBroadcast(xi)
+
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val s_o:Vector = bcastS_o
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+ keys -> blockY
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ var s_q = drmQ.colSums()
+ var bcastVarS_q = drmBroadcast(s_q)
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ var s_b = (drmBt.t %*% xi).collect(::, 0)
+ var bcastVarS_b = drmBroadcast(s_b)
+
+ for (i <- 0 until q) {
+
+ // These closures don't seem to play well with outer-scope vars: closure attributes don't get
+ // recorded correctly. So we create an additional set of vals for the broadcast vars to properly
+ // create read-only closure attributes in this very scope.
+ val bcastS_q = bcastVarS_q
+ val bcastS_b = bcastVarS_b
+ val bcastXib = bcastXi
+
+ // Fix Bt as B' -= xi cross s_q
+ drmBt = drmBt.mapBlock() {
+ case (keys, block) =>
+ val s_q: Vector = bcastS_q
+ val xi: Vector = bcastXib
+ keys.zipWithIndex.foreach {
+ case (key, idx) => block(idx, ::) -= s_q * xi(key)
+ }
+ keys -> block
+ }
+
+ drmY.uncache()
+ drmQ.uncache()
+
+ drmY = (drmA %*% drmBt)
+ // Fix Y by subtracting s_b from each row of the AB'
+ .mapBlock() {
+ case (keys, block) =>
+ val s_b: Vector = bcastS_b
+ for (row <- 0 until block.nrow) block(row, ::) -= s_b
+ keys -> block
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ s_q = drmQ.colSums()
+ bcastVarS_q = drmBroadcast(s_q)
+
+ // This, on the other hand, should be an inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ s_b = (drmBt.t %*% xi).collect(::, 0)
+ bcastVarS_b = drmBroadcast(s_b)
+ }
+
+ val c = s_q cross s_b
+ val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+ c - c.t + (s_q cross s_q) * (xi dot xi)
+ val (inCoreUHat, d) = eigen(inCoreBBt)
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV is actually computed until used, we no longer need the flags
+ // instructing whether or not to compute the U, V outputs. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
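As a reading of the final correction above (the authoritative derivation is the MAHOUT-817 "SSVD-PCA options.pdf" document the comment cites), with xi the column-means vector, s_q = colSums(Q) and s_b = B'xi, the code appears to assemble the mean-centered product without ever forming the dense A - 1*xi':

  \bar{B}\bar{B}^\top = B B^\top - s_q s_b^\top - s_b s_q^\top + (\xi^\top \xi)\, s_q s_q^\top

where c = s_q s_b^\top corresponds to "val c = s_q cross s_b" in the code.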
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
new file mode 100644
index 0000000..0da9ec7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
@@ -0,0 +1,83 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+ /**
+ * Distributed Stochastic Singular Value decomposition algorithm.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to HDFS, in order to trigger their computation).
+ */
+ def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than the smaller of m, n.")
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+
+ // Compute Y = A*Omega. Instead of redistributing the view, we redistribute only the Omega seed and
+ // instantiate the Omega random matrix view in the backend. That way the serialized closure
+ // is much more compact.
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ keys -> blockY
+ }
+
+ var drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (q == 0) drmQ = drmQ.checkpoint()
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (q == 0) drmBt = drmBt.checkpoint()
+
+ for (i <- 0 until q) {
+ drmY = drmA %*% drmBt
+ drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (i == q - 1) drmQ = drmQ.checkpoint()
+
+ // This, on the other hand, should be an inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (i == q - 1) drmBt = drmBt.checkpoint()
+ }
+
+ val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
+ val (inCoreUHat, d) = eigen(inCoreBBt)
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV is actually computed until used, we no longer need the flags
+ // instructing whether or not to compute the U, V outputs. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
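A hedged usage sketch for dssvd above, mirroring the shape of the in-core ssvd test later in this diff. It assumes a Spark-backed implicit DistributedContext; the parameter values are illustrative:

import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._

val inCoreA = dense(
  (1, 2, 3),
  (3, 4, 5),
  (-2, 6, 7),
  (-3, 8, 9)
)
val drmA = drmParallelize(inCoreA, numPartitions = 2)

// k = requested rank, p = oversampling, q = power iterations.
val (drmU, drmV, s) = dssvd(drmA, k = 2, p = 1, q = 1)

// U, V are lazy; collecting (or saving) them triggers the pipeline.
printf("Sigma:\n%s\n", s)
printf("U:\n%s\n", drmU.collect)
printf("V:\n%s\n", drmV.collect)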
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
index da7c0d5..d07172a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -19,24 +19,17 @@ package org.apache.mahout.math.drm.logical
import scala.reflect.ClassTag
import org.apache.mahout.math.drm.DrmLike
-import scala.util.Random
/** DRM elementwise operator */
case class OpAewB[K: ClassTag](
override var A: DrmLike[K],
override var B: DrmLike[K],
- val op: String
+ val op: Char
) extends AbstractBinaryOp[K, K, K] {
-
-
assert(A.ncol == B.ncol, "arguments must have same number of columns")
assert(A.nrow == B.nrow, "arguments must have same number of rows")
- override protected[mahout] lazy val partitioningTag: Long =
- if (A.partitioningTag == B.partitioningTag) A.partitioningTag
- else Random.nextLong()
-
/** R-like syntax for number of rows. */
def nrow: Long = A.nrow
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
index 7299d9e..8e4362d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -21,18 +21,16 @@ import scala.reflect.ClassTag
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
-import scala.util.Random
class OpMapBlock[S: ClassTag, R: ClassTag](
override var A: DrmLike[S],
val bmf: BlockMapFunc[S, R],
val _ncol: Int = -1,
- val _nrow: Long = -1,
- identicallyPartitioned:Boolean
+ val _nrow: Long = -1
) extends AbstractUnaryOp[S, R] {
- override protected[mahout] lazy val partitioningTag: Long =
- if (identicallyPartitioned) A.partitioningTag else Random.nextLong()
+
+ override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
/** R-like syntax for number of rows. */
def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 02e8b7a..768bb1c 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -18,9 +18,7 @@
package org.apache.mahout.math
import scala.reflect.ClassTag
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
+import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
package object drm {
@@ -72,45 +70,56 @@ package object drm {
(implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
/** Implicit broadcast -> value conversion. */
- implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
+ implicit def bcast2val[T](bcast:BCast[T]):T = bcast.value
/** Just throw all engine operations into context as well. */
- implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
+ implicit def ctx2engine(ctx:DistributedContext):DistributedEngine = ctx.engine
implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
new CheckpointedOps[K](drm)
+ implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
+
+ // ============== Decompositions ===================
+
/**
- * We assume that whenever computational action is invoked without explicit checkpoint, the user
- * doesn't imply caching
+ * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+ * controlled (<5000 or so). <P>
+ *
+ * It is recommended to checkpoint A since it does two passes over it. <P>
+ *
+ * It also guarantees that Q is partitioned exactly the same way (and in the same key order) as A, so
+ * their RDDs should be able to zip successfully.
*/
- implicit def drm2Checkpointed[K: ClassTag](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
-
- /** Implicit conversion to in-core with NONE caching of the result. */
- implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect
-
- /** Do vertical concatenation of a collection of blockified tuples */
- def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
- assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
- if (blocks.size == 1) {
- // No coalescing required.
- blocks.head
- } else {
- // compute total number of rows in a new block
- val m = blocks.view.map(_._2.nrow).sum
- val n = blocks.head._2.ncol
- val coalescedBlock = blocks.head._2.like(m, n)
- val coalescedKeys = new Array[K](m)
- var row = 0
- for (elem <- blocks.view) {
- val block = elem._2
- val rowEnd = row + block.nrow
- coalescedBlock(row until rowEnd, ::) := block
- elem._1.copyToArray(coalescedKeys, row)
- row = rowEnd
- }
- coalescedKeys -> coalescedBlock
- }
- }
+ def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+ DQR.dqrThin(A, checkRankDeficiency)
+
+ /**
+ * Distributed Stochastic Singular Value decomposition algorithm.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to HDFS, in order to trigger their computation).
+ */
+ def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+
+ /**
+ * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+ * document attached to MAHOUT-817.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations (hint: use either 0 or 1)
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to HDFS, in order to trigger their computation).
+ */
+ def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index 48c2048..a872240 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -51,8 +51,6 @@ class MatrixOps(val m: Matrix) {
def -(that: Matrix) = cloned -= that
- def -:(that: Matrix) = that - m
-
// m.plus(that)?
def +(that: Double) = cloned += that
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
index af53c51..23c2ac2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
@@ -20,7 +20,7 @@ import org.apache.mahout.math.{Vector, Matrix}
import scala.collection.JavaConversions._
import RLikeOps._
-class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
+class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
/**
* matrix-matrix multiplication
@@ -47,14 +47,12 @@ class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
def *(that: Double) = cloned *= that
- def /(that: Matrix) = cloned /= that
+ def /(that:Matrix) = cloned /= that
- def /:(that: Matrix) = that / m
-
- def /(that: Double) = cloned /= that
+ def /(that:Double) = cloned /= that
/** 1.0 /: A is equivalent to R's 1.0/A */
- def /:(that: Double) = that /=: cloned
+ def /:(that:Double) = that /=: cloned
/**
* in-place Hadamard product. We probably don't want to use assign
@@ -84,7 +82,7 @@ class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
}
/** 1.0 /=: A is equivalent to A = 1.0/A in R */
- def /=:(that: Double) = {
+ def /=:(that:Double) = {
m.foreach(that /=: _.vector())
m
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+ private val log = Logger.getLogger(SSVD.getClass)
+
+ /**
+ * In-core SSVD algorithm.
+ *
+ * @param a input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ var y = a %*% omega
+ var yty = y.t %*% y
+ val at = a.t
+ var ch = chol(yty)
+ assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+ var bt = ch.solveRight(at %*% y)
+
+ // Power iterations
+ for (i <- 0 until q) {
+ y = a %*% bt
+ yty = y.t %*% y
+ ch = chol(yty)
+ bt = ch.solveRight(at %*% y)
+ }
+
+ val bbt = bt.t %*% bt
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = ch.solveRight(y) %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+ }
+
+ /**
+ * PCA based on SSVD that runs without forming the always-dense A - colMeans(A) input for SVD. This
+ * follows the solution outlined in MAHOUT-817. The in-core version is, for the most part, supposed
+ * to save some memory for sparse inputs by avoiding direct mean subtraction.<P>
+ *
+ * Hint: Usually one wants to use AV, which is approximately U*Sigma, i.e. <code>u %*%: diagv(s)</code>.
+ * If retaining distances and original scaled variances is not that important, the normalized PCA space
+ * is just U.
+ *
+ * Important: data points are considered to be rows.
+ *
+ * @param a input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ // Dataset mean
+ val xi = a.colMeans()
+
+ if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+ var y = a %*% omega
+
+ // Fixing y
+ val s_o = omega.t %*% xi
+ y := ((r,c,v) => v - s_o(c))
+
+ var yty = y.t %*% y
+ var ch = chol(yty)
+// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+ // This is implicit Q of QR(Y)
+ var qm = ch.solveRight(y)
+ var bt = a.t %*% qm
+ var s_q = qm.colSums()
+ var s_b = bt.t %*% xi
+
+ // Power iterations
+ for (i <- 0 until q) {
+
+ // Fix bt
+ bt -= xi cross s_q
+
+ y = a %*% bt
+
+ // Fix Y again.
+ y := ((r,c,v) => v - s_b(c))
+
+ yty = y.t %*% y
+ ch = chol(yty)
+ qm = ch.solveRight(y)
+ bt = a.t %*% qm
+ s_q = qm.colSums()
+ s_b = bt.t %*% xi
+ }
+
+ val c = s_q cross s_b
+
+ // BB' computation becomes
+ val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi)
+
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = qm %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+ }
+
+}
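Summarizing the code above as math (a reading under the code's own notation, not an independent derivation): one pass computes

  Y = A \Omega, \quad Y^\top Y = L L^\top \;(\text{Cholesky}), \quad Q = Y (L^\top)^{-1}, \quad B^\top = A^\top Q

and each of the q power iterations recomputes Y = A B^\top and refreshes Q and B^\top the same way; finally

  B B^\top = \hat{U} D \hat{U}^\top, \quad \sigma_i = \sqrt{d_i}, \quad U = Q \hat{U}, \quad V = B^\top \hat{U}\,\mathrm{diag}(1/\sigma)

truncated to the first k columns and singular values.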
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index 2b7773b..4599146 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -281,5 +281,26 @@ package object scalabindings {
x(::, 0)
}
+ def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
+
+ /**
+ * PCA based on SSVD that runs without forming the always-dense A - colMeans(A) input for SVD. This
+ * follows the solution outlined in MAHOUT-817. The in-core version is, for the most part, supposed
+ * to save some memory for sparse inputs by avoiding direct mean subtraction.<P>
+ *
+ * Hint: Usually one wants to use AV, which is approximately U*Sigma, i.e. <code>u %*%: diagv(s)</code>.
+ * If retaining distances and original scaled variances is not that important, the normalized PCA space
+ * is just U.
+ *
+ * Important: data points are considered to be rows.
+ *
+ * @param a input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
+ SSVD.spca(a = a, k = k, p = p, q = q)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
index b10cde3..d1171cc 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
@@ -193,6 +193,82 @@ class MathSuite extends FunSuite with MahoutSuite {
}
+ test("ssvd") {
+ val a = dense(
+ (1, 2, 3),
+ (3, 4, 5),
+ (-2, 6, 7),
+ (-3, 8, 9)
+ )
+
+ val rank = 2
+ val (u, v, s) = ssvd(a, k = rank, q = 1)
+
+ val (uControl, vControl, sControl) = svd(a)
+
+ printf("U:\n%s\n", u)
+ printf("U-control:\n%s\n", uControl)
+ printf("V:\n%s\n", v)
+ printf("V-control:\n%s\n", vControl)
+ printf("Sigma:\n%s\n", s)
+ printf("Sigma-control:\n%s\n", sControl)
+
+ (s - sControl(0 until rank)).norm(2) should be < 1E-7
+
+ // Singular vectors may be equivalent down to a sign only.
+ (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7
+ (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7
+
+ }
+
+ test("spca") {
+
+ import math._
+
+ val rnd = RandomUtils.getRandom
+
+ // Number of points
+ val m = 500
+ // Length of actual spectrum
+ val spectrumLen = 40
+
+ val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+ printf("spectrum:%s\n", spectrum)
+
+ val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+ ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+ // PCA Rotation matrix -- should also be orthonormal.
+ val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+ val input = (u %*%: diagv(spectrum)) %*% tr.t
+
+ // Calculate just the first 10 principal factors and reduce dimensionality.
+ // Since we assert just the validity of the s-pca, not the stochastic error, we bump the p parameter
+ // to drive the stochastic error to zero and assert only the functional correctness of the method's
+ // pca-specific additions.
+ val k = 10
+ var (pca, _, s) = spca(a = input, k = k, p=spectrumLen, q = 1)
+ printf("Svs:%s\n",s)
+ // Un-normalized pca data:
+ pca = pca %*%: diagv(s)
+
+ // Of course, once we have calculated the pca, the spectrum is going to be different since our
+ // originally generated input was not centered. So here we just brute-solve the pca to verify.
+ val xi = input.colMeans()
+ for (r <- 0 until input.nrow) input(r, ::) -= xi
+ var (pcaControl, _, sControl) = svd(m = input)
+
+ printf("Svs-control:%s\n",sControl)
+ pcaControl = (pcaControl %*%: diagv(sControl))(::,0 until k)
+
+ printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+ printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+ (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+ }
+
test("random uniform") {
val omega1 = Matrices.symmetricUniformView(2, 3, 1234)
val omega2 = Matrices.symmetricUniformView(2, 3, 1234)
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3a03e58..0c904ab 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,9 +17,9 @@
package org.apache.mahout.sparkbindings
-import org.apache.mahout.math._
-import scalabindings._
+import org.apache.mahout.math.scalabindings._
import RLikeOps._
+import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.logical._
import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
import org.apache.mahout.math._
@@ -30,8 +30,6 @@ import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
import scala.Some
import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
/** Spark-specific non-drm-method operations */
object SparkEngine extends DistributedEngine {
@@ -53,17 +51,7 @@ object SparkEngine extends DistributedEngine {
}
/** Engine-specific colMeans implementation based on a checkpoint. */
- override def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
- if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
-
- override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
- drm.rdd
- // Compute sum of squares of each vector
- .map {
- case (key, v) => v dot v
- }
- .reduce(_ + _)
-
+ def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
/**
* Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
@@ -135,10 +123,7 @@ object SparkEngine extends DistributedEngine {
{
implicit def getWritable(x: Any): Writable = val2key()
- new CheckpointedDrmSpark(
- rdd = rdd.map(t => (key2val(t._1), t._2)),
- _cacheStorageLevel = StorageLevel.MEMORY_ONLY
- )(km.asInstanceOf[ClassTag[Any]])
+ new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
}
}
@@ -225,8 +210,16 @@ object SparkEngine extends DistributedEngine {
case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
- case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
// Custom operators, we just execute them
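The hunk above replaces the single generic elementwise dispatch with one physical operator per op id. A minimal self-contained sketch of that pattern-match style (the case-class names here are illustrative stand-ins, not the real logical plan nodes):

// Toy model of logical -> physical dispatch keyed on the operator id.
sealed trait LogicalOp
case class EwB(op: Char) extends LogicalOp        // stands in for OpAewB
case class EwScalar(op: String) extends LogicalOp // stands in for OpAewScalar

def toPhysical(node: LogicalOp): String = node match {
  case EwB('+')       => "AewB.a_plus_b"
  case EwB('-')       => "AewB.a_minus_b"
  case EwB('*')       => "AewB.a_hadamard_b"
  case EwB('/')       => "AewB.a_eldiv_b"
  case EwScalar("-:") => "AewB.scalar_minus_a" // right-associative s -: A
  case EwScalar("/:") => "AewB.scalar_div_a"   // right-associative s /: A
  case EwScalar("+")  => "AewB.a_plus_scalar"
  case other          => throw new IllegalArgumentException(s"Unsupported op: $other")
}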
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 1e3f286..97873bd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -65,22 +65,16 @@ object ABt {
// Blockify everything.
val blocksA = srcA.toBlockifiedDrmRdd()
- // Mark row-blocks with group id
+ // mark row-blocks with group id
.mapPartitionsWithIndex((part, iter) => {
+ val rowBlockId = part
+ val (blockKeys, block) = iter.next()
- if (iter.isEmpty) {
- Iterator.empty
- } else {
-
- val rowBlockId = part
- val (blockKeys, block) = iter.next()
+ // Each partition must have exactly one block due to implementation guarantees of blockify()
+ iter.ensuring(!_.hasNext)
- // Each partition must have exactly one block due to implementation guarantees of blockify()
- assert(!iter.hasNext, "Partition #%d is expected to have at most 1 block at AB'.".format(part))
-
- // the output is (row block id, array of row keys, and the matrix representing the block).
- Iterator((rowBlockId, blockKeys, block))
- }
+ // the output is (row block id, array of row keys, and the matrix representing the block).
+ Iterator((rowBlockId, blockKeys, block))
})
val blocksB = srcB.toBlockifiedDrmRdd()
@@ -102,7 +96,7 @@ object ABt {
// The plan.
- var blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
+ val blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
// Build Cartesian. It may require a bit more memory there at tasks.
.cartesian(blocksB)
@@ -125,7 +119,7 @@ object ABt {
// Combine -- this is probably the most efficient
.combineByKey[(Array[K],Matrix)](
- createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match {
+ createCombiner = (t:(Array[K],Array[Int],Matrix)) => t match {
case (rowKeys, colKeys, blockProd) =>
// Accumulator is a row-wise block of sparse vectors.
@@ -154,20 +148,11 @@ object ABt {
mergeCombiners = (c1: (Array[K], Matrix), c2: (Array[K], Matrix)) => {
c1._2 += c2._2
c1
- },
-
- // Cartesian will tend to produce many more partitions than we actually need for co-grouping,
- // and as a result, we may see more empty partitions than we actually need.
- numPartitions = numProductPartitions
- )
+ })
// Combine leaves residual block key -- we don't need that.
.map(_._2)
- // This may produce more than one block per partition. Most implementations rely on the convention
- // of having at most one block per partition.
- blockifiedRdd = rbind(blockifiedRdd)
-
new DrmRddInput(blockifiedSrc = Some(blockifiedRdd))
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 384b986..ec6e99e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -24,91 +24,67 @@ import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.{Matrix, Vector}
import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
-import org.apache.log4j.Logger
-import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
/** Elementwise drm-drm operators */
object AewB {
- private val log = Logger.getLogger(AewB.getClass)
+ @inline
+ def a_plus_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+ a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a += b)
- /**
- * Set to false to disallow in-place elementwise operations in case side-effects and non-idempotent
- * computations become a problem.
- */
- final val PROPERTY_AEWB_INPLACE = "mahout.math.AewB.inplace"
+ @inline
+ def a_minus_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+ a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a -= b)
- type ReduceFunc = (Vector, Vector) => Vector
+ @inline
+ def a_hadamard_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+ a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a *= b)
- type ReduceFuncScalar = (Matrix, Double) => Matrix
+ @inline
+ def a_eldiv_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+ a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a /= b)
- private[blas] def getEWOps() = {
- val inplaceProp = System.getProperty(PROPERTY_AEWB_INPLACE, "true").toBoolean
- if (inplaceProp) InplaceEWOps else CloningEWOps
- }
-
- /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
- def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+ @inline
+ def a_plus_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+ a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A += s)
- val ewOps = getEWOps()
- val opId = op.op
+ @inline
+ def a_minus_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+ a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A -= s)
- val reduceFunc = opId match {
- case "+" => ewOps.plus
- case "-" => ewOps.minus
- case "*" => ewOps.hadamard
- case "/" => ewOps.eldiv
- case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
- }
+ @inline
+ def scalar_minus_a[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+ a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => s -=: A)
- val a = srcA.toDrmRdd()
- val b = srcB.toDrmRdd()
+ @inline
+ def a_times_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+ a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A *= s)
- // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
- // instead of join, and apply the op map-side. Otherwise, perform join and apply the op
- // reduce-side.
- val rdd = if (op.isIdenticallyPartitioned(op.A)) {
+ @inline
+ def a_div_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+ a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A /= s)
- log.debug("applying zipped elementwise")
+ @inline
+ def scalar_div_a[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+ a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => s /=: A)
- a
- .zip(b)
- .map {
- case ((keyA, vectorA), (keyB, vectorB)) =>
- assert(keyA == keyB, "inputs are claimed identically partitioned, but they are not identically keyed")
- keyA -> reduceFunc(vectorA, vectorB)
- }
- } else {
-
- log.debug("applying elementwise as join")
-
- a
- .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
- .map({
- case (key, (vectorSeqA, vectorSeqB)) =>
- key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
- })
- }
+ /** Parallel version of this operation (this assumes different partitioning of the sources) */
+ private[blas] def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K],
+ reduceFunc: (Vector, Vector) => Vector): DrmRddInput[K] = {
+ val a = srcA.toDrmRdd()
+ val b = srcB.toDrmRdd()
+ val rdd = a
+ .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+ .map({
+ case (key, (vectorSeqA, vectorSeqB)) =>
+ key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
+ })
new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
}
- /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
- def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
- DrmRddInput[K] = {
-
- val ewOps = getEWOps()
- val opId = op.op
-
- val reduceFunc = opId match {
- case "+" => ewOps.plusScalar
- case "-" => ewOps.minusScalar
- case "*" => ewOps.timesScalar
- case "/" => ewOps.divScalar
- case "-:" => ewOps.scalarMinus
- case "/:" => ewOps.scalarDiv
- case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
- }
+ private[blas] def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double,
+ reduceFunc: (Matrix, Double) => Matrix): DrmRddInput[K] = {
val a = srcA.toBlockifiedDrmRdd()
val rdd = a
.map({
@@ -116,55 +92,6 @@ object AewB {
})
new DrmRddInput[K](blockifiedSrc = Some(rdd))
}
-}
-
-trait EWOps {
-
- val plus: ReduceFunc
- val minus: ReduceFunc
- val hadamard: ReduceFunc
-
- val eldiv: ReduceFunc
-
- val plusScalar: ReduceFuncScalar
-
- val minusScalar: ReduceFuncScalar
-
- val scalarMinus: ReduceFuncScalar
-
- val timesScalar: ReduceFuncScalar
-
- val divScalar: ReduceFuncScalar
-
- val scalarDiv: ReduceFuncScalar
-
-}
-
-object InplaceEWOps extends EWOps {
- val plus: ReduceFunc = (a, b) => a += b
- val minus: ReduceFunc = (a, b) => a -= b
- val hadamard: ReduceFunc = (a, b) => a *= b
- val eldiv: ReduceFunc = (a, b) => a /= b
- val plusScalar: ReduceFuncScalar = (A, s) => A += s
- val minusScalar: ReduceFuncScalar = (A, s) => A -= s
- val scalarMinus: ReduceFuncScalar = (A, s) => s -=: A
- val timesScalar: ReduceFuncScalar = (A, s) => A *= s
- val divScalar: ReduceFuncScalar = (A, s) => A /= s
- val scalarDiv: ReduceFuncScalar = (A, s) => s /=: A
}
-
-object CloningEWOps extends EWOps {
- val plus: ReduceFunc = (a, b) => a + b
- val minus: ReduceFunc = (a, b) => a - b
- val hadamard: ReduceFunc = (a, b) => a * b
- val eldiv: ReduceFunc = (a, b) => a / b
- val plusScalar: ReduceFuncScalar = (A, s) => A + s
- val minusScalar: ReduceFuncScalar = (A, s) => A - s
- val scalarMinus: ReduceFuncScalar = (A, s) => s -: A
- val timesScalar: ReduceFuncScalar = (A, s) => A * s
- val divScalar: ReduceFuncScalar = (A, s) => A / s
- val scalarDiv: ReduceFuncScalar = (A, s) => s /: A
-}
-
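The EWOps machinery removed above existed to make in-place mutation switchable. A hedged sketch of that idea in isolation (the property name mirrors the removed code; this illustrates the design rather than the committed API):

import org.apache.mahout.math.Vector
import org.apache.mahout.math.scalabindings.RLikeOps._

type ReduceFunc = (Vector, Vector) => Vector

// In-place variants mutate and return the left operand, saving an
// allocation per row; cloning variants allocate a fresh result. In-place
// is risky if a partition gets recomputed (non-idempotent side effects),
// which is why the removed code made it a JVM-property toggle.
val inplace = System.getProperty("mahout.math.AewB.inplace", "true").toBoolean

val plus: ReduceFunc  = if (inplace) (a, b) => a += b else (a, b) => a + b
val minus: ReduceFunc = if (inplace) (a, b) => a -= b else (a, b) => a - b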
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 1e2028d..2d80fe3 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -19,9 +19,8 @@ package org.apache.mahout.sparkbindings.drm
import org.apache.mahout.math._
import math._
-import scalabindings._
+import org.apache.mahout.math.scalabindings._
import RLikeOps._
-import drm._
import scala.collection.JavaConversions._
import org.apache.spark.storage.StorageLevel
import reflect._
@@ -96,9 +95,8 @@ class CheckpointedDrmSpark[K: ClassTag](
val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
- val cols = ncol
- val rows = safeToNonNegInt(nrow)
-
+ val cols = rdd.map(_._2.length).fold(0)(max(_, _))
+ val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
// since currently spark #collect() requires Serializable support,
// we serialize DRM vectors into byte arrays on the backend and restore the Vector
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 37a9ac2..322d67c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -23,14 +23,13 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
import org.apache.log4j.Logger
import java.lang.Math
-import org.apache.spark.rdd.{FilteredRDD, RDD}
+import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.spark.broadcast.Broadcast
import org.apache.mahout.math.drm._
import SparkContext._
-import org.apache.mahout.math
package object drm {
@@ -57,9 +56,8 @@ package object drm {
rdd.mapPartitions(iter => {
- if (iter.isEmpty) {
- Iterator.empty
- } else {
+ if (!iter.hasNext) Iterator.empty
+ else {
val data = iter.toIterable
val keys = data.map(t => t._1).toArray[K]
@@ -72,34 +70,24 @@ package object drm {
})
}
- /** Performs rbind() on all blocks inside the same partition to ensure there's only one block here. */
- private[sparkbindings] def rbind[K: ClassTag](rdd: BlockifiedDrmRdd[K]): BlockifiedDrmRdd[K] =
- rdd.mapPartitions(iter => {
- if (iter.isEmpty) {
- Iterator.empty
- } else {
- Iterator(math.drm.rbind(iter.toIterable))
- }
- })
-
private[sparkbindings] def deblockify[K: ClassTag](rdd: BlockifiedDrmRdd[K]): DrmRdd[K] =
// Just flat-map the rows, pairing each with its key
- rdd.flatMap {
+ rdd.flatMap({
case (blockKeys: Array[K], block: Matrix) =>
blockKeys.ensuring(blockKeys.size == block.nrow)
- blockKeys.view.zipWithIndex.map {
+ blockKeys.view.zipWithIndex.map({
case (key, idx) =>
- var v = block(idx, ::) // This is just a view!
+ var v = block(idx, ::)
// If this is a view rather than a concrete vector, clone it into a concrete vector so that we
// don't attempt to serialize the outer matrix when we save it (although perhaps this copying
// is most often excessive?)
// if (v.isInstanceOf[MatrixVectorView]) v = v.cloned
key -> v
- }
- }
+ })
+ })
}
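
A note on the cloning question left open in the comment above: a row obtained as block(idx, ::) is a lightweight view that keeps a reference to its backing matrix, so Java-serializing the view would drag the entire matrix along with it. A small illustration, assuming the in-core scalabindings DSL (cloned materializes a standalone copy):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val block = dense((1, 2, 3), (4, 5, 6))
    val view = block(0, ::)        // a view backed by `block` -- cheap, but not self-contained
    val standalone = view.cloned   // a concrete vector that is safe to serialize on its own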
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c4ef0d3..34d81cf 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -29,7 +29,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix}
import org.apache.hadoop.io.Writable
-import org.apache.spark.storage.StorageLevel
/** Public api for Spark-specific operators */
package object sparkbindings {
@@ -180,8 +179,7 @@ package object sparkbindings {
new CheckpointedDrmSpark[K](
rdd = rdd,
_nrow = nrow,
- _ncol = ncol,
- _cacheStorageLevel = StorageLevel.NONE
+ _ncol = ncol
)
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
index 52e2b35..0834145 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
@@ -33,15 +33,13 @@ class ABtSuite extends FunSuite with MahoutLocalContext {
test("ABt") {
val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
val inCoreB = dense((3, 4, 5), (5, 6, 7))
- val A = drmParallelize(m = inCoreA, numPartitions = 3)
- val B = drmParallelize(m = inCoreB, numPartitions = 2)
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
+ val B = drmParallelize(m = inCoreB)
val op = new OpABt(A, B)
val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = A, srcB = B), op.nrow, op.ncol)
- printf("AB' num partitions = %d.\n", drm.rdd.partitions.size)
-
val inCoreMControl = inCoreA %*% inCoreB.t
val inCoreM = drm.collect
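
For reference, the operator under test forms the product of A with the transpose of B; entrywise, (AB')_ij is the sum over k of A_ik * B_jk, which is exactly what the in-core control inCoreA %*% inCoreB.t computes.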
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
index 389ef65..2efa5ff 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
@@ -36,9 +36,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val A = drmParallelize(m = inCoreA, numPartitions = 2)
val B = drmParallelize(m = inCoreB)
- val op = new OpAewB(A, B, "*")
+ val op = new OpAewB(A, B, '*')
- val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_hadamard_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA * inCoreB
@@ -53,9 +53,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val A = drmParallelize(m = inCoreA, numPartitions = 2)
val B = drmParallelize(m = inCoreB)
- val op = new OpAewB(A, B, "+")
+ val op = new OpAewB(A, B, '+')
- val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_plus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA + inCoreB
@@ -70,9 +70,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val A = drmParallelize(m = inCoreA, numPartitions = 2)
val B = drmParallelize(m = inCoreB)
- val op = new OpAewB(A, B, "-")
+ val op = new OpAewB(A, B, '-')
- val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_minus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA - inCoreB
@@ -87,9 +87,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val A = drmParallelize(m = inCoreA, numPartitions = 2)
val B = drmParallelize(m = inCoreB)
- val op = new OpAewB(A, B, "/")
+ val op = new OpAewB(A, B, '/')
- val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_eldiv_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA / inCoreB
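
The four hunks above revert from a single generic AewB.a_ew_b physical operator to per-operator entry points selected by a one-character op code. A hypothetical sketch of that dispatch shape -- the dispatch helper and its opCode parameter are illustrative, only the four method names come from the patch:

    // Illustrative only: routing a logical elementwise op code to the name of its
    // specialized physical operator.
    def dispatch(opCode: Char): String = opCode match {
      case '+' => "a_plus_b"     // elementwise sum
      case '-' => "a_minus_b"    // elementwise difference
      case '*' => "a_hadamard_b" // elementwise (Hadamard) product
      case '/' => "a_eldiv_b"    // elementwise division
      case _   => sys.error("Unsupported elementwise op: " + opCode)
    }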
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
new file mode 100644
index 0000000..2369441
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm.decompositions
+
+import org.scalatest.{Matchers, FunSuite}
+import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.RandomUtils
+
+class MathSuite extends FunSuite with Matchers with MahoutLocalContext {
+
+ test("thin distributed qr") {
+
+ val inCoreA = dense(
+ (1, 2, 3, 4),
+ (2, 3, 4, 5),
+ (3, -4, 5, 6),
+ (4, 5, 6, 7),
+ (8, 6, 7, 8)
+ )
+
+ val A = drmParallelize(inCoreA, numPartitions = 2)
+ val (drmQ, inCoreR) = dqrThin(A, checkRankDeficiency = false)
+
+ // Assert optimizer still knows Q and A are identically partitioned
+ drmQ.partitioningTag should equal (A.partitioningTag)
+
+ drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
+
+ // Should also be zippable
+ drmQ.rdd.zip(other = A.rdd)
+
+ val inCoreQ = drmQ.collect
+
+ printf("A=\n%s\n", inCoreA)
+ printf("Q=\n%s\n", inCoreQ)
+ printf("R=\n%s\n", inCoreR)
+
+ val (qControl, rControl) = qr(inCoreA)
+ printf("qControl=\n%s\n", qControl)
+ printf("rControl=\n%s\n", rControl)
+
+ // Validate with Cholesky
+ val ch = chol(inCoreA.t %*% inCoreA)
+ printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
+ printf("L:\n%s\n", ch.getL)
+
+ val rControl2 = (ch.getL cloned).t
+ val qControl2 = ch.solveRight(inCoreA)
+ printf("qControl2=\n%s\n", qControl2)
+ printf("rControl2=\n%s\n", rControl2)
+
+ // The Householder approach seems to be a bit more stable
+ (rControl - inCoreR).norm should be < 1E-5
+ (qControl - inCoreQ).norm should be < 1E-5
+
+ // Assert agreement with the in-core Cholesky-based control -- this should be tighter.
+ (rControl2 - inCoreR).norm should be < 1E-10
+ (qControl2 - inCoreQ).norm should be < 1E-10
+
+ // Assert orthogonality:
+ // (a) Q[,j] dot Q[,j] == 1.0 for all j
+ // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
+ for (col <- 0 until inCoreQ.ncol)
+ ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
+ for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
+ (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
+
+
+ }
+
+ test("dssvd - the naive-est - q=0") {
+ dssvdNaive(q = 0)
+ }
+
+ test("ddsvd - naive - q=1") {
+ dssvdNaive(q = 1)
+ }
+
+ test("ddsvd - naive - q=2") {
+ dssvdNaive(q = 2)
+ }
+
+
+ def dssvdNaive(q: Int) {
+ val inCoreA = dense(
+ (1, 2, 3, 4),
+ (2, 3, 4, 5),
+ (3, -4, 5, 6),
+ (4, 5, 6, 7),
+ (8, 6, 7, 8)
+ )
+ val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+ val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
+ val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
+
+ printf("U:\n%s\n", inCoreU)
+ printf("V:\n%s\n", inCoreV)
+ printf("Sigma:\n%s\n", s)
+
+ (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
+ }
+
+ test("dspca") {
+
+ import math._
+
+ val rnd = RandomUtils.getRandom
+
+ // Number of points
+ val m = 500
+ // Length of actual spectrum
+ val spectrumLen = 40
+
+ val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+ printf("spectrum:%s\n", spectrum)
+
+ val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+ ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+ // PCA Rotation matrix -- should also be orthonormal.
+ val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+ val input = (u %*%: diagv(spectrum)) %*% tr.t
+ val drmInput = drmParallelize(m = input, numPartitions = 2)
+
+ // Calculate just the first 10 principal factors and reduce dimensionality.
+ // Since we assert only the validity of the s-pca, not its stochastic error, we bump the p
+ // parameter to drive the stochastic error to zero and assert only the functional correctness
+ // of the method's pca-specific additions.
+ val k = 10
+
+ var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
+ // Un-normalized pca data:
+ drmPCA = drmPCA %*% diagv(s)
+
+ val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+
+ // Of course, once we have calculated the pca, the spectrum is going to differ, since the
+ // originally generated input was not centered. So here we just brute-solve the pca to verify.
+ val xi = input.colMeans()
+ for (r <- 0 until input.nrow) input(r, ::) -= xi
+ var (pcaControl, _, sControl) = svd(m = input)
+ pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+ printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+ printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+ (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+ }
+
+
+}
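
For reference, the brute-force control at the end of the dspca test is just classical PCA: column-center the input, take an in-core SVD, and keep the first k principal components scaled by their singular values. A compact restatement, assuming the in-core scalabindings DSL with input and k as in the test:

    val centered = input.cloned
    val mu = centered.colMeans()
    for (r <- 0 until centered.nrow) centered(r, ::) -= mu   // subtract column means
    val (u, _, s) = svd(centered)                            // in-core SVD of the centered data
    val pcaControl = (u %*%: diagv(s))(::, 0 until k)        // top-k components, un-normalized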
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 6152426..ac46a9e 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.mahout.sparkbindings.drm
import org.scalatest.{Matchers, FunSuite}
import org.apache.mahout.math._
-import decompositions._
import scalabindings._
import drm._
import RLikeOps._
@@ -31,7 +30,6 @@ import org.apache.mahout.math.Matrices
import org.apache.mahout.sparkbindings.{SparkEngine, blas}
import org.apache.spark.storage.StorageLevel
import org.apache.mahout.math.drm.logical.{OpAtx, OpAtB, OpAtA}
-import scala.util.Random
/** R-like DRM DSL operation tests */
class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -313,41 +311,6 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
(inCoreC - inCoreCControl).norm should be < 1E-10
}
- test("C = A + B, identically partitioned") {
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
-
- printf("A.nrow=%d.\n",A.rdd.count())
-
- // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
- val B = A.mapBlock() {
- case (keys, block) =>
- val bBlock = block.like() := ((r,c,v) => util.Random.nextDouble())
- keys -> bBlock
- }
- // Prevent repeated computation non-determinism
- .checkpoint()
-
- val inCoreB = B.collect
-
- printf("A=\n%s\n", inCoreA)
- printf("B=\n%s\n", inCoreB)
-
- val C = A + B
-
- val inCoreC = C.collect
-
- printf("C=\n%s\n", inCoreC)
-
- // Actual
- val inCoreCControl = inCoreA + inCoreB
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
- }
-
-
test("C = A + B side test 1") {
val inCoreA = dense((1, 2), (3, 4))
http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index d9e89bc..2d9ed76 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -14,7 +14,7 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
override protected def beforeEach() {
super.beforeEach()
- mahoutCtx = mahoutSparkContext(masterUrl = "local[2]",
+ mahoutCtx = mahoutSparkContext(masterUrl = "local[3]",
appName = "MahoutLocalContext",
// Do not run MAHOUT_HOME jars in unit tests.
addMahoutJars = false,