You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2014/05/27 21:14:53 UTC
[3/3] git commit: MAHOUT-1529 closes PR #1
MAHOUT-1529 closes PR #1
Squashed commit of the following:
commit e7b27280333fe12c94f3f5675c876f56e9e60728
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Tue May 27 11:01:13 2014 -0700
License, comments
commit bfca581389d91fb9c41db8c49e256ff694335fd1
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 23 16:07:01 2014 -0700
Fixing shell
commit c3b8aa4bf7d8939aa0800ce12b7f63e01f9686b9
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 23 13:36:20 2014 -0700
- MahoutContext
commit a6461ac22a46793ab0bcdfcafe827d808f9e1810
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 23 12:13:40 2014 -0700
Rebasing changes in the MAHOUT-1529 branch onto the new git master (applied through selective patching; may, but hopefully doesn't, revert any commits in between in the spark/ and math-scala/ modules)
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8714a0f7
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8714a0f7
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8714a0f7
Branch: refs/heads/master
Commit: 8714a0f722663ea5cb16c14c5b8a01e57574cd93
Parents: 5a1c2cc
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Tue May 27 12:13:29 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Tue May 27 12:13:29 2014 -0700
----------------------------------------------------------------------
math-scala/pom.xml | 1 -
.../org/apache/mahout/math/drm/BCast.scala | 23 ++
.../org/apache/mahout/math/drm/CacheHint.scala | 19 ++
.../mahout/math/drm/CheckpointedDrm.scala | 36 +++
.../mahout/math/drm/CheckpointedOps.scala | 39 +++
.../mahout/math/drm/DistributedContext.scala | 27 +++
.../mahout/math/drm/DistributedEngine.scala | 158 ++++++++++++
.../org/apache/mahout/math/drm/DrmLike.scala | 48 ++++
.../org/apache/mahout/math/drm/DrmLikeOps.scala | 84 +++++++
.../apache/mahout/math/drm/RLikeDrmOps.scala | 92 +++++++
.../mahout/math/drm/decompositions/DQR.scala | 57 +++++
.../mahout/math/drm/decompositions/DSPCA.scala | 153 ++++++++++++
.../mahout/math/drm/decompositions/DSSVD.scala | 83 +++++++
.../math/drm/logical/AbstractBinaryOp.scala | 37 +++
.../math/drm/logical/AbstractUnaryOp.scala | 36 +++
.../math/drm/logical/CheckpointAction.scala | 47 ++++
.../apache/mahout/math/drm/logical/OpAB.scala | 41 ++++
.../mahout/math/drm/logical/OpABAnyKey.scala | 41 ++++
.../apache/mahout/math/drm/logical/OpABt.scala | 42 ++++
.../apache/mahout/math/drm/logical/OpAewB.scala | 39 +++
.../mahout/math/drm/logical/OpAewScalar.scala | 38 +++
.../apache/mahout/math/drm/logical/OpAt.scala | 33 +++
.../apache/mahout/math/drm/logical/OpAtA.scala | 36 +++
.../mahout/math/drm/logical/OpAtAnyKey.scala | 34 +++
.../apache/mahout/math/drm/logical/OpAtB.scala | 42 ++++
.../apache/mahout/math/drm/logical/OpAtx.scala | 41 ++++
.../apache/mahout/math/drm/logical/OpAx.scala | 42 ++++
.../mahout/math/drm/logical/OpMapBlock.scala | 41 ++++
.../mahout/math/drm/logical/OpRowRange.scala | 36 +++
.../math/drm/logical/OpTimesLeftMatrix.scala | 43 ++++
.../math/drm/logical/OpTimesRightMatrix.scala | 46 ++++
.../org/apache/mahout/math/drm/package.scala | 125 ++++++++++
.../apache/mahout/math/scalabindings/SSVD.scala | 165 -------------
.../scalabindings/decompositions/SSVD.scala | 165 +++++++++++++
.../mahout/math/scalabindings/package.scala | 1 +
.../sparkbindings/shell/MahoutSparkILoop.scala | 33 ++-
.../mahout/sparkbindings/shell/Main.scala | 17 ++
spark-shell/src/test/mahout/simple.mscala | 29 ++-
spark/pom.xml | 95 ++++----
.../sparkbindings/SparkDistributedContext.scala | 30 +++
.../mahout/sparkbindings/SparkEngine.scala | 240 +++++++++++++++++++
.../apache/mahout/sparkbindings/blas/ABt.scala | 5 +-
.../apache/mahout/sparkbindings/blas/AewB.scala | 2 +-
.../mahout/sparkbindings/blas/AinCoreB.scala | 15 +-
.../apache/mahout/sparkbindings/blas/At.scala | 2 +-
.../apache/mahout/sparkbindings/blas/AtA.scala | 4 +-
.../apache/mahout/sparkbindings/blas/AtB.scala | 3 +-
.../apache/mahout/sparkbindings/blas/Ax.scala | 19 +-
.../mahout/sparkbindings/blas/DrmRddOps.scala | 2 +-
.../mahout/sparkbindings/blas/MapBlock.scala | 43 ++++
.../mahout/sparkbindings/blas/Slicing.scala | 2 +-
.../mahout/sparkbindings/blas/package.scala | 1 -
.../sparkbindings/decompositions/DQR.scala | 56 -----
.../sparkbindings/decompositions/DSPCA.scala | 153 ------------
.../sparkbindings/decompositions/DSSVD.scala | 83 -------
.../mahout/sparkbindings/drm/CacheHint.scala | 20 --
.../sparkbindings/drm/CheckpointedDrm.scala | 39 ---
.../sparkbindings/drm/CheckpointedDrmBase.scala | 161 -------------
.../drm/CheckpointedDrmSpark.scala | 164 +++++++++++++
.../drm/CheckpointedDrmSparkOps.scala | 16 ++
.../sparkbindings/drm/CheckpointedOps.scala | 63 -----
.../mahout/sparkbindings/drm/DrmLike.scala | 46 ----
.../mahout/sparkbindings/drm/DrmLikeOps.scala | 85 -------
.../mahout/sparkbindings/drm/DrmRddInput.scala | 1 +
.../mahout/sparkbindings/drm/RLikeDrmOps.scala | 98 --------
.../mahout/sparkbindings/drm/SparkBCast.scala | 25 ++
.../mahout/sparkbindings/drm/package.scala | 226 ++---------------
.../drm/plan/AbstractBinaryOp.scala | 37 ---
.../drm/plan/AbstractUnaryOp.scala | 34 ---
.../drm/plan/CheckpointAction.scala | 207 ----------------
.../mahout/sparkbindings/drm/plan/OpAB.scala | 43 ----
.../sparkbindings/drm/plan/OpABAnyKey.scala | 41 ----
.../mahout/sparkbindings/drm/plan/OpABt.scala | 43 ----
.../mahout/sparkbindings/drm/plan/OpAewB.scala | 39 ---
.../sparkbindings/drm/plan/OpAewScalar.scala | 38 ---
.../mahout/sparkbindings/drm/plan/OpAt.scala | 34 ---
.../mahout/sparkbindings/drm/plan/OpAtA.scala | 37 ---
.../sparkbindings/drm/plan/OpAtAnyKey.scala | 34 ---
.../mahout/sparkbindings/drm/plan/OpAtB.scala | 42 ----
.../mahout/sparkbindings/drm/plan/OpAtx.scala | 42 ----
.../mahout/sparkbindings/drm/plan/OpAx.scala | 42 ----
.../sparkbindings/drm/plan/OpMapBlock.scala | 58 -----
.../sparkbindings/drm/plan/OpRowRange.scala | 37 ---
.../drm/plan/OpTimesLeftMatrix.scala | 44 ----
.../drm/plan/OpTimesRightMatrix.scala | 46 ----
.../mahout/sparkbindings/drm/plan/package.scala | 24 --
.../io/MahoutKryoRegistrator.scala | 6 +-
.../apache/mahout/sparkbindings/package.scala | 61 ++++-
.../mahout/sparkbindings/blas/ABtSuite.scala | 7 +-
.../mahout/sparkbindings/blas/AewBSuite.scala | 14 +-
.../mahout/sparkbindings/blas/AtASuite.scala | 4 +-
.../mahout/sparkbindings/blas/AtSuite.scala | 12 +-
.../decompositions/MathSuite.scala | 7 +-
.../sparkbindings/drm/DrmLikeOpsSuite.scala | 6 +-
.../mahout/sparkbindings/drm/DrmLikeSuite.scala | 6 +-
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 30 +--
.../sparkbindings/test/MahoutLocalContext.scala | 7 +-
97 files changed, 2607 insertions(+), 2244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 4604b7d..95fe2c7 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -167,7 +167,6 @@
<artifactId>mahout-math</artifactId>
</dependency>
-
<!-- 3rd-party -->
<dependency>
<groupId>log4j</groupId>
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
new file mode 100644
index 0000000..850614457
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+/** Broadcast variable abstraction: an engine-neutral handle whose `value` exposes the wrapped T. */
+trait BCast[T] {
+ def value:T
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
new file mode 100644
index 0000000..ac763f9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
@@ -0,0 +1,19 @@
+package org.apache.mahout.math.drm
+
+object CacheHint extends Enumeration { // hint values accepted by DrmLike.checkpoint(cacheHint)
+
+ type CacheHint = Value // alias so the hint type can be written as CacheHint.CacheHint
+
+ val NONE,
+ DISK_ONLY,
+ DISK_ONLY_2,
+ MEMORY_ONLY,
+ MEMORY_ONLY_2,
+ MEMORY_ONLY_SER,
+ MEMORY_ONLY_SER_2,
+ MEMORY_AND_DISK,
+ MEMORY_AND_DISK_2,
+ MEMORY_AND_DISK_SER,
+ MEMORY_AND_DISK_SER_2 = Value
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
new file mode 100644
index 0000000..0266944
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.math.Matrix
+
+/**
+ * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can
+ * therefore be collected or saved.
+ * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+ */
+trait CheckpointedDrm[K] extends DrmLike[K] {
+
+ def collect: Matrix // the checkpointed matrix as an in-core Matrix
+
+ def writeDRM(path: String) // save this matrix under `path`
+
+ /** If this checkpoint is already declared cached, uncache it. */
+ def uncache()
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
new file mode 100644
index 0000000..fa1ccfd
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+
+
+/**
+ * Additional experimental operations over a CheckpointedDrm implementation. I will possibly move
+ * them up to the DRMBase once they stabilize.
+ * @param drm the checkpointed matrix these operations are evaluated on
+ */
+class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
+
+
+ /** Column sums. At this point this runs on a checkpoint and collects an in-core vector. */
+ def colSums(): Vector = drm.context.colSums(drm)
+
+ /** Column means; obtained through `drm.context` in the same way as colSums(). */
+ def colMeans(): Vector = drm.context.colMeans(drm)
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
new file mode 100644
index 0000000..39bab90
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import java.io.Closeable
+
+/** Distributed context (a.k.a. distributed session handle). It is Closeable and exposes its engine. */
+trait DistributedContext extends Closeable {
+
+ val engine:DistributedEngine // the DistributedEngine associated with this context
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
new file mode 100644
index 0000000..0e76d87
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import logical._
+import org.apache.mahout.math.{Matrix, Vector}
+import DistributedEngine._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+/** Abstraction of optimizer/distributed engine */
+trait DistributedEngine {
+
+ /**
+ * First optimization pass. Returns the rewritten logical plan, which is what the second pass,
+ * toPhysical(), is meant to translate. This rewrite may introduce logical constructs (including
+ * engine-specific ones) that the user DSL cannot even produce per se.
+ * <P>
+ *
+ * A particular physical engine implementation may choose to either use the default rewrites or
+ * build its own rewriting rules.
+ * The default implementation chains pass1, pass2 and pass3 of the companion object.
+ */
+ def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
+
+ /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+ def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
+
+ /** Engine-specific colSums implementation based on a checkpoint. */
+ def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+
+ /** Engine-specific colMeans implementation based on a checkpoint. */
+ def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+
+ /** Broadcast support for an in-core vector. */
+ def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
+
+ /** Broadcast support for an in-core matrix. */
+ def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
+
+ /** Load DRM from HDFS (as in Mahout DRM format). The key type is not known statically. */
+ def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_]
+
+ /** Parallelize in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
+ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+ /** Parallelize in-core matrix as a distributed matrix, using row labels as data set keys. */
+ def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext): CheckpointedDrm[String]
+
+ /** This creates an empty DRM with specified number of partitions and cardinality. */
+ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+ (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+ /** Creates an empty DRM whose height is given as a Long (i.e. may exceed the Int range). */
+ def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+ (implicit sc: DistributedContext): CheckpointedDrm[Long]
+}
+
+object DistributedEngine {
+
+ /** First pass: mostly multiplication rewrites (A'A detection, left-multiply via transposition). */
+ private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+
+ action match {
+ case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a)) // A' %*% A with both operands equal
+ case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a)) // same, any-key variant
+
+ // For now, rewrite left-multiply via transpositions, i.e.
+ // inCoreA %*% B = (B' %*% inCoreA')'
+ case op@OpTimesLeftMatrix(a, b) =>
+ OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+
+ // Stop at checkpoints
+ case cd: CheckpointedDrm[_] => action
+
+ // For everything else, rewrite the operator's arguments in place and return the same operator
+ case uop: AbstractUnaryOp[_, K] =>
+ uop.A = pass1(uop.A)(uop.classTagA)
+ uop
+ case bop: AbstractBinaryOp[_, _, K] =>
+ bop.A = pass1(bop.A)(bop.classTagA)
+ bop.B = pass1(bop.B)(bop.classTagB)
+ bop
+ } // NOTE(review): no default case -- a DrmLike matching none of the above would raise MatchError
+ }
+
+ /** Second pass: removes constructs like A.t.t that the previous step may have created. */
+ private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+ action match {
+ // A.t.t => A
+ case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
+
+ // Stop at checkpoints
+ case cd: CheckpointedDrm[_] => action
+
+ // For everything else, rewrite the operator's arguments in place and return the same operator
+ case uop: AbstractUnaryOp[_, K] =>
+ uop.A = pass2(uop.A)(uop.classTagA)
+ uop
+ case bop: AbstractBinaryOp[_, _, K] =>
+ bop.A = pass2(bop.A)(bop.classTagA)
+ bop.B = pass2(bop.B)(bop.classTagB)
+ bop
+ }
+ }
+
+ /** Third pass: further rewrites that are conditioned on the A.t.t removal done by pass2. */
+ private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+ action match {
+
+ // matrix products: A %*% B' => OpABt
+ case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
+
+ // AtB cases that make sense.
+ case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
+ case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
+
+ // Need some cost to choose between the following.
+
+ case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b)) // currently the same result as the guarded case above
+ // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
+ case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+ // Rewrite A'x
+ case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
+
+ // Stop at checkpoints
+ case cd: CheckpointedDrm[_] => action
+
+ // For everything else, rewrite the operator's arguments in place and return the same operator
+ case uop: AbstractUnaryOp[_, K] =>
+ uop.A = pass3(uop.A)(uop.classTagA)
+ uop
+ case bop: AbstractBinaryOp[_, _, K] =>
+ bop.A = pass3(bop.A)(bop.classTagA)
+ bop.B = pass3(bop.B)(bop.classTagB)
+ bop
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
new file mode 100644
index 0000000..8e0db1e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+
+/**
+ *
+ * Basic DRM trait.
+ *
+ * NOTE(review): this trait now lives in org.apache.mahout.math.drm rather than in a "sparkbindings"
+ * package, so a Spark backing is no longer implied by the package; see `context` for the backend.
+ *
+ */
+trait DrmLike[K] {
+
+ protected[mahout] def partitioningTag:Long // tag compared across operands by the optimizer's A'B rewrite
+
+ protected[mahout] val context:DistributedContext // the distributed context this matrix belongs to
+
+
+ /** R-like syntax for number of rows. */
+ def nrow: Long
+
+ /** R-like syntax for number of columns. */
+ def ncol: Int
+
+ /**
+ * Action operator -- does not necessarily mean a Spark action; but does mean running the BLAS
+ * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
+ */
+ def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
new file mode 100644
index 0000000..35e28af
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{OpMapBlock, OpRowRange}
+
+/** Common Drm ops */
+class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+ /**
+ * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
+ * matrices; or they could be completely new matrices with new keyset. In the latter case, output
+ * matrix width must be specified with <code>ncol</code> parameter.<P>
+ *
+ * New blocks must have the same height as the corresponding original blocks.<P>
+ *
+ * @param ncol new matrix' width (only needed if width changes).
+ * @param bmf block map function; it is given a (keys, block) pair and returns the new pair
+ * @tparam R key type of the resulting matrix
+ * @return a new OpMapBlock logical operator wrapping this DRM
+ */
+ def mapBlock[R : ClassTag](ncol: Int = -1)
+ (bmf: BlockMapFunc[K, R]): DrmLike[R] =
+ new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
+
+
+ /**
+ * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P>
+ *
+ * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
+ *
+ * A row range other than the all-range is only accepted for Int-keyed input, i.e. DrmLike[Int];
+ * for any other key type an IllegalArgumentException is thrown.
+ *
+ * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
+ * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
+ */
+ def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
+
+ import RLikeDrmOps._
+ import RLikeOps._
+
+ val rowSrc: DrmLike[K] = if (rowRange != ::) {
+
+ if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) { // K is Int, so the casts below are safe
+
+ assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range") // NOTE(review): head/last throw on an empty range
+ val intKeyed = drm.asInstanceOf[DrmLike[Int]]
+
+ new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
+
+ } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
+
+ } else drm
+
+ if (colRange != ::) {
+
+ assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range") // NOTE(review): head/last throw on an empty range
+
+ // Use mapBlock operator to do in-core subranging of the columns; keys are passed through.
+ rowSrc.mapBlock(ncol = colRange.length)({
+ case (keys, block) => keys -> block(::, colRange)
+ })
+
+ } else rowSrc
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
new file mode 100644
index 0000000..f46d15c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Vector, Matrix}
+import org.apache.mahout.math.drm.logical._
+
+class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { // R-like operators; each returns an operator object from the `logical` package
+
+ import RLikeDrmOps._ // companion implicits; presumably rlikeOps2Drm is what lets `A = this` below stand for the wrapped drm
+
+ def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+') // DRM-with-DRM operators are tagged with a Char
+
+ def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
+
+ def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
+
+ def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
+
+ def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+") // DRM-with-scalar operators are tagged with a String
+
+ def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
+
+ def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:") // right-associative: written as `scalar -: drm`
+
+ def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+ def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
+
+ def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:") // right-associative: written as `scalar /: drm`
+
+ def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that) // product with an Int-keyed DRM
+
+ def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that) // product with a DRM of any key type
+
+ def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
+
+ def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that) // product with an in-core matrix on the right
+
+ def %*%(that: Matrix): DrmLike[K] = this :%*% that
+
+ def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that) // product with an in-core vector
+
+ def %*%(that: Vector): DrmLike[K] = :%*%(that)
+
+ def t: DrmLike[Int] = OpAtAnyKey(A = drm) // transposition for an arbitrary key type; the result is Int-keyed
+}
+
+class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) { // extra operators available only for Int-keyed DRMs
+
+ override def t: DrmLike[Int] = OpAt(A = drm) // Int-keyed transposition uses OpAt instead of OpAtAnyKey
+
+ def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm) // right-associative: `that` is the left operand
+
+ def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm) // in-core matrix on the left
+
+
+}
+
+object RLikeDrmOps { // implicit conversions between DRMs and their operator wrappers
+ implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
+
+ implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
+
+ implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm // unwrap back to the underlying DRM
+
+ implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm // unwrap back to the underlying DRM
+
+ implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+
+ /**
+ * This is probably dangerous since it triggers implicit checkpointing with the default storage
+ * level setting (it calls checkpoint() without arguments).
+ */
+ implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
new file mode 100644
index 0000000..34ae345
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
@@ -0,0 +1,57 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+  private val log = Logger.getLogger(DQR.getClass)
+
+  /**
+   * Thin distributed QR decomposition, obtained from the Cholesky factor of A'A.
+   *
+   * A'A is collected in-core and broadcast, so for an m x n input the column count n has to stay
+   * moderate (a warning is logged when it exceeds 5000). <P>
+   *
+   * The input is traversed twice (once for A'A, once for Q), so checkpointing A beforehand is
+   * recommended. <P>
+   *
+   * Q is intended to be partitioned exactly like A (and in the same key order), so that the
+   * underlying RDDs of the two can be zipped.
+   *
+   * @param A input matrix
+   * @param checkRankDeficiency if true, fail when A'A is not positive definite
+   * @return the pair (Q, R): distributed Q and in-core R
+   * @throws IllegalArgumentException if the rank check is enabled and A'A is not positive definite
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    if (A.ncol > 5000) {
+      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
+    }
+
+    implicit val ctx = A.context
+
+    // Gramian A'A, pulled into driver memory.
+    val drmAtA = (A.t %*% A).checkpoint()
+    val inCoreAtA = drmAtA.collect
+
+    if (log.isDebugEnabled) {
+      log.debug("A'A=\n%s\n".format(inCoreAtA))
+    }
+
+    // With A'A = LL', R is the transpose of a copy of L.
+    val cholesky = chol(inCoreAtA)
+    val inCoreR = cholesky.getL.cloned.t
+
+    if (log.isDebugEnabled) {
+      log.debug("R=\n%s\n".format(inCoreR))
+    }
+
+    if (checkRankDeficiency && !cholesky.isPositiveDefinite) {
+      throw new IllegalArgumentException("R is rank-deficient.")
+    }
+
+    val bcastAtA = drmBroadcast(inCoreAtA)
+
+    // The Cholesky decomposition is presumably not serializable to the backend, so A'A itself is
+    // broadcast and decomposed again for every block.
+    // Q = A*inv(L'), computed blockwise.
+    val drmQ = A.mapBlock() {
+      case (keys, block) => (keys, chol(bcastAtA).solveRight(block))
+    }
+
+    (drmQ, inCoreR)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
new file mode 100644
index 0000000..9e33416
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+ /**
+ * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+ * document of the MAHOUT-817.
+ *
+ * The input is checkpointed once and then reused for the column means, for the initial projection
+ * Y = A * Omega, and for the products formed in every power iteration.
+ *
+ * @param A input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations (hint: use either 0 or 1)
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to hdfs, in order to trigger their computation).
+ */
+ def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+ // Implicit context picked up by the drmBroadcast() calls below.
+ implicit val ctx = A.context
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+ // Effective oversampling. Infix `min` binds looser than `-`, so this reads
+ // ((m min n) - k) min p, i.e. p is capped so that k + p never exceeds min(m, n).
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // Dataset mean
+ val xi = drmA.colMeans
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+ val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+ // This is done up front in a single-threaded fashion for now. Even though it doesn't require any
+ // memory beyond what is required to keep xi around, it still might be parallelized to the backend
+ // for significantly big n and r. TODO
+ val s_o = omega.t %*% xi
+
+ val bcastS_o = drmBroadcast(s_o)
+ val bcastXi = drmBroadcast(xi)
+
+ // Y = A * Omega with s_o subtracted from every row. Omega is re-created from its seed inside the
+ // closure instead of being captured from the driver-side `omega`.
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val s_o:Vector = bcastS_o
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+ keys -> blockY
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ // Q of the thin QR of Y; the rank-deficiency check is switched off here.
+ var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ // Column sums of Q.
+ var s_q = drmQ.colSums()
+ var bcastVarS_q = drmBroadcast(s_q)
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ // s_b = B * xi, taken as the first column of the collected product.
+ var s_b = (drmBt.t %*% xi).collect(::, 0)
+ var bcastVarS_b = drmBroadcast(s_b)
+
+ for (i <- 0 until q) {
+
+ // These closures don't seem to live well with outside-scope vars. This doesn't record closure
+ // attributes correctly. So we create additional set of vals for broadcast vars to properly
+ // create readonly closure attributes in this very scope.
+ val bcastS_q = bcastVarS_q
+ val bcastS_b = bcastVarS_b
+ val bcastXib = bcastXi
+
+ // Fix Bt as B' -= xi cross s_q
+ drmBt = drmBt.mapBlock() {
+ case (keys, block) =>
+ val s_q: Vector = bcastS_q
+ val xi: Vector = bcastXib
+ // Row `idx` of the block carries key `key`; xi(key) is the matching entry of the mean vector.
+ keys.zipWithIndex.foreach {
+ case (key, idx) => block(idx, ::) -= s_q * xi(key)
+ }
+ keys -> block
+ }
+
+ // Drop the cached Y and Q of the previous pass before they are re-assigned below.
+ drmY.uncache()
+ drmQ.uncache()
+
+ drmY = (drmA %*% drmBt)
+ // Fix Y by subtracting s_b from each row of the AB'
+ .mapBlock() {
+ case (keys, block) =>
+ val s_b: Vector = bcastS_b
+ for (row <- 0 until block.nrow) block(row, ::) -= s_b
+ keys -> block
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ s_q = drmQ.colSums()
+ bcastVarS_q = drmBroadcast(s_q)
+
+ // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ s_b = (drmBt.t %*% xi).collect(::, 0)
+ bcastVarS_b = drmBroadcast(s_b)
+ }
+
+ // In-core BB' with the mean-related terms applied on the small matrix: c, c' and
+ // (s_q cross s_q) * (xi dot xi).
+ val c = s_q cross s_b
+ val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+ c - c.t + (s_q cross s_q) * (xi dot xi)
+ val (inCoreUHat, d) = eigen(inCoreBBt)
+ // s holds the square roots of the eigenvalues of BB'.
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+ // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ // Keep only the first k columns / values; the remaining r - k come from oversampling.
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
new file mode 100644
index 0000000..0da9ec7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
@@ -0,0 +1,83 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+ /**
+ * Distributed Stochastic Singular Value decomposition algorithm.
+ *
+ * The input is checkpointed once and then reused for the initial projection Y = A * Omega and for
+ * the products formed in every power iteration.
+ *
+ * @param A input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to hdfs, in order to trigger their computation).
+ */
+ def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+ // Effective oversampling. Infix `min` binds looser than `-`, so this reads
+ // ((m min n) - k) min p, i.e. p is capped so that k + p never exceeds min(m, n).
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+
+ // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
+ // instantiate the Omega random matrix view in the backend instead. That way serialized closure
+ // is much more compact.
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ keys -> blockY
+ }
+
+ // Q of the thin QR of the checkpointed Y; dqrThin runs with its default rank-deficiency check.
+ var drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (q == 0) drmQ = drmQ.checkpoint()
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (q == 0) drmBt = drmBt.checkpoint()
+
+ // Power iterations: Y, Q and B' are re-derived from the previous B' on every pass.
+ for (i <- 0 until q) {
+ drmY = drmA %*% drmBt
+ drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (i == q - 1) drmQ = drmQ.checkpoint()
+
+ // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (i == q - 1) drmBt = drmBt.checkpoint()
+ }
+
+ // BB' is collected in-core and eigen-decomposed there.
+ val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
+ val (inCoreUHat, d) = eigen(inCoreBBt)
+ // s holds the square roots of the eigenvalues of BB'.
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+ // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ // Keep only the first k columns / values; the remaining r - k come from oversampling.
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
new file mode 100644
index 0000000..c2371d1
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+/**
+ * Base class of logical operators that take two DRM operands.
+ *
+ * @tparam A row key type of the first operand
+ * @tparam B row key type of the second operand
+ * @tparam K row key type of the result
+ */
+abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
+    extends CheckpointAction[K] with DrmLike[K] {
+
+  /** First operand. */
+  protected[drm] var A: DrmLike[A]
+
+  /** Second operand. */
+  protected[drm] var B: DrmLike[B]
+
+  /** Distributed context, taken lazily from the first operand. */
+  protected[mahout] lazy val context: DistributedContext = A.context
+
+  // The class tag evidence is re-exported explicitly because the compiler sometimes fails to
+  // work it out on its own.
+
+  /** Class tag of the first operand's key type. */
+  def classTagA: ClassTag[A] = scala.reflect.classTag[A]
+
+  /** Class tag of the second operand's key type. */
+  def classTagB: ClassTag[B] = scala.reflect.classTag[B]
+
+  /** Class tag of the result's key type. */
+  def classTagK: ClassTag[K] = scala.reflect.classTag[K]
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
new file mode 100644
index 0000000..eb5ef9a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+/** Abstract unary operator */
+/**
+ * Base class of logical operators that take a single DRM operand.
+ *
+ * @tparam A row key type of the operand
+ * @tparam K row key type of the result
+ */
+abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
+    extends CheckpointAction[K] with DrmLike[K] {
+
+  /** The operand. */
+  protected[drm] var A: DrmLike[A]
+
+  /** Distributed context, taken lazily from the operand. */
+  protected[mahout] lazy val context: DistributedContext = A.context
+
+  /** Class tag of the operand's key type. */
+  def classTagA: ClassTag[A] = scala.reflect.classTag[A]
+
+  /** Class tag of the result's key type. */
+  def classTagK: ClassTag[K] = scala.reflect.classTag[K]
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
new file mode 100644
index 0000000..aa3a3b9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import scala.util.Random
+import org.apache.mahout.math.drm._
+
+/** Implementation of distributed expression checkpoint and optimizer. */
+/**
+ * Checkpointing support for distributed expressions: runs the optimizer over the expression and
+ * memoizes the resulting physical plan.
+ */
+abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
+
+  /** Randomly drawn tag, compared by [[isIdenticallyPartitioned]]. */
+  protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
+
+  /** Physical plan produced by the first `checkpoint` call, if there has been one. */
+  private[mahout] var cp: Option[CheckpointedDrm[K]] = None
+
+  /** True when this tag is non-zero and equal to the tag of `other`. */
+  def isIdenticallyPartitioned(other: DrmLike[_]): Boolean = {
+    val ownTag = partitioningTag
+    ownTag != 0L && ownTag == other.partitioningTag
+  }
+
+  /**
+   * Applies the context's optimizer rewrite to this expression and turns it into a physical plan
+   * with the given cache hint. The plan is memoized: every later call returns the plan built by
+   * the first one, whatever cache hint it is given.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
+    cp.getOrElse {
+      val physPlan = context.toPhysical(context.optimizerRewrite(this), cacheHint)
+      cp = Some(physPlan)
+      physPlan
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
new file mode 100644
index 0000000..804a00e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical AB */
+/**
+ * Logical operator for the product AB with an Int-keyed second operand. The result carries the
+ * row keys of A.
+ */
+case class OpAB[K: ClassTag](override var A: DrmLike[K], override var B: DrmLike[Int])
+    extends AbstractBinaryOp[K, Int, K] {
+
+  // The inner dimensions have to agree.
+  assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+  /** Row count of the product, i.e. the row count of A. */
+  def nrow: Long = A.nrow
+
+  /** Column count of the product, i.e. the column count of B. */
+  def ncol: Int = B.ncol
+
+  /** Non-zero element count; not supported. */
+  def nNonZero: Long = {
+    // TODO: for purposes of cost calculation, approximate based on operands
+    throw new UnsupportedOperationException
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
new file mode 100644
index 0000000..f131f3f
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical AB */
+/**
+ * Logical operator for the product AB where the second operand may have any row key type. The
+ * result carries the row keys of A.
+ */
+case class OpABAnyKey[B: ClassTag, K: ClassTag](override var A: DrmLike[K], override var B: DrmLike[B])
+    extends AbstractBinaryOp[K, B, K] {
+
+  // The inner dimensions have to agree.
+  assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+  /** Row count of the product, i.e. the row count of A. */
+  def nrow: Long = A.nrow
+
+  /** Column count of the product, i.e. the column count of B. */
+  def ncol: Int = B.ncol
+
+  /** Non-zero element count; not supported. */
+  def nNonZero: Long = {
+    // TODO: for purposes of cost calculation, approximate based on operands
+    throw new UnsupportedOperationException
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
new file mode 100644
index 0000000..f6503ed
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/** Logical AB' */
+/**
+ * Logical operator for the product AB' with an Int-keyed second operand. The result carries the
+ * row keys of A.
+ */
+case class OpABt[K: ClassTag](override var A: DrmLike[K], override var B: DrmLike[Int])
+    extends AbstractBinaryOp[K, Int, K] {
+
+  // B is transposed, so the column counts of both operands have to agree.
+  assert(A.ncol == B.ncol, "Incompatible operand geometry")
+
+  /** Row count of the product, i.e. the row count of A. */
+  def nrow: Long = A.nrow
+
+  /** Column count of the product, i.e. the row count of B narrowed to an Int. */
+  def ncol: Int = safeToNonNegInt(B.nrow)
+
+  /** Non-zero element count; not supported. */
+  def nNonZero: Long = {
+    // TODO: for purposes of cost calculation, approximate based on operands
+    throw new UnsupportedOperationException
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
new file mode 100644
index 0000000..d07172a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** DRM elementwise operator */
+/**
+ * Logical elementwise operator over two DRMs that share a row key type.
+ *
+ * @param A first operand
+ * @param B second operand
+ * @param op character identifying the elementwise operation
+ */
+case class OpAewB[K: ClassTag](override var A: DrmLike[K], override var B: DrmLike[K], op: Char)
+    extends AbstractBinaryOp[K, K, K] {
+
+  // Both operands must have exactly the same geometry.
+  assert(A.ncol == B.ncol, "arguments must have same number of columns")
+  assert(A.nrow == B.nrow, "arguments must have same number of rows")
+
+  /** Row count of the result, i.e. the row count of A. */
+  def nrow: Long = A.nrow
+
+  /** Column count of the result, i.e. the column count of A. */
+  def ncol: Int = A.ncol
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
new file mode 100644
index 0000000..91e0dd4
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Operator denoting expressions like 5.0 - A or A * 5.6 */
+/**
+ * Logical operator combining a DRM with a scalar elementwise, for expressions such as 5.0 - A or
+ * A * 5.6.
+ *
+ * @param A the DRM operand
+ * @param scalar the scalar operand
+ * @param op string identifying the operation
+ */
+case class OpAewScalar[K: ClassTag](override var A: DrmLike[K], scalar: Double, op: String)
+    extends AbstractUnaryOp[K, K] {
+
+  /** The result reuses the partitioning tag of the operand. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** Row count of the result, i.e. the row count of A. */
+  def nrow: Long = A.nrow
+
+  /** Column count of the result, i.e. the column count of A. */
+  def ncol: Int = A.ncol
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
new file mode 100644
index 0000000..3239ad2
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm._
+
+/** Logical A' */
+/** Logical operator for the transpose A' of an Int-keyed DRM; the result is Int-keyed as well. */
+case class OpAt(override var A: DrmLike[Int]) extends AbstractUnaryOp[Int, Int] {
+
+  /** Row count of the transpose, i.e. the column count of A. */
+  def nrow: Long = A.ncol
+
+  /** Column count of the transpose, i.e. the row count of A narrowed to an Int. */
+  def ncol: Int = safeToNonNegInt(A.nrow)
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
new file mode 100644
index 0000000..c7c6046
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** A'A */
+/** Logical operator for A'A; the result is a square, Int-keyed matrix. */
+case class OpAtA[K: ClassTag](override var A: DrmLike[K]) extends AbstractUnaryOp[K, Int] {
+
+  /** Row count of the result, i.e. the column count of A. */
+  def nrow: Long = A.ncol
+
+  /** Column count of the result, i.e. the column count of A. */
+  def ncol: Int = A.ncol
+
+  /** Non-zero element count; not supported. */
+  def nNonZero: Long = {
+    throw new UnsupportedOperationException
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
new file mode 100644
index 0000000..4e1dd5c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/** Logical A' for any row key to support A'A optimizations */
+/**
+ * Logical operator for the transpose A' of a DRM with any row key type, present to support A'A
+ * optimizations. The result is Int-keyed.
+ */
+case class OpAtAnyKey[A: ClassTag](override var A: DrmLike[A]) extends AbstractUnaryOp[A, Int] {
+
+  /** Row count of the transpose, i.e. the column count of A. */
+  def nrow: Long = A.ncol
+
+  /** Column count of the transpose, i.e. the row count of A narrowed to an Int. */
+  def ncol: Int = safeToNonNegInt(A.nrow)
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
new file mode 100644
index 0000000..ef3ae6b
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical operator for the product A'B of two operands that share a row key type. */
+case class OpAtB[A: ClassTag](
+    override var A: DrmLike[A],
+    override var B: DrmLike[A]
+  ) extends AbstractBinaryOp[A, A, Int] {
+
+  // Checked at construction: A'B needs A and B to have the same number of rows.
+  assert(A.nrow == B.nrow, "Incompatible operand geometry")
+
+  /** Number of columns of A'B: the column count of B. */
+  def ncol: Int = B.ncol
+
+  /** Number of rows of A'B: the column count of A. */
+  def nrow: Long = A.ncol
+
+  // TODO: for purposes of cost calculation, approximate based on operands
+  /** Non-zero element count; not implemented, always throws UnsupportedOperationException. */
+  def nNonZero: Long = throw new UnsupportedOperationException
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
new file mode 100644
index 0000000..36769c7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+
+/** Logical operator for A'x: the transpose of an Int-keyed operand times an in-core vector. */
+case class OpAtx(override var A: DrmLike[Int], x: Vector)
+    extends AbstractUnaryOp[Int, Int] {
+
+  // Checked at construction: x needs one element per row of A.
+  assert(A.nrow == x.length, "Incompatible operand geometry")
+
+  /** Partitioning tag, taken lazily from the operand A. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** Always 1: the result is a single column. */
+  def ncol: Int = 1
+
+  /** Number of rows of A'x: the column count of A, passed through the non-negative Int check. */
+  def nrow: Long = safeToNonNegInt(A.ncol)
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
new file mode 100644
index 0000000..a726989
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical operator for Ax: a distributed operand times an in-core vector. */
+case class OpAx[K: ClassTag](override var A: DrmLike[K], x: Vector)
+    extends AbstractUnaryOp[K, K] {
+
+  // Checked at construction: x needs one element per column of A.
+  assert(A.ncol == x.length, "Incompatible operand geometry")
+
+  /** Partitioning tag, taken lazily from the operand A. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** Always 1: the result is a single column. */
+  def ncol: Int = 1
+
+  /** Number of rows of Ax: the row count of A. */
+  def nrow: Long = A.nrow
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
new file mode 100644
index 0000000..8e4362d
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+
+/** Logical map-block operator over A; negative `_ncol`/`_nrow` fall back to A's own dimensions. */
+class OpMapBlock[S: ClassTag, R: ClassTag](
+    override var A: DrmLike[S],
+    val bmf: BlockMapFunc[S, R],
+    val _ncol: Int = -1,
+    val _nrow: Long = -1
+  ) extends AbstractUnaryOp[S, R] {
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** Number of columns: A's column count unless a non-negative `_ncol` was supplied. */
+  def ncol: Int = if (_ncol < 0) A.ncol else _ncol
+
+  /** Number of rows: A's row count unless a non-negative `_nrow` was supplied. */
+  def nrow: Long = if (_nrow < 0) A.nrow else _nrow
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
new file mode 100644
index 0000000..697bbd3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical row-range slicing of an Int-keyed operand. */
+case class OpRowRange(override var A: DrmLike[Int], rowRange: Range)
+    extends AbstractUnaryOp[Int, Int] {
+
+  // Bounds are taken from min/max rather than head/last so that descending ranges are validated
+  // too; an empty range fails this assertion instead of throwing NoSuchElementException.
+  assert(rowRange.nonEmpty && rowRange.min >= 0 && rowRange.max < A.nrow, "row range out of range")
+
+  /** Number of rows in the slice: the number of indices in `rowRange`. */
+  def nrow: Long = rowRange.length
+
+  /** Number of columns, unchanged from A. */
+  def ncol: Int = A.ncol
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
new file mode 100644
index 0000000..1ca79b3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical times-left: the product of an in-core matrix `left` and an Int-keyed operand A. */
+case class OpTimesLeftMatrix(left: Matrix, override var A: DrmLike[Int])
+    extends AbstractUnaryOp[Int, Int] {
+
+  // Checked at construction: `left` needs as many columns as A has rows.
+  assert(left.ncol == A.nrow, "Incompatible operand geometry")
+
+  /** Number of columns of the product: the column count of A. */
+  def ncol: Int = A.ncol
+
+  /** Number of rows of the product: the row count of `left`. */
+  def nrow: Long = left.nrow
+
+  // TODO
+  /** Non-zero element count; not implemented, always throws UnsupportedOperationException. */
+  def nNonZero: Long =
+    throw new UnsupportedOperationException
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
new file mode 100644
index 0000000..c55f7f0
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical times-right: the product of a distributed operand A and an in-core matrix `right`. */
+case class OpTimesRightMatrix[K: ClassTag](override var A: DrmLike[K], right: Matrix)
+    extends AbstractUnaryOp[K, K] {
+
+  // Checked at construction: A needs as many columns as `right` has rows.
+  assert(A.ncol == right.nrow, "Incompatible operand geometry")
+
+  /** Partitioning tag, taken lazily from the operand A. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** Number of columns of the product: the column count of `right`. */
+  def ncol: Int = right.ncol
+
+  /** Number of rows of the product: the row count of A. */
+  def nrow: Long = A.nrow
+
+  // TODO
+  /** Non-zero element count; not implemented, always throws UnsupportedOperationException. */
+  def nNonZero: Long = throw new UnsupportedOperationException
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
new file mode 100644
index 0000000..768bb1c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
+
+package object drm {
+
+  /** One DRM row: the row key paired with the row vector. */
+  type DrmTuple[K] = (K, Vector)
+
+  /** One DRM block: the array of row keys paired with the matrix block. */
+  type BlockifiedDrmTuple[K] = (Array[K], _ <: Matrix)
+
+  /** Block-map function: turns a block keyed by `S` into a block keyed by `R`. */
+  type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
+
+  // CacheHint type alias (currently disabled):
+  // type CacheHint = CacheHint.CacheHint
+
+  /** Narrows `x` to Int after asserting that it lies in [0, Int.MaxValue]. */
+  def safeToNonNegInt(x: Long): Int = {
+    assert(0L <= x && x <= Int.MaxValue, "transformation from long to Int is losing signficant bits, or is a negative number")
+    x.toInt
+  }
+
+  /** Broadcast support API: broadcasts an in-core matrix via the implicit context. */
+  def drmBroadcast(m: Matrix)(implicit ctx: DistributedContext): BCast[Matrix] = ctx.drmBroadcast(m)
+
+  /** Broadcast support API: broadcasts an in-core vector via the implicit context. */
+  def drmBroadcast(v: Vector)(implicit ctx: DistributedContext): BCast[Vector] = ctx.drmBroadcast(v)
+
+  /** Loads a DRM from HDFS (as in Mahout DRM format); the row key type is not known statically. */
+  def drmFromHDFS(path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmFromHDFS(path)
+
+  /** Shortcut for [[drmParallelizeWithRowIndices]]: keys rows by ordinal index, ignoring row labels. */
+  def drmParallelize(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = drmParallelizeWithRowIndices(m, numPartitions)(sc)
+
+  /** Parallelizes an in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeWithRowIndices(m, numPartitions)
+
+  /** Parallelizes an in-core matrix as a distributed matrix, using row labels as data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit ctx: DistributedContext): CheckpointedDrm[String] = ctx.drmParallelizeWithRowLabels(m, numPartitions)
+
+  /** Creates an empty DRM with the specified cardinality and number of partitions. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeEmpty(nrow, ncol, numPartitions)
+
+  /** Creates an empty DRM with non-trivial height: the row count is a Long and so are the row keys. */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
+
+  /** Implicit broadcast -> value conversion. */
+  implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
+
+  /** Makes every engine operation available directly on a context. */
+  implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
+
+  /** Wraps a checkpointed DRM in [[CheckpointedOps]]. */
+  implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
+    new CheckpointedOps[K](drm)
+
+  /** Implicitly checkpoints a DRM, with `checkpoint()` defaults, wherever a [[CheckpointedDrm]] is required. */
+  implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
+
+  // ============== Decompositions ===================
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(A, checkRankDeficiency)
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): (DrmLike[K], DrmLike[Int], Vector) =
+    DSSVD.dssvd(A, k, p, q)
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of the MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): (DrmLike[K], DrmLike[Int], Vector) =
+    DSPCA.dspca(A, k, p, q)
+
+}