You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2014/06/04 23:34:04 UTC
[2/2] git commit: MAHOUT-1566: (Experimental) Regular ALS factorizer
with conversion tests, optimizer enhancements and bug fixes.
MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes.
This closes apache/mahout#6. (Re-commit).
Squashed commit of the following:
commit 32d5183543ee032770bfcebe0a5b2e22b3a388e3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Wed Jun 4 12:31:57 2014 -0700
removing commented-out section
commit 8e14bf0cf44a5ee86b0c137e8fbb0a8e75bf6ca8
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Tue Jun 3 13:44:01 2014 -0700
parameter doc
commit cfa207b1bf98fd941b4d0d48baab4ac35b0deb03
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Tue Jun 3 13:42:12 2014 -0700
+License on DQR
commit d9249968cd444ab90d92fa5197a5893f32e8c5a8
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Tue Jun 3 11:42:18 2014 -0700
Adding regularization parameter. Refactoring decompositions and their tests into single package (for both distributed and non-distributed stuff).
commit f8ef52fe33dcf6259156ed913ac86dbad6d1a12f
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Tue Jun 3 11:05:52 2014 -0700
flipping elementwise operator to OK in-place computation by default (back to where it were). For now. See if the problem surfaces frequent enough to switch to cloned computations.
commit dce371fbdbd17b9c1a3c79260dc8e1180e6073ee
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jun 2 18:48:33 2014 -0700
Switching elemnetwise operators to non-inplace execution by default. This will create GC overhead but it is more benign for side effects.
commit c92a6cca9eea1c9d15c5fee9c29c79ef9ed3c77f
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jun 2 17:53:41 2014 -0700
Fixing bug. Non-idempotent in-place elementwise computations are still an issue
commit 775490da0f28887350c8725909d4af24e8f37310
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jun 2 15:17:11 2014 -0700
WIP; bug in zipping -- does several rounds over partitions causing unidempotant side effects.
commit 565ecb0ca2bb6b1dda30af2c81bdeb1be7fae9fe
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 30 18:11:42 2014 -0700
stopping computation on negative convergence too (i guess in ideal data it converges too fast and rounding errors could push rmse back up.)
commit 2c9ba249e7dc312c2919a60d43e781eae67c51a9
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 30 17:50:36 2014 -0700
license
commit 4b525e5011c002bdb4eb37e27699dc1f89f63d63
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 30 17:43:31 2014 -0700
print out iteration rmses in the test
commit 84949a533270cb2bfe735f85321706a424a950cf
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 30 17:36:00 2014 -0700
Bug fixes, test ok
commit c47995e74dce76e74bc4a58dbd473120da407dc0
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Thu May 29 21:32:23 2014 -0700
ALS
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9cf90546
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9cf90546
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9cf90546
Branch: refs/heads/master
Commit: 9cf90546dfa8bf11c99089a12d70b0272e7da211
Parents: 26d3db6
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Wed Jun 4 14:29:34 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Wed Jun 4 14:29:34 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/mahout/math/decompositions/ALS.scala | 134 ++++++++++++
.../apache/mahout/math/decompositions/DQR.scala | 74 +++++++
.../mahout/math/decompositions/DSPCA.scala | 153 +++++++++++++
.../mahout/math/decompositions/DSSVD.scala | 82 +++++++
.../mahout/math/decompositions/SSVD.scala | 165 +++++++++++++++
.../mahout/math/decompositions/package.scala | 127 +++++++++++
.../mahout/math/drm/CheckpointedOps.scala | 3 +-
.../mahout/math/drm/DistributedEngine.scala | 11 +-
.../org/apache/mahout/math/drm/DrmLike.scala | 2 +-
.../org/apache/mahout/math/drm/DrmLikeOps.scala | 9 +-
.../apache/mahout/math/drm/RLikeDrmOps.scala | 9 +-
.../mahout/math/drm/decompositions/DQR.scala | 57 -----
.../mahout/math/drm/decompositions/DSPCA.scala | 153 -------------
.../mahout/math/drm/decompositions/DSSVD.scala | 83 --------
.../apache/mahout/math/drm/logical/OpAewB.scala | 9 +-
.../mahout/math/drm/logical/OpMapBlock.scala | 8 +-
.../org/apache/mahout/math/drm/package.scala | 79 +++----
.../mahout/math/scalabindings/MatrixOps.scala | 2 +
.../math/scalabindings/RLikeMatrixOps.scala | 12 +-
.../scalabindings/decompositions/SSVD.scala | 165 ---------------
.../mahout/math/scalabindings/package.scala | 21 --
.../decompositions/DecompositionsSuite.scala | 113 ++++++++++
.../mahout/math/scalabindings/MathSuite.scala | 76 -------
.../mahout/sparkbindings/SparkEngine.scala | 35 +--
.../apache/mahout/sparkbindings/blas/ABt.scala | 35 ++-
.../apache/mahout/sparkbindings/blas/AewB.scala | 159 ++++++++++----
.../drm/CheckpointedDrmSpark.scala | 8 +-
.../mahout/sparkbindings/drm/package.scala | 28 ++-
.../apache/mahout/sparkbindings/package.scala | 4 +-
.../mahout/math/decompositions/MathSuite.scala | 212 +++++++++++++++++++
.../mahout/sparkbindings/blas/ABtSuite.scala | 6 +-
.../mahout/sparkbindings/blas/AewBSuite.scala | 16 +-
.../decompositions/MathSuite.scala | 176 ---------------
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 37 ++++
.../sparkbindings/test/MahoutLocalContext.scala | 2 +-
36 files changed, 1380 insertions(+), 887 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bea124a..3b2e61b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
Release 1.0 - unreleased
+ MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes (dlyubimov)
+
MAHOUT-1537: Minor fixes to spark-shell (Anand Avati via dlyubimov)
MAHOUT-1529: Finalize abstraction of distributed logical plans from backend operations (dlyubimov)
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
new file mode 100644
index 0000000..eaefe0e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeDrmOps._
+import RLikeOps._
+import scala.util.Random
+import org.apache.log4j.Logger
+import math._
+
+/** Simple ALS factorization algotithm. To solve, use train() method. */
+object ALS {
+
+ private val log = Logger.getLogger(ALS.getClass)
+
+ /**
+ * ALS training result. <P>
+ *
+ * <code>drmU %*% drmV.t</code> is supposed to approximate the input.
+ *
+ * @param drmU U matrix
+ * @param drmV V matrix
+ * @param iterationsRMSE RMSE values afeter each of iteration performed
+ */
+ class Result[K: ClassTag](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) {
+ def toTuple = (drmU, drmV, iterationsRMSE)
+ }
+
+ /**
+ * Run ALS.
+ * <P>
+ *
+ * Example:
+ *
+ * <pre>
+ * val (u,v,errors) = als(input, k).toTuple
+ * </pre>
+ *
+ * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
+ * whichever earlier.
+ * <P>
+ *
+ * @param drmInput The input matrix
+ * @param k required rank of decomposition (number of cols in U and V results)
+ * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
+ * value. If <=0 then we won't compute RMSE and use convergence test.
+ * @param lambda regularization rate
+ * @param maxIterations maximum iterations to run regardless of convergence
+ * @tparam K row key type of the input (100 is probably more than enough)
+ * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result}
+ */
+ def als[K: ClassTag](
+ drmInput: DrmLike[K],
+ k: Int = 50,
+ lambda: Double = 0.0,
+ maxIterations: Int = 10,
+ convergenceThreshold: Double = 0.10
+ ): Result[K] = {
+
+ assert(convergenceThreshold < 1.0, "convergenceThreshold")
+ assert(maxIterations >= 1, "maxIterations")
+
+ val drmA = drmInput
+ val drmAt = drmInput.t
+
+ // Initialize U and V so that they are identically distributed to A or A'
+ var drmU = drmA.mapBlock(ncol = k) {
+ case (keys, block) =>
+ val uBlock = Matrices.symmetricUniformView(block.nrow, k, Random.nextInt()) * 0.01
+ keys -> uBlock
+ }
+
+ var drmV: DrmLike[Int] = null
+ var rmseIterations: List[Double] = Nil
+
+ // ALS iterator
+ var stop = false
+ var i = 0
+ while (!stop && i < maxIterations) {
+
+ // Alternate. This is really what ALS is.
+ if ( drmV != null) drmV.uncache()
+ drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint()
+
+ drmU.uncache()
+ drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint()
+
+ // Check if we are requested to do a convergence test; and do it if yes.
+ if (convergenceThreshold > 0) {
+
+ val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow)
+
+ if (i > 0) {
+ val rmsePrev = rmseIterations.last
+ val convergence = (rmsePrev - rmse) / rmsePrev
+
+ if (convergence < 0) {
+ log.warn("Rmse increase of %f. Should not happen.".format(convergence))
+ // I guess error growth can happen in ideal data case?
+ stop = true
+ } else if (convergence < convergenceThreshold) {
+ stop = true
+ }
+ }
+ rmseIterations :+= rmse
+ }
+
+ i += 1
+ }
+
+ new Result(drmU, drmV, rmseIterations)
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
new file mode 100644
index 0000000..4ca99b1
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+ private val log = Logger.getLogger(DQR.getClass)
+
+ /**
+ * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
+ * controlled (<5000 or so). <P>
+ *
+ * It is recommended to checkpoint A since it does two passes over it. <P>
+ *
+ * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+ * their RDD should be able to zip successfully.
+ */
+ def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+ if (A.ncol > 5000)
+ log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
+
+ implicit val ctx = A.context
+
+ val AtA = (A.t %*% A).checkpoint()
+ val inCoreAtA = AtA.collect
+
+ if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+ val ch = chol(inCoreAtA)
+ val inCoreR = (ch.getL cloned) t
+
+ if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+ if (checkRankDeficiency && !ch.isPositiveDefinite)
+ throw new IllegalArgumentException("R is rank-deficient.")
+
+ val bcastAtA = drmBroadcast(inCoreAtA)
+
+ // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
+ // decompose A'A in the backend again.
+
+ // Compute Q = A*inv(L') -- we can do it blockwise.
+ val Q = A.mapBlock() {
+ case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+ }
+
+ Q -> inCoreR
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
new file mode 100644
index 0000000..37c218a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+ /**
+ * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+ * document of the MAHOUT-817.
+ *
+ * @param A input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations (hint: use either 0 or 1)
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+ * e.g. save them to hdfs in order to trigger their computation.
+ */
+ def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+ implicit val ctx = A.context
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // Dataset mean
+ val xi = drmA.colMeans
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+ val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+ // This done in front in a single-threaded fashion for now. Even though it doesn't require any
+ // memory beyond that is required to keep xi around, it still might be parallelized to backs
+ // for significantly big n and r. TODO
+ val s_o = omega.t %*% xi
+
+ val bcastS_o = drmBroadcast(s_o)
+ val bcastXi = drmBroadcast(xi)
+
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val s_o:Vector = bcastS_o
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+ keys -> blockY
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ var s_q = drmQ.colSums()
+ var bcastVarS_q = drmBroadcast(s_q)
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ var s_b = (drmBt.t %*% xi).collect(::, 0)
+ var bcastVarS_b = drmBroadcast(s_b)
+
+ for (i <- 0 until q) {
+
+ // These closures don't seem to live well with outside-scope vars. This doesn't record closure
+ // attributes correctly. So we create additional set of vals for broadcast vars to properly
+ // create readonly closure attributes in this very scope.
+ val bcastS_q = bcastVarS_q
+ val bcastS_b = bcastVarS_b
+ val bcastXib = bcastXi
+
+ // Fix Bt as B' -= xi cross s_q
+ drmBt = drmBt.mapBlock() {
+ case (keys, block) =>
+ val s_q: Vector = bcastS_q
+ val xi: Vector = bcastXib
+ keys.zipWithIndex.foreach {
+ case (key, idx) => block(idx, ::) -= s_q * xi(key)
+ }
+ keys -> block
+ }
+
+ drmY.uncache()
+ drmQ.uncache()
+
+ drmY = (drmA %*% drmBt)
+ // Fix Y by subtracting s_b from each row of the AB'
+ .mapBlock() {
+ case (keys, block) =>
+ val s_b: Vector = bcastS_b
+ for (row <- 0 until block.nrow) block(row, ::) -= s_b
+ keys -> block
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ s_q = drmQ.colSums()
+ bcastVarS_q = drmBroadcast(s_q)
+
+ // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ s_b = (drmBt.t %*% xi).collect(::, 0)
+ bcastVarS_b = drmBroadcast(s_b)
+ }
+
+ val c = s_q cross s_b
+ val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+ c - c.t + (s_q cross s_q) * (xi dot xi)
+ val (inCoreUHat, d) = eigen(inCoreBBt)
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+ // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
new file mode 100644
index 0000000..a158390
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
@@ -0,0 +1,82 @@
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrix, Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+ /**
+ * Distributed Stochastic Singular Value decomposition algorithm.
+ *
+ * @param A input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+ * e.g. save them to hdfs in order to trigger their computation.
+ */
+ def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+
+ // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
+ // instantiate the Omega random matrix view in the backend instead. That way serialized closure
+ // is much more compact.
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ keys -> blockY
+ }
+
+ var drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (q == 0) drmQ = drmQ.checkpoint()
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (q == 0) drmBt = drmBt.checkpoint()
+
+ for (i <- 0 until q) {
+ drmY = drmA %*% drmBt
+ drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (i == q - 1) drmQ = drmQ.checkpoint()
+
+ // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (i == q - 1) drmBt = drmBt.checkpoint()
+ }
+
+ val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt)
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+ // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+ private val log = Logger.getLogger(SSVD.getClass)
+
+ /**
+ * In-core SSVD algorithm.
+ *
+ * @param a input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ var y = a %*% omega
+ var yty = y.t %*% y
+ val at = a.t
+ var ch = chol(yty)
+ assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+ var bt = ch.solveRight(at %*% y)
+
+ // Power iterations
+ for (i <- 0 until q) {
+ y = a %*% bt
+ yty = y.t %*% y
+ ch = chol(yty)
+ bt = ch.solveRight(at %*% y)
+ }
+
+ val bbt = bt.t %*% bt
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = ch.solveRight(y) %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+ }
+
+ /**
+ * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+ * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
+ * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+ *
+ * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
+ * If retaining distances and orignal scaled variances not that important, the normalized PCA space
+ * is just U.
+ *
+ * Important: data points are considered to be rows.
+ *
+ * @param a input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ // Dataset mean
+ val xi = a.colMeans()
+
+ if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+ var y = a %*% omega
+
+ // Fixing y
+ val s_o = omega.t %*% xi
+ y := ((r,c,v) => v - s_o(c))
+
+ var yty = y.t %*% y
+ var ch = chol(yty)
+// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+ // This is implicit Q of QR(Y)
+ var qm = ch.solveRight(y)
+ var bt = a.t %*% qm
+ var s_q = qm.colSums()
+ var s_b = bt.t %*% xi
+
+ // Power iterations
+ for (i <- 0 until q) {
+
+ // Fix bt
+ bt -= xi cross s_q
+
+ y = a %*% bt
+
+ // Fix Y again.
+ y := ((r,c,v) => v - s_b(c))
+
+ yty = y.t %*% y
+ ch = chol(yty)
+ qm = ch.solveRight(y)
+ bt = a.t %*% qm
+ s_q = qm.colSums()
+ s_b = bt.t %*% xi
+ }
+
+ val c = s_q cross s_b
+
+ // BB' computation becomes
+ val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi)
+
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = qm %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
new file mode 100644
index 0000000..852a977
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * This package holds all decomposition and factorization-like methods, all that we were able to make
+ * distributed engine-independent so far, anyway.
+ */
+package object decompositions {
+
+ // ================ In-core decompositions ===================
+
+ def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
+
+ /**
+ * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+ * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
+ * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+ *
+ * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
+ * If retaining distances and orignal scaled variances not that important, the normalized PCA space
+ * is just U.
+ *
+ * Important: data points are considered to be rows.
+ *
+ * @param a input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
+ SSVD.spca(a = a, k = k, p = p, q = q)
+
+
+ // ============== Distributed decompositions ===================
+
+ /**
+ * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
+ * controlled (<5000 or so). <P>
+ *
+ * It is recommended to checkpoint A since it does two passes over it. <P>
+ *
+ * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+ * their RDD should be able to zip successfully.
+ */
+ def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+ DQR.dqrThin(A, checkRankDeficiency)
+
+ /**
+ * Distributed Stochastic Singular Value decomposition algorithm.
+ *
+ * @param A input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+ * e.g. save them to hdfs in order to trigger their computation.
+ */
+ def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+
+ /**
+ * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+ * document of the MAHOUT-817.
+ *
+ * @param A input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations (hint: use either 0 or 1)
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+ * e.g. save them to hdfs in order to trigger their computation.
+ */
+ def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+
+ /**
+ * Run ALS.
+ * <P>
+ *
+ * Example:
+ *
+ * <pre>
+ * val (u,v,errors) = als(input, k).toTuple
+ * </pre>
+ *
+ * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
+ * whichever earlier.
+ * <P>
+ *
+ * @param drmInput The input matrix
+ * @param k required rank of decomposition (number of cols in U and V results)
+ * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
+ * value. If <=0 then we won't compute RMSE and use convergence test.
+ * @param lambda regularization rate
+ * @param maxIterations maximum iterations to run regardless of convergence
+ * @tparam K row key type of the input (100 is probably more than enough)
+ * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result}
+ */
+ def als[K: ClassTag](
+ drmInput: DrmLike[K],
+ k: Int = 50,
+ lambda: Double = 0.0,
+ maxIterations: Int = 10,
+ convergenceThreshold: Double = 0.10
+ ): ALS.Result[K] =
+ ALS.als(drmInput, k, lambda, maxIterations, convergenceThreshold)
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index fa1ccfd..edd0cfc 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.math.drm
import scala.reflect.ClassTag
-import org.apache.mahout.math.Vector
+import org.apache.mahout.math._
/**
@@ -35,5 +35,6 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
/** Column Means */
def colMeans(): Vector = drm.context.colMeans(drm)
+ def norm():Double = drm.context.norm(drm)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 0e76d87..5ffee9d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -19,10 +19,11 @@ package org.apache.mahout.math.drm
import scala.reflect.ClassTag
import logical._
-import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
import DistributedEngine._
import org.apache.mahout.math.scalabindings._
-import RLikeOps._
/** Abstraction of optimizer/distributed engine */
trait DistributedEngine {
@@ -43,10 +44,12 @@ trait DistributedEngine {
def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
/** Engine-specific colSums implementation based on a checkpoint. */
- def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+ def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
/** Engine-specific colMeans implementation based on a checkpoint. */
- def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+ def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+ def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
/** Broadcast support */
def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 8e0db1e..7fbfc12 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -28,7 +28,7 @@ package org.apache.mahout.math.drm
*/
trait DrmLike[K] {
- protected[mahout] def partitioningTag:Long
+ protected[mahout] def partitioningTag: Long
protected[mahout] val context:DistributedContext
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 35e28af..328805a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -36,9 +36,14 @@ class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
* @tparam R
* @return
*/
- def mapBlock[R : ClassTag](ncol: Int = -1)
+ def mapBlock[R: ClassTag](ncol: Int = -1, identicallyParitioned: Boolean = true)
(bmf: BlockMapFunc[K, R]): DrmLike[R] =
- new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
+ new OpMapBlock[K, R](
+ A = drm,
+ bmf = bmf,
+ _ncol = ncol,
+ identicallyPartitioned = identicallyParitioned
+ )
/**
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index f46d15c..7ac5577 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -25,13 +25,13 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
import RLikeDrmOps._
- def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
+ def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
- def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
+ def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
- def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
+ def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
- def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
+ def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
@@ -70,7 +70,6 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
-
}
object RLikeDrmOps {
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
deleted file mode 100644
index 34ae345..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.log4j.Logger
-
-object DQR {
-
- private val log = Logger.getLogger(DQR.getClass)
-
- /**
- * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
- * controlled (<5000 or so). <P>
- *
- * It is recommended to checkpoint A since it does two passes over it. <P>
- *
- * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
- * their RDD should be able to zip successfully.
- */
- def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
-
- if (A.ncol > 5000)
- log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
-
- implicit val ctx = A.context
-
- val AtA = (A.t %*% A).checkpoint()
- val inCoreAtA = AtA.collect
-
- if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
-
- val ch = chol(inCoreAtA)
- val inCoreR = (ch.getL cloned) t
-
- if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
-
- if (checkRankDeficiency && !ch.isPositiveDefinite)
- throw new IllegalArgumentException("R is rank-deficient.")
-
- val bcastAtA = drmBroadcast(inCoreAtA)
-
- // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
- // decompose A'A in the backend again.
-
- // Compute Q = A*inv(L') -- we can do it blockwise.
- val Q = A.mapBlock() {
- case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
- }
-
- Q -> inCoreR
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
deleted file mode 100644
index 9e33416..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
- /**
- * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
- * document of the MAHOUT-817.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations (hint: use either 0 or 1)
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
- */
- def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- val drmA = A.checkpoint()
- implicit val ctx = A.context
-
- val m = drmA.nrow
- val n = drmA.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // Dataset mean
- val xi = drmA.colMeans
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
- val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
- // This done in front in a single-threaded fashion for now. Even though it doesn't require any
- // memory beyond that is required to keep xi around, it still might be parallelized to backs
- // for significantly big n and r. TODO
- val s_o = omega.t %*% xi
-
- val bcastS_o = drmBroadcast(s_o)
- val bcastXi = drmBroadcast(xi)
-
- var drmY = drmA.mapBlock(ncol = r) {
- case (keys, blockA) =>
- val s_o:Vector = bcastS_o
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
- keys -> blockY
- }
- // Checkpoint Y
- .checkpoint()
-
- var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
- var s_q = drmQ.colSums()
- var bcastVarS_q = drmBroadcast(s_q)
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = (drmA.t %*% drmQ).checkpoint()
-
- var s_b = (drmBt.t %*% xi).collect(::, 0)
- var bcastVarS_b = drmBroadcast(s_b)
-
- for (i <- 0 until q) {
-
- // These closures don't seem to live well with outside-scope vars. This doesn't record closure
- // attributes correctly. So we create additional set of vals for broadcast vars to properly
- // create readonly closure attributes in this very scope.
- val bcastS_q = bcastVarS_q
- val bcastS_b = bcastVarS_b
- val bcastXib = bcastXi
-
- // Fix Bt as B' -= xi cross s_q
- drmBt = drmBt.mapBlock() {
- case (keys, block) =>
- val s_q: Vector = bcastS_q
- val xi: Vector = bcastXib
- keys.zipWithIndex.foreach {
- case (key, idx) => block(idx, ::) -= s_q * xi(key)
- }
- keys -> block
- }
-
- drmY.uncache()
- drmQ.uncache()
-
- drmY = (drmA %*% drmBt)
- // Fix Y by subtracting s_b from each row of the AB'
- .mapBlock() {
- case (keys, block) =>
- val s_b: Vector = bcastS_b
- for (row <- 0 until block.nrow) block(row, ::) -= s_b
- keys -> block
- }
- // Checkpoint Y
- .checkpoint()
-
- drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
- s_q = drmQ.colSums()
- bcastVarS_q = drmBroadcast(s_q)
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = (drmA.t %*% drmQ).checkpoint()
-
- s_b = (drmBt.t %*% xi).collect(::, 0)
- bcastVarS_b = drmBroadcast(s_b)
- }
-
- val c = s_q cross s_b
- val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
- c - c.t + (s_q cross s_q) * (xi dot xi)
- val (inCoreUHat, d) = eigen(inCoreBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
- // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
deleted file mode 100644
index 0da9ec7..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSSVD {
-
- /**
- * Distributed Stochastic Singular Value decomposition algorithm.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
- */
- def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- val drmA = A.checkpoint()
-
- val m = drmA.nrow
- val n = drmA.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
-
- // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
- // instantiate the Omega random matrix view in the backend instead. That way serialized closure
- // is much more compact.
- var drmY = drmA.mapBlock(ncol = r) {
- case (keys, blockA) =>
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- keys -> blockY
- }
-
- var drmQ = dqrThin(drmY.checkpoint())._1
- // Checkpoint Q if last iteration
- if (q == 0) drmQ = drmQ.checkpoint()
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = drmA.t %*% drmQ
- // Checkpoint B' if last iteration
- if (q == 0) drmBt = drmBt.checkpoint()
-
- for (i <- 0 until q) {
- drmY = drmA %*% drmBt
- drmQ = dqrThin(drmY.checkpoint())._1
- // Checkpoint Q if last iteration
- if (i == q - 1) drmQ = drmQ.checkpoint()
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = drmA.t %*% drmQ
- // Checkpoint B' if last iteration
- if (i == q - 1) drmBt = drmBt.checkpoint()
- }
-
- val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
- val (inCoreUHat, d) = eigen(inCoreBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
- // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
index d07172a..da7c0d5 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -19,17 +19,24 @@ package org.apache.mahout.math.drm.logical
import scala.reflect.ClassTag
import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
/** DRM elementwise operator */
case class OpAewB[K: ClassTag](
override var A: DrmLike[K],
override var B: DrmLike[K],
- val op: Char
+ val op: String
) extends AbstractBinaryOp[K, K, K] {
+
+
assert(A.ncol == B.ncol, "arguments must have same number of columns")
assert(A.nrow == B.nrow, "arguments must have same number of rows")
+ override protected[mahout] lazy val partitioningTag: Long =
+ if (A.partitioningTag == B.partitioningTag) A.partitioningTag
+ else Random.nextLong()
+
/** R-like syntax for number of rows. */
def nrow: Long = A.nrow
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
index 8e4362d..7299d9e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -21,16 +21,18 @@ import scala.reflect.ClassTag
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+import scala.util.Random
class OpMapBlock[S: ClassTag, R: ClassTag](
override var A: DrmLike[S],
val bmf: BlockMapFunc[S, R],
val _ncol: Int = -1,
- val _nrow: Long = -1
+ val _nrow: Long = -1,
+ identicallyPartitioned:Boolean
) extends AbstractUnaryOp[S, R] {
-
- override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+ override protected[mahout] lazy val partitioningTag: Long =
+ if (identicallyPartitioned) A.partitioningTag else Random.nextLong()
/** R-like syntax for number of rows. */
def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 768bb1c..02e8b7a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -18,7 +18,9 @@
package org.apache.mahout.math
import scala.reflect.ClassTag
-import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
package object drm {
@@ -70,56 +72,45 @@ package object drm {
(implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
/** Implicit broadcast -> value conversion. */
- implicit def bcast2val[T](bcast:BCast[T]):T = bcast.value
+ implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
/** Just throw all engine operations into context as well. */
- implicit def ctx2engine(ctx:DistributedContext):DistributedEngine = ctx.engine
+ implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
new CheckpointedOps[K](drm)
- implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
-
- // ============== Decompositions ===================
-
- /**
- * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
- * controlled (<5000 or so). <P>
- *
- * It is recommended to checkpoint A since it does two passes over it. <P>
- *
- * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
- * their RDD should be able to zip successfully.
- */
- def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
- DQR.dqrThin(A, checkRankDeficiency)
-
/**
- * Distributed Stochastic Singular Value decomposition algorithm.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
+ * We assume that whenever computational action is invoked without explicit checkpoint, the user
+ * doesn't imply caching
*/
- def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
-
- /**
- * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
- * document of the MAHOUT-817.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations (hint: use either 0 or 1)
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
- */
- def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
-
+ implicit def drm2Checkpointed[K: ClassTag](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
+
+ /** Implicit conversion to in-core with NONE caching of the result. */
+ implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect
+
+ /** Do vertical concatenation of collection of blockified tuples */
+ def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
+ assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
+ if (blocks.size == 1) {
+ // No coalescing required.
+ blocks.head
+ } else {
+ // compute total number of rows in a new block
+ val m = blocks.view.map(_._2.nrow).sum
+ val n = blocks.head._2.ncol
+ val coalescedBlock = blocks.head._2.like(m, n)
+ val coalescedKeys = new Array[K](m)
+ var row = 0
+ for (elem <- blocks.view) {
+ val block = elem._2
+ val rowEnd = row + block.nrow
+ coalescedBlock(row until rowEnd, ::) := block
+ elem._1.copyToArray(coalescedKeys, row)
+ row = rowEnd
+ }
+ coalescedKeys -> coalescedBlock
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index a872240..48c2048 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -51,6 +51,8 @@ class MatrixOps(val m: Matrix) {
def -(that: Matrix) = cloned -= that
+ def -:(that: Matrix) = that - m
+
// m.plus(that)?
def +(that: Double) = cloned += that
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
index 23c2ac2..af53c51 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
@@ -20,7 +20,7 @@ import org.apache.mahout.math.{Vector, Matrix}
import scala.collection.JavaConversions._
import RLikeOps._
-class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
+class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
/**
* matrix-matrix multiplication
@@ -47,12 +47,14 @@ class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
def *(that: Double) = cloned *= that
- def /(that:Matrix) = cloned /= that
+ def /(that: Matrix) = cloned /= that
- def /(that:Double) = cloned /= that
+ def /:(that: Matrix) = that / m
+
+ def /(that: Double) = cloned /= that
/** 1.0 /: A is eqivalent to R's 1.0/A */
- def /:(that:Double) = that /=: cloned
+ def /:(that: Double) = that /=: cloned
/**
* in-place Hadamard product. We probably don't want to use assign
@@ -82,7 +84,7 @@ class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
}
/** 1.0 /=: A is equivalent to A = 1.0/A in R */
- def /=:(that:Double) = {
+ def /=:(that: Double) = {
m.foreach(that /=: _.vector())
m
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
deleted file mode 100644
index 80385a3..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import org.apache.mahout.common.RandomUtils
-import org.apache.log4j.Logger
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-private[math] object SSVD {
-
- private val log = Logger.getLogger(SSVD.getClass)
-
- /**
- * In-core SSVD algorithm.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- var y = a %*% omega
- var yty = y.t %*% y
- val at = a.t
- var ch = chol(yty)
- assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
- var bt = ch.solveRight(at %*% y)
-
- // Power iterations
- for (i <- 0 until q) {
- y = a %*% bt
- yty = y.t %*% y
- ch = chol(yty)
- bt = ch.solveRight(at %*% y)
- }
-
- val bbt = bt.t %*% bt
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = ch.solveRight(y) %*% uhat
- val v = bt %*% (uhat %*%: diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
- }
-
- /**
- * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
- * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
- * to save some memory for sparse inputs by removing direct mean subtraction.<P>
- *
- * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
- * If retaining distances and orignal scaled variances not that important, the normalized PCA space
- * is just U.
- *
- * Important: data points are considered to be rows.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- // Dataset mean
- val xi = a.colMeans()
-
- if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
-
- var y = a %*% omega
-
- // Fixing y
- val s_o = omega.t %*% xi
- y := ((r,c,v) => v - s_o(c))
-
- var yty = y.t %*% y
- var ch = chol(yty)
-// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
- // This is implicit Q of QR(Y)
- var qm = ch.solveRight(y)
- var bt = a.t %*% qm
- var s_q = qm.colSums()
- var s_b = bt.t %*% xi
-
- // Power iterations
- for (i <- 0 until q) {
-
- // Fix bt
- bt -= xi cross s_q
-
- y = a %*% bt
-
- // Fix Y again.
- y := ((r,c,v) => v - s_b(c))
-
- yty = y.t %*% y
- ch = chol(yty)
- qm = ch.solveRight(y)
- bt = a.t %*% qm
- s_q = qm.colSums()
- s_b = bt.t %*% xi
- }
-
- val c = s_q cross s_b
-
- // BB' computation becomes
- val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi)
-
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = qm %*% uhat
- val v = bt %*% (uhat %*%: diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index 4599146..2b7773b 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -281,26 +281,5 @@ package object scalabindings {
x(::, 0)
}
- def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
-
- /**
- * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
- * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
- * to save some memory for sparse inputs by removing direct mean subtraction.<P>
- *
- * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
- * If retaining distances and orignal scaled variances not that important, the normalized PCA space
- * is just U.
- *
- * Important: data points are considered to be rows.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
- SSVD.spca(a = a, k = k, p = p, q = q)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala
new file mode 100644
index 0000000..8f5ec99
--- /dev/null
+++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import org.scalatest.FunSuite
+import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+
+/**
+ * This suite tests only in-core decomposititions.
+ * <P>
+ *
+ * We moved distributed tests into mahout-spark module since they require a concrete distributed
+ * engine dependencies to run.
+ * <P>
+ */
+class DecompositionsSuite extends FunSuite with MahoutSuite {
+
+ test("ssvd") {
+
+ // Very naive, a full-rank only here.
+ val a = dense(
+ (1, 2, 3),
+ (3, 4, 5),
+ (-2, 6, 7),
+ (-3, 8, 9)
+ )
+
+ val rank = 2
+ val (u, v, s) = ssvd(a, k = rank, q = 1)
+
+ val (uControl, vControl, sControl) = svd(a)
+
+ printf("U:\n%s\n", u)
+ printf("U-control:\n%s\n", uControl)
+ printf("V:\n%s\n", v)
+ printf("V-control:\n%s\n", vControl)
+ printf("Sigma:\n%s\n", s)
+ printf("Sigma-control:\n%s\n", sControl)
+
+ (s - sControl(0 until rank)).norm(2) should be < 1E-7
+
+ // Singular vectors may be equivalent down to a sign only.
+ (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7
+ (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7
+ }
+
+ test("spca") {
+
+ import math._
+
+ val rnd = RandomUtils.getRandom
+
+ // Number of points
+ val m = 500
+ // Length of actual spectrum
+ val spectrumLen = 40
+
+ val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+ printf("spectrum:%s\n", spectrum)
+
+ val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+ ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+ // PCA Rotation matrix -- should also be orthonormal.
+ val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+ val input = (u %*%: diagv(spectrum)) %*% tr.t
+
+ // Calculate just first 10 principal factors and reduce dimensionality.
+ // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
+ // ensure to zero stochastic error and assert only functional correctness of the method's pca-
+ // specific additions.
+ val k = 10
+ var (pca, _, s) = spca(a = input, k = k, p = spectrumLen, q = 1)
+ printf("Svs:%s\n", s)
+ // Un-normalized pca data:
+ pca = pca %*%: diagv(s)
+
+ // Of course, once we calculated the pca, the spectrum is going to be different since our originally
+ // generated input was not centered. So here, we'd just brute-solve pca to verify
+ val xi = input.colMeans()
+ for (r <- 0 until input.nrow) input(r, ::) -= xi
+ var (pcaControl, _, sControl) = svd(m = input)
+
+ printf("Svs-control:%s\n", sControl)
+ pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+ printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+ printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+ (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
index d1171cc..b10cde3 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
@@ -193,82 +193,6 @@ class MathSuite extends FunSuite with MahoutSuite {
}
- test("ssvd") {
- val a = dense(
- (1, 2, 3),
- (3, 4, 5),
- (-2, 6, 7),
- (-3, 8, 9)
- )
-
- val rank = 2
- val (u, v, s) = ssvd(a, k = rank, q = 1)
-
- val (uControl, vControl, sControl) = svd(a)
-
- printf("U:\n%s\n", u)
- printf("U-control:\n%s\n", uControl)
- printf("V:\n%s\n", v)
- printf("V-control:\n%s\n", vControl)
- printf("Sigma:\n%s\n", s)
- printf("Sigma-control:\n%s\n", sControl)
-
- (s - sControl(0 until rank)).norm(2) should be < 1E-7
-
- // Singular vectors may be equivalent down to a sign only.
- (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7
- (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7
-
- }
-
- test("spca") {
-
- import math._
-
- val rnd = RandomUtils.getRandom
-
- // Number of points
- val m = 500
- // Length of actual spectrum
- val spectrumLen = 40
-
- val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
- printf("spectrum:%s\n", spectrum)
-
- val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
- ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
- // PCA Rotation matrix -- should also be orthonormal.
- val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
- val input = (u %*%: diagv(spectrum)) %*% tr.t
-
- // Calculate just first 10 principal factors and reduce dimensionality.
- // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
- // ensure to zero stochastic error and assert only functional correctness of the method's pca-
- // specific additions.
- val k = 10
- var (pca, _, s) = spca(a = input, k = k, p=spectrumLen, q = 1)
- printf("Svs:%s\n",s)
- // Un-normalized pca data:
- pca = pca %*%: diagv(s)
-
- // Of course, once we calculated the pca, the spectrum is going to be different since our originally
- // generated input was not centered. So here, we'd just brute-solve pca to verify
- val xi = input.colMeans()
- for (r <- 0 until input.nrow) input(r, ::) -= xi
- var (pcaControl, _, sControl) = svd(m = input)
-
- printf("Svs-control:%s\n",sControl)
- pcaControl = (pcaControl %*%: diagv(sControl))(::,0 until k)
-
- printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
- printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
- (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
- }
-
test("random uniform") {
val omega1 = Matrices.symmetricUniformView(2, 3, 1234)
val omega2 = Matrices.symmetricUniformView(2, 3, 1234)
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 0c904ab..3a03e58 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,9 +17,9 @@
package org.apache.mahout.sparkbindings
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
import RLikeOps._
-import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.logical._
import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
import org.apache.mahout.math._
@@ -30,6 +30,8 @@ import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
import scala.Some
import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.RLikeDrmOps._
/** Spark-specific non-drm-method operations */
object SparkEngine extends DistributedEngine {
@@ -51,7 +53,17 @@ object SparkEngine extends DistributedEngine {
}
/** Engine-specific colMeans implementation based on a checkpoint. */
- def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+ override def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
+ if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+
+ override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
+ drm.rdd
+ // Compute sum of squares of each vector
+ .map {
+ case (key, v) => v dot v
+ }
+ .reduce(_ + _)
+
/**
* Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
@@ -123,7 +135,10 @@ object SparkEngine extends DistributedEngine {
{
implicit def getWritable(x: Any): Writable = val2key()
- new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
+ new CheckpointedDrmSpark(
+ rdd = rdd.map(t => (key2val(t._1), t._2)),
+ _cacheStorageLevel = StorageLevel.MEMORY_ONLY
+ )(km.asInstanceOf[ClassTag[Any]])
}
}
@@ -210,16 +225,8 @@ object SparkEngine extends DistributedEngine {
case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
- case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
// Custom operators, we just execute them
http://git-wip-us.apache.org/repos/asf/mahout/blob/9cf90546/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 97873bd..1e3f286 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -65,16 +65,22 @@ object ABt {
// Blockify everything.
val blocksA = srcA.toBlockifiedDrmRdd()
- // mark row-blocks with group id
+ // Mark row-blocks with group id
.mapPartitionsWithIndex((part, iter) => {
- val rowBlockId = part
- val (blockKeys, block) = iter.next()
- // Each partition must have exactly one block due to implementation guarantees of blockify()
- iter.ensuring(!_.hasNext)
+ if (iter.isEmpty) {
+ Iterator.empty
+ } else {
+
+ val rowBlockId = part
+ val (blockKeys, block) = iter.next()
- // the output is (row block id, array of row keys, and the matrix representing the block).
- Iterator((rowBlockId, blockKeys, block))
+ // Each partition must have exactly one block due to implementation guarantees of blockify()
+ assert(!iter.hasNext, "Partition #%d is expected to have at most 1 block at AB'.".format(part))
+
+ // the output is (row block id, array of row keys, and the matrix representing the block).
+ Iterator((rowBlockId, blockKeys, block))
+ }
})
val blocksB = srcB.toBlockifiedDrmRdd()
@@ -96,7 +102,7 @@ object ABt {
// The plan.
- val blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
+ var blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
// Build Cartesian. It may require a bit more memory there at tasks.
.cartesian(blocksB)
@@ -119,7 +125,7 @@ object ABt {
// Combine -- this is probably the most efficient
.combineByKey[(Array[K],Matrix)](
- createCombiner = (t:(Array[K],Array[Int],Matrix)) => t match {
+ createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match {
case (rowKeys, colKeys, blockProd) =>
// Accumulator is a row-wise block of sparse vectors.
@@ -148,11 +154,20 @@ object ABt {
mergeCombiners = (c1: (Array[K], Matrix), c2: (Array[K], Matrix)) => {
c1._2 += c2._2
c1
- })
+ },
+
+ // Cartesian will tend to produce much more partitions that we actually need for co-grouping,
+ // and as a result, we may see empty partitions than we actually need.
+ numPartitions = numProductPartitions
+ )
// Combine leaves residual block key -- we don't need that.
.map(_._2)
+ // This may produce more than one block per partition. Most implementation rely on convention of
+ // having at most one block per partition.
+ blockifiedRdd = rbind(blockifiedRdd)
+
new DrmRddInput(blockifiedSrc = Some(blockifiedRdd))
}
}