You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2014/05/27 21:14:51 UTC
[1/3] MAHOUT-1529 closes PR #1
Repository: mahout
Updated Branches:
refs/heads/master 5a1c2ccba -> 8714a0f72
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala
deleted file mode 100644
index 1b1bcec..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.plan._
-import org.apache.mahout.math.{Matrices, SparseColumnMatrix, Vector, Matrix}
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesLeftMatrix
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
-import org.apache.mahout.sparkbindings.drm.plan.OpAB
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesRightMatrix
-import org.apache.hadoop.io.Writable
-import org.apache.spark.SparkContext._
-
-class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
-
- import RLikeDrmOps._
-
- def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
-
- def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
-
- def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
-
- def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
-
- def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
-
- def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
-
- def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:")
-
- def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
-
- def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
-
- def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:")
-
- def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
-
- def %*%[B:ClassTag](that:DrmLike[B]):DrmLike[K] = OpABAnyKey[B,K](A=this.drm, B=that)
-
- def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
-
- def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
-
- def %*%(that: Matrix): DrmLike[K] = this :%*% that
-
- def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
-
- def %*%(that: Vector): DrmLike[K] = :%*%(that)
-
- def t: DrmLike[Int] = OpAtAnyKey(A = drm)
-}
-
-class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
-
- override def t: DrmLike[Int] = OpAt(A = drm)
-
- def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
-
- def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
-
-
-}
-
-object RLikeDrmOps {
- implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
-
- implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
-
- implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
-
- implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
-
- implicit def cp2cpops[K:ClassTag](cp:CheckpointedDrm[K]):CheckpointedOps[K] = new CheckpointedOps(cp)
-
- /**
- * This is probably dangerous since it triggers implicit checkpointing with default storage level
- * setting.
- */
- implicit def drm2cpops[K:ClassTag](drm:DrmLike[K]):CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala
new file mode 100644
index 0000000..ac36f60
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math.drm.BCast
+import org.apache.spark.broadcast.Broadcast
+
+class SparkBCast[T](val sbcast: Broadcast[T]) extends BCast[T] with Serializable {
+ def value: T = sbcast.value
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 6f774ba..322d67c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -27,188 +27,32 @@ import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.mahout.math.scalabindings._
import RLikeOps._
-import SparkContext._
import org.apache.spark.broadcast.Broadcast
-import org.apache.mahout.sparkbindings.drm.decompositions.{DSPCA, DSSVD, DQR}
+import org.apache.mahout.math.drm._
+import SparkContext._
package object drm {
private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings");
- /** Drm row-wise tuple */
- type DrmTuple[K] = (K, Vector)
-
- /** Row-wise organized DRM rdd type */
- type DrmRdd[K] = RDD[DrmTuple[K]]
-
- /** Drm block-wise tuple: Array of row keys and the matrix block. */
- type BlockifiedDrmTuple[K] = (Array[K], _ <: Matrix)
-
- /**
- * Blockified DRM rdd (keys of the original DRM are grouped into an array corresponding to the rows
- * of the Matrix object value).
- */
- type BlockifiedDrmRdd[K] = RDD[BlockifiedDrmTuple[K]]
-
- /** Block-map func */
- type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
-
- /** CacheHint type */
-// type CacheHint = CacheHint.CacheHint
+ private[sparkbindings] implicit def input2drmRdd[K](input: DrmRddInput[K]): DrmRdd[K] = input.toDrmRdd()
- implicit def input2drmRdd[K](input: DrmRddInput[K]): DrmRdd[K] = input.toDrmRdd()
+ private[sparkbindings] implicit def input2blockifiedDrmRdd[K](input: DrmRddInput[K]): BlockifiedDrmRdd[K] = input.toBlockifiedDrmRdd()
- implicit def input2blockifiedDrmRdd[K](input: DrmRddInput[K]): BlockifiedDrmRdd[K] = input.toBlockifiedDrmRdd()
-
- implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
+ private[sparkbindings] implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
new DrmRddInput(rowWiseSrc = Some(cp.ncol -> cp.rdd))
- implicit def drm2drmOps[K <% Writable : ClassTag](drm: CheckpointedDrmBase[K]): CheckpointedOps[K] =
- new CheckpointedOps[K](drm)
-
- implicit def v2Writable(v: Vector): VectorWritable = new VectorWritable(v)
-
- implicit def m2Writable(m: Matrix): MatrixWritable = new MatrixWritable(m)
-
- implicit def vw2v(vw: VectorWritable): Vector = vw.get()
-
- implicit def mw2m(mw: MatrixWritable): Matrix = mw.get()
-
- implicit def drmLike2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
-
- implicit def bcast2Matrix(bcast: Broadcast[_ <: Matrix]): Matrix = bcast.value
+// /** Broadcast vector (Mahout vectors are not closure-friendly, use this instead). */
+// private[sparkbindings] def drmBroadcast(x: Vector)(implicit sc: SparkContext): Broadcast[Vector] = sc.broadcast(x)
+//
+// /** Broadcast in-core Mahout matrix. Use this instead of closure. */
+// private[sparkbindings] def drmBroadcast(m: Matrix)(implicit sc: SparkContext): Broadcast[Matrix] = sc.broadcast(m)
- implicit def bcast2Vector(bcast: Broadcast[_ <: Vector]): Vector = bcast.value
+ /** Implicit broadcast cast for Spark physical op implementations. */
+ private[sparkbindings] implicit def bcast2val[K](bcast:Broadcast[K]):K = bcast.value
-
- /**
- * Load DRM from hdfs (as in Mahout DRM format)
- *
- * @param path
- * @param sc spark context (wanted to make that implicit, doesn't work in current version of
- * scala with the type bounds, sorry)
- *
- * @return DRM[Any] where Any is automatically translated to value type
- */
- def drmFromHDFS (path: String)(implicit sc: SparkContext): CheckpointedDrmBase[_] = {
- val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
-
- val key = rdd.map(_._1).take(1)(0)
- val keyWClass = key.getClass.asSubclass(classOf[Writable])
-
- val key2val = key match {
- case xx: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
- case xx: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
- case xx: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
- case xx: Writable => (v: AnyRef) => v
- }
-
- val val2key = key match {
- case xx: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
- case xx: Text => (x: Any) => new Text(x.toString)
- case xx: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Int])
- case xx: Writable => (x: Any) => x.asInstanceOf[Writable]
- }
-
- val km = key match {
- case xx: IntWritable => implicitly[ClassTag[Int]]
- case xx: Text => implicitly[ClassTag[String]]
- case xx: LongWritable => implicitly[ClassTag[Long]]
- case xx: Writable => ClassTag(classOf[Writable])
- }
-
- {
- implicit def getWritable(x: Any): Writable = val2key()
- new CheckpointedDrmBase(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
- }
- }
-
- /** Shortcut to parallelizing matrices with indices, ignore row labels. */
- def drmParallelize(m: Matrix, numPartitions: Int = 1)
- (implicit sc: SparkContext) =
- drmParallelizeWithRowIndices(m, numPartitions)(sc)
-
- /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
- def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
- (implicit sc: SparkContext)
- : CheckpointedDrm[Int] = {
-
- new CheckpointedDrmBase(parallelizeInCore(m, numPartitions))
- }
-
- private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
- (implicit sc: SparkContext): DrmRdd[Int] = {
-
- val p = (0 until m.nrow).map(i => i -> m(i, ::))
- sc.parallelize(p, numPartitions)
-
- }
-
- /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
- def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
- (implicit sc: SparkContext)
- : CheckpointedDrmBase[String] = {
-
-
- // In spark 0.8, I have patched ability to parallelize kryo objects directly, so no need to
- // wrap that into byte array anymore
- val rb = m.getRowLabelBindings
- val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
-
-
- new CheckpointedDrmBase(sc.parallelize(p, numPartitions))
- }
-
- /** This creates an empty DRM with specified number of partitions and cardinality. */
- def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
- (implicit sc: SparkContext): CheckpointedDrm[Int] = {
- val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
- val partNRow = (nrow - 1) / numPartitions + 1
- val partStart = partNRow * part
- val partEnd = Math.min(partStart + partNRow, nrow)
-
- for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
- })
- new CheckpointedDrmBase[Int](rdd, nrow, ncol)
- }
-
- def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
- (implicit sc: SparkContext): CheckpointedDrmBase[Long] = {
- val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
- val partNRow = (nrow - 1) / numPartitions + 1
- val partStart = partNRow * part
- val partEnd = Math.min(partStart + partNRow, nrow)
-
- for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
- })
- new CheckpointedDrmBase[Long](rdd, nrow, ncol)
- }
-
- def drmWrap[K : ClassTag](
- rdd: DrmRdd[K],
- nrow: Int = -1,
- ncol: Int = -1
- ): CheckpointedDrm[K] =
- new CheckpointedDrmBase[K](
- rdd = rdd,
- _nrow = nrow,
- _ncol = ncol
- )
-
-
- /** Broadcast vector (Mahout vectors are not closure-friendly, use this instead). */
- def drmBroadcast(x: Vector)(implicit sc: SparkContext): Broadcast[Vector] = sc.broadcast(x)
-
- /** Broadcast in-core Mahout matrix. Use this instead of closure. */
- def drmBroadcast(m: Matrix)(implicit sc: SparkContext): Broadcast[Matrix] = sc.broadcast(m)
-
- def safeToNonNegInt(x: Long): Int = {
- assert(x == x << -31 >>> -31, "transformation from long to Int is losing signficant bits, or is a negative number")
- x.toInt
- }
-
- def blockify[K: ClassTag](rdd: DrmRdd[K], blockncol: Int): BlockifiedDrmRdd[K] = {
+ private[sparkbindings] def blockify[K: ClassTag](rdd: DrmRdd[K], blockncol: Int): BlockifiedDrmRdd[K] = {
rdd.mapPartitions(iter => {
@@ -226,7 +70,7 @@ package object drm {
})
}
- def deblockify[K: ClassTag](rdd: BlockifiedDrmRdd[K]): DrmRdd[K] =
+ private[sparkbindings] def deblockify[K: ClassTag](rdd: BlockifiedDrmRdd[K]): DrmRdd[K] =
// Just flat-map rows, connect with the keys
rdd.flatMap({
@@ -246,46 +90,4 @@ package object drm {
})
- // ============== Decompositions ===================
-
- /**
- * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
- * controlled (<5000 or so). <P>
- *
- * It is recommended to checkpoint A since it does two passes over it. <P>
- *
- * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
- * their RDD should be able to zip successfully.
- */
- def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
- DQR.dqrThin(A, checkRankDeficiency)
-
- /**
- * Distributed Stochastic Singular Value decomposition algorithm.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
- * e.g. save them to hdfs, in order to trigger their computation).
- */
- def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
-
- /**
- * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
- * document of the MAHOUT-817.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations (hint: use either 0 or 1)
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
- * e.g. save them to hdfs, in order to trigger their computation).
- */
- def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
-
-
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala
deleted file mode 100644
index 6498d87..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import scala.util.Random
-
-abstract class AbstractBinaryOp[A : ClassTag, B : ClassTag, K : ClassTag]
- extends CheckpointAction[K] with DrmLike[K] {
-
- protected[plan] var A: DrmLike[A]
- protected[plan] var B: DrmLike[B]
-
- // These are explicit evidence export. Sometimes scala falls over to figure that on its own.
- def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
-
- def classTagB: ClassTag[B] = implicitly[ClassTag[B]]
-
- def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala
deleted file mode 100644
index a331345..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Abstract unary operator */
-abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
- extends CheckpointAction[K] with DrmLike[K] {
-
- protected[plan] var A: DrmLike[A]
-
- def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
-
- def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
deleted file mode 100644
index cd935bc..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.blas._
-import org.apache.mahout.sparkbindings.drm._
-import CheckpointAction._
-import org.apache.spark.SparkContext._
-import org.apache.hadoop.io.Writable
-import org.apache.spark.storage.StorageLevel
-import scala.util.Random
-
-/** Implementation of distributed expression checkpoint and optimizer. */
-abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
-
- private[sparkbindings] lazy val partitioningTag: Long = Random.nextLong()
-
- private var cp:Option[CheckpointedDrm[K]] = None
-
-
- def isIdenticallyPartitioned(other:DrmLike[_]) =
- partitioningTag!= 0L && partitioningTag == other.partitioningTag
-
- /**
- * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS optimizer
- * and writing down Spark graph lineage since last checkpointed DRM.
- */
- def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp.getOrElse({
- // Non-zero count is sparsely supported by logical operators now. So assume we have no knowledge
- // if it is unsupported, instead of failing.
- val plan = optimize(this)
- val rdd = exec(plan)
- val newcp = new CheckpointedDrmBase(
- rdd = rdd,
- _nrow = nrow,
- _ncol = ncol,
- _cacheStorageLevel = cacheHint2Spark(cacheHint),
- partitioningTag = plan.partitioningTag
- )
- cp = Some(newcp)
- newcp.cache()
- })
-
-}
-
-object CheckpointAction {
-
- /** Perform expression optimization. Return physical plan that we can pass to exec() */
- def optimize[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
-
- private def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
- case CacheHint.NONE => StorageLevel.NONE
- case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
- case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
- case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
- case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
- case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
- case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
- case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
- case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
- case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
- case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
- }
-
-
- /** This is mostly multiplication operations rewrites */
- private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
-
- action match {
- case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
- case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
-
- // For now, rewrite left-multiply via transpositions, i.e.
- // inCoreA %*% B = (B' %*% inCoreA')'
- case op@OpTimesLeftMatrix(a, b) =>
- OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
-
- // Stop at checkpoints
- case cd: CheckpointedDrm[_] => action
-
- // For everything else we just pass-thru the operator arguments to optimizer
- case uop: AbstractUnaryOp[_, K] =>
- uop.A = pass1(uop.A)(uop.classTagA)
- uop
- case bop: AbstractBinaryOp[_, _, K] =>
- bop.A = pass1(bop.A)(bop.classTagA)
- bop.B = pass1(bop.B)(bop.classTagB)
- bop
- }
- }
-
- /** This would remove stuff like A.t.t that previous step may have created */
- private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
- action match {
- // A.t.t => A
- case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
-
- // A.t.t => A
-// case OpAt(top@OpAtAnyKey(a)) => pass2(a)(top.classTagA)
-
-
- // Stop at checkpoints
- case cd: CheckpointedDrm[_] => action
-
- // For everything else we just pass-thru the operator arguments to optimizer
- case uop: AbstractUnaryOp[_, K] =>
- uop.A = pass2(uop.A)(uop.classTagA)
- uop
- case bop: AbstractBinaryOp[_, _, K] =>
- bop.A = pass2(bop.A)(bop.classTagA)
- bop.B = pass2(bop.B)(bop.classTagB)
- bop
- }
- }
-
- /** Some further rewrites that are conditioned on A.t.t removal */
- private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
- action match {
-
- // matrix products.
- case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
-
- // AtB cases that make sense.
- case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
- case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
-
- // Need some cost to choose between the following.
-
- case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
- // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
- case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
- // Rewrite A'x
- case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
-
- // Stop at checkpoints
- case cd: CheckpointedDrm[_] => action
-
- // For everything else we just pass-thru the operator arguments to optimizer
- case uop: AbstractUnaryOp[_, K] =>
- uop.A = pass3(uop.A)(uop.classTagA)
- uop
- case bop: AbstractBinaryOp[_, _, K] =>
- bop.A = pass3(bop.A)(bop.classTagA)
- bop.B = pass3(bop.B)(bop.classTagB)
- bop
- }
- }
-
-
- /** Execute previously optimized physical plan */
- def exec[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
- // I do explicit evidence propagation here since matching via case classes seems to be losing
- // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
- // Hence you see explicit evidence attached to all recursive exec() calls.
- oper match {
- // If there are any such cases, they must go away in pass1. If they were not, then it wasn't
- // the A'A case but actual transposition intent which should be removed from consideration
- // (we cannot do actual flip for non-int-keyed arguments)
- case OpAtAnyKey(_) =>
- throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
- case op@OpAt(a) => At.at(op, exec(a)(op.classTagA))
- case op@OpABt(a, b) => ABt.abt(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
- case op@OpAtB(a, b) => AtB.atb_nograph(op, exec(a)(op.classTagA), exec(b)(op.classTagB),
- zippable = a.partitioningTag == b.partitioningTag)
- case op@OpAtA(a) => AtA.at_a(op, exec(a)(op.classTagA))
- case op@OpAx(a, x) => Ax.ax_with_broadcast(op, exec(a)(op.classTagA))
- case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, exec(a)(op.classTagA))
- case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
- case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
- case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
- case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
- case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, exec(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, exec(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, exec(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, exec(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, exec(a)(op.classTagA), s)
- case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, exec(a)(op.classTagA), s)
- case op@OpRowRange(a, _) => Slicing.rowRange(op, exec(a)(op.classTagA))
- case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, exec(a)(op.classTagA))
- // Custom operators, we just execute them
- case blockOp: OpMapBlock[K, _] => blockOp.exec(src = exec(blockOp.A)(blockOp.classTagA))
- case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
- case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
- .format(oper))
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala
deleted file mode 100644
index 4911eb2..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.hadoop.io.Writable
-import org.apache.spark.SparkContext._
-
-/** Logical AB */
-case class OpAB[K: ClassTag ](
- override var A: DrmLike[K],
- override var B: DrmLike[Int])
- extends AbstractBinaryOp[K, Int, K] {
-
- assert(A.ncol == B.nrow, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = B.ncol
-
- /** Non-zero element count */
- def nNonZero: Long =
- // TODO: for purposes of cost calculation, approximate based on operands
- throw new UnsupportedOperationException
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala
deleted file mode 100644
index 30d3f8c..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Logical AB */
-case class OpABAnyKey[B:ClassTag, K: ClassTag ](
- override var A: DrmLike[K],
- override var B: DrmLike[B])
- extends AbstractBinaryOp[K, B, K] {
-
- assert(A.ncol == B.nrow, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = B.ncol
-
- /** Non-zero element count */
- def nNonZero: Long =
- // TODO: for purposes of cost calculation, approximate based on operands
- throw new UnsupportedOperationException
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala
deleted file mode 100644
index c1bc21d..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
-
-/** Logical AB' */
-case class OpABt[K: ClassTag](
- override var A: DrmLike[K],
- override var B: DrmLike[Int])
- extends AbstractBinaryOp[K,Int,K] {
-
- assert(A.ncol == B.ncol, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = safeToNonNegInt(B.nrow)
-
- /** Non-zero element count */
- def nNonZero: Long =
- // TODO: for purposes of cost calculation, approximate based on operands
- throw new UnsupportedOperationException
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala
deleted file mode 100644
index a20e1af..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** DRM elementwise operator */
-case class OpAewB[K: ClassTag](
- override var A: DrmLike[K],
- override var B: DrmLike[K],
- val op: Char
- ) extends AbstractBinaryOp[K, K, K] {
-
- assert(A.ncol == B.ncol, "arguments must have same number of columns")
- assert(A.nrow == B.nrow, "arguments must have same number of rows")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = A.ncol
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala
deleted file mode 100644
index b35d737..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Operator denoting expressions like 5.0 - A or A * 5.6 */
-case class OpAewScalar[K: ClassTag](
- override var A: DrmLike[K],
- val scalar: Double,
- val op: String
- ) extends AbstractUnaryOp[K,K] {
-
- override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = A.ncol
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala
deleted file mode 100644
index 1c212e1..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-
-/** Logical A' */
-case class OpAt(
- override var A: DrmLike[Int])
- extends AbstractUnaryOp[Int, Int] {
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.ncol
-
- /** R-like syntax for number of columns */
- def ncol: Int = safeToNonNegInt(A.nrow)
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala
deleted file mode 100644
index 2e3aa4c..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.spark.SparkContext._
-
-/** A'A */
-case class OpAtA[K: ClassTag](
- override var A: DrmLike[K]
- ) extends AbstractUnaryOp[K, Int] {
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.ncol
-
- /** R-like syntax for number of columns */
- def ncol: Int = A.ncol
-
- /** Non-zero element count */
- def nNonZero: Long = throw new UnsupportedOperationException
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala
deleted file mode 100644
index 98e6218..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-
-/** Logical A' for any row key to support A'A optimizations */
-case class OpAtAnyKey[A:ClassTag](
- override var A: DrmLike[A])
- extends AbstractUnaryOp[A, Int] {
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.ncol
-
- /** R-like syntax for number of columns */
- def ncol: Int = safeToNonNegInt(A.nrow)
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala
deleted file mode 100644
index 6e830f8..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-
-/** Logical A'B */
-case class OpAtB[A:ClassTag](
- override var A: DrmLike[A],
- override var B: DrmLike[A])
- extends AbstractBinaryOp[A,A,Int] {
-
- assert(A.nrow == B.nrow, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.ncol
-
- /** R-like syntax for number of columns */
- def ncol: Int = B.ncol
-
- /** Non-zero element count */
- def nNonZero: Long =
- // TODO: for purposes of cost calculation, approximate based on operands
- throw new UnsupportedOperationException
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala
deleted file mode 100644
index edc00f0..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-
-/** Logical A'x. */
-case class OpAtx(
- override var A: DrmLike[Int],
- val x: Vector
- ) extends AbstractUnaryOp[Int, Int] {
-
- override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
- assert(A.nrow == x.length, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = safeToNonNegInt(A.ncol)
-
- /** R-like syntax for number of columns */
- def ncol: Int = 1
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala
deleted file mode 100644
index 7dce249..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Vector, Matrix}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Logical Ax. */
-case class OpAx[K: ClassTag](
- override var A: DrmLike[K],
- val x: Vector
- ) extends AbstractUnaryOp[K, K] {
-
- override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
- assert(A.ncol == x.length, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = 1
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala
deleted file mode 100644
index a0c882d..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.{DrmRddInput, BlockMapFunc, DrmLike}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-class OpMapBlock[S: ClassTag, R: ClassTag](
- override var A: DrmLike[S],
- val bmf: BlockMapFunc[S, R],
- val _ncol: Int = -1,
- val _nrow: Long = -1
- ) extends AbstractUnaryOp[S, R] {
-
-
- override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
- /** R-like syntax for number of rows. */
- def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = if (_ncol >= 0) _ncol else A.ncol
-
- def exec(src: DrmRddInput[S]): DrmRddInput[R] = {
-
- // We can't use attributes to avoid putting the whole this into closure.
- val bmf = this.bmf
- val ncol = this.ncol
-
- val rdd = src.toBlockifiedDrmRdd()
- .map(blockTuple => {
- val out = bmf(blockTuple)
-
- assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
- assert(out._2.ncol == ncol, "block map must return %d number of columns.".format(ncol))
-
- out
- })
- new DrmRddInput(blockifiedSrc = Some(rdd))
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala
deleted file mode 100644
index 316661b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.spark.SparkContext._
-
-/** Logical row-range slicing */
-case class OpRowRange(
- override var A:DrmLike[Int],
- val rowRange:Range
- ) extends AbstractUnaryOp[Int, Int] {
-
- assert(rowRange.head>=0 && rowRange.last< A.nrow, "row range out of range")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = rowRange.length
-
- /** R-like syntax for number of columns */
- def ncol: Int = A.ncol
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala
deleted file mode 100644
index 94a4d0b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.spark.SparkContext._
-
-/** Logical Times-left over in-core matrix operand */
-case class OpTimesLeftMatrix(
- val left: Matrix,
- override var A: DrmLike[Int]
- ) extends AbstractUnaryOp[Int, Int] {
-
- assert(left.ncol == A.nrow, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = left.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = A.ncol
-
- /** Non-zero element count */
- // TODO
- def nNonZero: Long = throw new UnsupportedOperationException
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala
deleted file mode 100644
index 007672b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Logical times-right over in-core matrix operand. */
-case class OpTimesRightMatrix[K: ClassTag](
- override var A: DrmLike[K],
- val right: Matrix
- ) extends AbstractUnaryOp[K, K] {
-
- override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
- assert(A.ncol == right.nrow, "Incompatible operand geometry")
-
- /** R-like syntax for number of rows. */
- def nrow: Long = A.nrow
-
- /** R-like syntax for number of columns */
- def ncol: Int = right.ncol
-
- /** Non-zero element count */
- // TODO
- def nNonZero: Long = throw new UnsupportedOperationException
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala
deleted file mode 100644
index 6fb03f5..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-
-/** This package contains logical distributed expression operators. */
-package object plan {
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index 14fa200..b0042c9 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -20,10 +20,12 @@ package org.apache.mahout.sparkbindings.io
import com.esotericsoftware.kryo.Kryo
import org.apache.mahout.math._
import org.apache.spark.serializer.KryoRegistrator
-import org.apache.mahout.sparkbindings.drm._
-
+import org.apache.mahout.sparkbindings._
+/**
+ *
+ */
class MahoutKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) = {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 3010442..34d81cf 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -17,20 +17,33 @@
package org.apache.mahout
-import org.apache.spark.rdd.RDD
-import org.apache.mahout.math.{Matrix, Vector}
-import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkContext}
import java.io._
import scala.collection.mutable.ArrayBuffer
import org.apache.mahout.common.IOUtils
import org.apache.log4j.Logger
-import org.apache.mahout.sparkbindings.drm.DrmLike
+import org.apache.mahout.math.drm._
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.{SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.broadcast.Broadcast
+import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix}
+import org.apache.hadoop.io.Writable
+/** Public api for Spark-specific operators */
package object sparkbindings {
private[sparkbindings] val log = Logger.getLogger("org.apache.mahout.sparkbindings")
+ /** Row-wise organized DRM rdd type */
+ type DrmRdd[K] = RDD[DrmTuple[K]]
+
+ /**
+ * Blockified DRM rdd (keys of original DRM are grouped into array corresponding to rows of Matrix
+ * object value)
+ */
+ type BlockifiedDrmRdd[K] = RDD[BlockifiedDrmTuple[K]]
+
/**
* Create proper spark context that includes local Mahout jars
* @param masterUrl
@@ -42,7 +55,7 @@ package object sparkbindings {
customJars: TraversableOnce[String] = Nil,
sparkConf: SparkConf = new SparkConf(),
addMahoutJars: Boolean = true
- ): SparkContext = {
+ ): SparkDistributedContext = {
val closeables = new java.util.ArrayDeque[Closeable]()
try {
@@ -125,13 +138,49 @@ package object sparkbindings {
sparkConf.setSparkHome(System.getenv("SPARK_HOME"))
}
- new SparkContext(config = sparkConf)
+ new SparkDistributedContext(new SparkContext(config = sparkConf))
} finally {
IOUtils.close(closeables)
}
+ }
+
+ implicit def sdc2sc(sdc: SparkDistributedContext): SparkContext = sdc.sc
+ implicit def sc2sdc(sc: SparkContext): SparkDistributedContext = new SparkDistributedContext(sc)
+
+ implicit def dc2sc(dc:DistributedContext):SparkContext = {
+ assert (dc.isInstanceOf[SparkDistributedContext],"distributed context must be Spark-specific.")
+ sdc2sc(dc.asInstanceOf[SparkDistributedContext])
}
+ /** Broadcast transforms */
+ implicit def sb2bc[T](b:Broadcast[T]):BCast[T] = new SparkBCast(b)
+
+ /** Adding Spark-specific ops */
+ implicit def cpDrm2cpDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
+ new CheckpointedDrmSparkOps[K](drm)
+
+ implicit def drm2cpDrmSparkOps[K:ClassTag](drm:DrmLike[K]):CheckpointedDrmSparkOps[K] = drm:CheckpointedDrm[K]
+
+ private[sparkbindings] implicit def m2w(m: Matrix): MatrixWritable = new MatrixWritable(m)
+
+ private[sparkbindings] implicit def w2m(w: MatrixWritable): Matrix = w.get()
+
+ private[sparkbindings] implicit def v2w(v: Vector): VectorWritable = new VectorWritable(v)
+
+ private[sparkbindings] implicit def w2v(w:VectorWritable):Vector = w.get()
+
+ def drmWrap[K : ClassTag](
+ rdd: DrmRdd[K],
+ nrow: Int = -1,
+ ncol: Int = -1
+ ): CheckpointedDrm[K] =
+ new CheckpointedDrmSpark[K](
+ rdd = rdd,
+ _nrow = nrow,
+ _ncol = ncol
+ )
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
index 3e4222a..0834145 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
@@ -20,11 +20,12 @@ package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
import org.scalatest.FunSuite
import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm._
import org.apache.mahout.sparkbindings._
-import drm._
+import org.apache.mahout.sparkbindings.drm._
import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.plan.OpABt
import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpABt
/** Tests for AB' operator algorithms */
class ABtSuite extends FunSuite with MahoutLocalContext {
@@ -37,7 +38,7 @@ class ABtSuite extends FunSuite with MahoutLocalContext {
val op = new OpABt(A, B)
- val drm = new CheckpointedDrmBase(ABt.abt(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreMControl = inCoreA %*% inCoreB.t
val inCoreM = drm.collect
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
index 3065737..2efa5ff 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
@@ -21,9 +21,11 @@ import org.scalatest.FunSuite
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
import org.apache.mahout.math.scalabindings._
import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpAewB
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpAewB
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
/** Elementwise matrix operation tests */
class AewBSuite extends FunSuite with MahoutLocalContext {
@@ -36,7 +38,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val op = new OpAewB(A, B, '*')
- val M = new CheckpointedDrmBase(AewB.a_hadamard_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_hadamard_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA * inCoreB
@@ -53,7 +55,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val op = new OpAewB(A, B, '+')
- val M = new CheckpointedDrmBase(AewB.a_plus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_plus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA + inCoreB
@@ -70,7 +72,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val op = new OpAewB(A, B, '-')
- val M = new CheckpointedDrmBase(AewB.a_minus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_minus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA - inCoreB
@@ -87,7 +89,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
val op = new OpAewB(A, B, '/')
- val M = new CheckpointedDrmBase(AewB.a_eldiv_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+ val M = new CheckpointedDrmSpark(AewB.a_eldiv_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
val inCoreM = M.collect
val inCoreMControl = inCoreA / inCoreB
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
index 34bb88a..8734b70 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
@@ -21,10 +21,10 @@ import org.scalatest.FunSuite
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
import org.apache.mahout.math.scalabindings._
import RLikeOps._
+import org.apache.mahout.math.drm._
import org.apache.mahout.sparkbindings._
-import drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpAtA
import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpAtA
/** Tests for {@link XtX} */
class AtASuite extends FunSuite with MahoutLocalContext {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
index 48edbe9..a53501d 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
@@ -21,20 +21,19 @@ import org.scalatest.FunSuite
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
import org.apache.mahout.math.scalabindings._
import RLikeOps._
-import org.apache.mahout.sparkbindings._
-import drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
-import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical.OpAt
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
/** Tests for A' algorithms */
class AtSuite extends FunSuite with MahoutLocalContext {
test("At") {
val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
- val A = drmParallelize(m=inCoreA, numPartitions = 2)
+ val A = drmParallelize(m = inCoreA, numPartitions = 2)
val op = new OpAt(A)
- val AtDrm = new CheckpointedDrmBase(rdd= At.at(op,srcA=A),_nrow=op.nrow,_ncol=op.ncol)
+ val AtDrm = new CheckpointedDrmSpark(rdd = At.at(op, srcA = A), _nrow = op.nrow, _ncol = op.ncol)
val inCoreAt = AtDrm.collect
val inCoreControlAt = inCoreA.t
@@ -42,6 +41,5 @@ class AtSuite extends FunSuite with MahoutLocalContext {
assert((inCoreAt - inCoreControlAt).norm < 1E-5)
-
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
index e4f2d55..2369441 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
@@ -19,11 +19,12 @@ package org.apache.mahout.sparkbindings.drm.decompositions
import org.scalatest.{Matchers, FunSuite}
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
import RLikeDrmOps._
-import org.apache.mahout.math.{Matrices, SparseRowMatrix}
+import org.apache.mahout.sparkbindings._
import org.apache.mahout.common.RandomUtils
class MathSuite extends FunSuite with Matchers with MahoutLocalContext {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
index b4b6d98..6c71e11 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
@@ -18,10 +18,12 @@ package org.apache.mahout.sparkbindings.drm
*/
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
+import drm._
import RLikeOps._
import RLikeDrmOps._
-import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.FunSuite
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
/** Tests for DrmLikeOps */
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index 5feb731..caccb70 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -18,11 +18,11 @@
package org.apache.mahout.sparkbindings.drm
import org.scalatest.FunSuite
-import org.apache.log4j.{Level, Logger, BasicConfigurator}
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
+import drm._
import RLikeOps._
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-import org.apache.spark.SparkContext._
/**
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index da38174..ac46a9e 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -18,16 +18,18 @@
package org.apache.mahout.sparkbindings.drm
import org.scalatest.{Matchers, FunSuite}
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.math._
+import scalabindings._
+import drm._
+import RLikeOps._
import RLikeDrmOps._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtx, OpAtB, OpAtA, CheckpointAction}
-import org.apache.spark.SparkContext
+import org.apache.mahout.sparkbindings._
+import test.MahoutLocalContext
import scala.collection.mutable.ArrayBuffer
import org.apache.mahout.math.Matrices
-import org.apache.mahout.sparkbindings.blas
+import org.apache.mahout.sparkbindings.{SparkEngine, blas}
import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.math.drm.logical.{OpAtx, OpAtB, OpAtA}
/** R-like DRM DSL operation tests */
class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -150,7 +152,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val C = A.t %*% B
- CheckpointAction.optimize(C) should equal (OpAtB[Int](A,B))
+ SparkEngine.optimizerRewrite(C) should equal (OpAtB[Int](A,B))
val inCoreC = C.collect
val inCoreControlC = inCoreA.t %*% inCoreB
@@ -176,7 +178,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val C = A.t %*% B
- CheckpointAction.optimize(C) should equal (OpAtB[String](A,B))
+ SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
val inCoreC = C.collect
val inCoreControlC = inCoreA.t %*% inCoreB
@@ -198,7 +200,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val C = A.t %*% B
- CheckpointAction.optimize(C) should equal (OpAtB[String](A,B))
+ SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
val inCoreC = C.collect
val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
@@ -246,7 +248,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val AtA = A.t %*% A
// Assert optimizer detects square
- CheckpointAction.optimize(action = AtA) should equal(OpAtA(A))
+ SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
val inCoreAtA = AtA.collect
val inCoreAtAControl = inCoreA.t %*% inCoreA
@@ -264,7 +266,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val AtA = A.t %*% A
// Assert optimizer detects square
- CheckpointAction.optimize(action = AtA) should equal(OpAtA(A))
+ SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
val inCoreAtA = AtA.collect
val inCoreAtAControl = inCoreA.t %*% inCoreA
@@ -284,7 +286,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val AtA = A.t %*% A
// Assert optimizer detects square
- CheckpointAction.optimize(action = AtA) should equal(OpAtA(A))
+ SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
val inCoreAtA = AtA.collect
val inCoreAtAControl = inCoreA.t %*% inCoreA
@@ -371,7 +373,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
}
test ("general side") {
- val sc = implicitly[SparkContext]
+ val sc = implicitly[DistributedContext]
val k1 = sc.parallelize(Seq(ArrayBuffer(0,1,2,3)))
// .persist(StorageLevel.MEMORY_ONLY) // -- this will demonstrate immutability side effect!
.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -405,7 +407,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val drmA = drmParallelize(inCoreA, numPartitions = 2)
- CheckpointAction.optimize(drmA.t %*% x) should equal (OpAtx(drmA, x))
+ SparkEngine.optimizerRewrite(drmA.t %*% x) should equal (OpAtx(drmA, x))
val atx = (drmA.t %*% x).collect(::, 0)
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index ff7098b..2d9ed76 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -1,14 +1,15 @@
package org.apache.mahout.sparkbindings.test
import org.scalatest.Suite
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkConf
import org.apache.mahout.sparkbindings._
import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.drm.DistributedContext
trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
this: Suite =>
- protected implicit var mahoutCtx: SparkContext = _
+ protected implicit var mahoutCtx: DistributedContext = _
override protected def beforeEach() {
super.beforeEach()
@@ -26,7 +27,7 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
override protected def afterEach() {
if (mahoutCtx != null) {
try {
- mahoutCtx.stop()
+ mahoutCtx.close()
} finally {
mahoutCtx = null
}
[2/3] MAHOUT-1529 closes PR #1
Posted by dl...@apache.org.
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
deleted file mode 100644
index 8e05a83..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.scalabindings
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import RLikeOps._
-import org.apache.mahout.common.RandomUtils
-import scala._
-import org.apache.log4j.Logger
-
-private[math] object SSVD {
-
- private val log = Logger.getLogger(SSVD.getClass)
-
- /**
- * In-core SSVD algorithm.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- var y = a %*% omega
- var yty = y.t %*% y
- val at = a.t
- var ch = chol(yty)
- assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
- var bt = ch.solveRight(at %*% y)
-
- // Power iterations
- for (i <- 0 until q) {
- y = a %*% bt
- yty = y.t %*% y
- ch = chol(yty)
- bt = ch.solveRight(at %*% y)
- }
-
- val bbt = bt.t %*% bt
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = ch.solveRight(y) %*% uhat
- val v = bt %*% (uhat %*%: diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
- }
-
- /**
- * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
- * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
- * to save some memory for sparse inputs by removing direct mean subtraction.<P>
- *
- * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
- * If retaining distances and orignal scaled variances not that important, the normalized PCA space
- * is just U.
- *
- * Important: data points are considered to be rows.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- // Dataset mean
- val xi = a.colMeans()
-
- if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
-
- var y = a %*% omega
-
- // Fixing y
- val s_o = omega.t %*% xi
- y := ((r,c,v) => v - s_o(c))
-
- var yty = y.t %*% y
- var ch = chol(yty)
-// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
- // This is implicit Q of QR(Y)
- var qm = ch.solveRight(y)
- var bt = a.t %*% qm
- var s_q = qm.colSums()
- var s_b = bt.t %*% xi
-
- // Power iterations
- for (i <- 0 until q) {
-
- // Fix bt
- bt -= xi cross s_q
-
- y = a %*% bt
-
- // Fix Y again.
- y := ((r,c,v) => v - s_b(c))
-
- yty = y.t %*% y
- ch = chol(yty)
- qm = ch.solveRight(y)
- bt = a.t %*% qm
- s_q = qm.colSums()
- s_b = bt.t %*% xi
- }
-
- val c = s_q cross s_b
-
- // BB' computation becomes
- val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi)
-
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = qm %*% uhat
- val v = bt %*% (uhat %*%: diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+ private val log = Logger.getLogger(SSVD.getClass)
+
+ /**
+ * In-core SSVD algorithm.
+ *
+ * @param a input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ var y = a %*% omega
+ var yty = y.t %*% y
+ val at = a.t
+ var ch = chol(yty)
+ assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+ var bt = ch.solveRight(at %*% y)
+
+ // Power iterations
+ for (i <- 0 until q) {
+ y = a %*% bt
+ yty = y.t %*% y
+ ch = chol(yty)
+ bt = ch.solveRight(at %*% y)
+ }
+
+ val bbt = bt.t %*% bt
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = ch.solveRight(y) %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+ }
+
+ /**
+ * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+ * follows the solution outlined in MAHOUT-817. For the in-core version, it is, for the most part, supposed
+ * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+ *
+ * Hint: Usually one wants to use AV which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
+ * If retaining distances and original scaled variances is not that important, the normalized PCA space
+ * is just U.
+ *
+ * Important: data points are considered to be rows.
+ *
+ * @param a input matrix A
+ * @param k request SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ // Dataset mean
+ val xi = a.colMeans()
+
+ if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+ var y = a %*% omega
+
+ // Fixing y
+ val s_o = omega.t %*% xi
+ y := ((r,c,v) => v - s_o(c))
+
+ var yty = y.t %*% y
+ var ch = chol(yty)
+// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+ // This is implicit Q of QR(Y)
+ var qm = ch.solveRight(y)
+ var bt = a.t %*% qm
+ var s_q = qm.colSums()
+ var s_b = bt.t %*% xi
+
+ // Power iterations
+ for (i <- 0 until q) {
+
+ // Fix bt
+ bt -= xi cross s_q
+
+ y = a %*% bt
+
+ // Fix Y again.
+ y := ((r,c,v) => v - s_b(c))
+
+ yty = y.t %*% y
+ ch = chol(yty)
+ qm = ch.solveRight(y)
+ bt = a.t %*% qm
+ s_q = qm.colSums()
+ s_b = bt.t %*% xi
+ }
+
+ val c = s_q cross s_b
+
+ // BB' computation becomes
+ val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi)
+
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = qm %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index c9e59ba..4599146 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -19,6 +19,7 @@ package org.apache.mahout.math
import org.apache.mahout.math._
import org.apache.mahout.math.solver.EigenDecomposition
+import org.apache.mahout.math.decompositions.SSVD
/**
* Mahout matrices and vectors' scala syntactic sugar
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index a97b453..020a2f9 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -2,22 +2,20 @@ package org.apache.mahout.sparkbindings.shell
import org.apache.spark.repl.SparkILoop
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.mahout.sparkbindings._
import scala.tools.nsc.Properties
import scala.Some
+import org.apache.mahout.sparkbindings._
class MahoutSparkILoop extends SparkILoop {
private val postInitScript =
- "import org.apache.mahout.math.Vector" ::
- "import org.apache.mahout.math.scalabindings._" ::
- "import RLikeOps._" ::
- "import org.apache.mahout.sparkbindings._" ::
- "import drm._" ::
- "import RLikeDrmOps._" ::
- "org.apache.spark.storage.StorageLevel" ::
- "implicit val _sc = sc" ::
- Nil
+ "import org.apache.mahout.math._" ::
+ "import scalabindings._" ::
+ "import RLikeOps._" ::
+ "import drm._" ::
+ "import RLikeDrmOps._" ::
+ "import org.apache.mahout.sparkbindings._" ::
+ Nil
override protected def postInitialization() {
super.postInitialization()
@@ -50,10 +48,25 @@ class MahoutSparkILoop extends SparkILoop {
customJars = jars,
sparkConf = conf
)
+
echo("Created spark context..")
sparkContext
}
+ override def initializeSpark() {
+ intp.beQuietDuring {
+ command("""
+
+ @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext =
+ new org.apache.mahout.sparkbindings.SparkDistributedContext(
+ org.apache.spark.repl.Main.interp.createSparkContext())
+
+ """)
+ command("import org.apache.spark.SparkContext._")
+ echo("Mahout distributed context is available as \"implicit val sdc\".")
+ }
+ }
+
override def prompt: String = "mahout> "
override def printWelcome(): Unit = {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
index 648f07f..9c0a51f 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.mahout.sparkbindings.shell
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/test/mahout/simple.mscala
----------------------------------------------------------------------
diff --git a/spark-shell/src/test/mahout/simple.mscala b/spark-shell/src/test/mahout/simple.mscala
index 385e4e8..854c482 100644
--- a/spark-shell/src/test/mahout/simple.mscala
+++ b/spark-shell/src/test/mahout/simple.mscala
@@ -1,8 +1,25 @@
-import org.apache.mahout.sparkbindings._
-import drm._
-import RLikeDrmOps._
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import org.apache.spark.storage.StorageLevel
+/*
+ To run, execute from mahout shell:
+
+ :load spark-shell/src/test/mahout/simple.mscala
+*/
val a = dense((1,2,3),(3,4,5))
val drmA = drmParallelize(a,numPartitions = 2)
@@ -19,5 +36,5 @@ r.collect
// local write
r.writeDRM("file:///home/dmitriy/A")
-// hdfs write
-r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file
+// hdfs write -- uncomment to test
+// r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 8b89969..ac99ffd 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -220,53 +220,52 @@
</property>
</activation>
<dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop2.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.mahout</groupId>
- <artifactId>mahout-mrlegacy</artifactId>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-common</artifactId>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop2.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mahout</groupId>
+ <artifactId>mahout-mrlegacy</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+ </dependencies>
</profile>
</profiles>
<dependencies>
- <!-- spark stuff -->
+
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.major}</artifactId>
- <version>${spark.version}</version>
+ <groupId>org.apache.mahout</groupId>
+ <artifactId>mahout-math-scala</artifactId>
</dependency>
<dependency>
@@ -283,18 +282,18 @@
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math-scala</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.mahout</groupId>
- <artifactId>mahout-math-scala</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<!-- 3rd-party -->
-
+ <!-- spark stuff -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.major}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
<!-- scala stuff -->
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
new file mode 100644
index 0000000..4d13a5a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.drm.{DistributedEngine, BCast, DistributedContext}
+import org.apache.spark.SparkContext
+
+/**
+ * Mahout distributed context backed by a Spark context.
+ *
+ * @param sc the wrapped Spark context
+ */
+class SparkDistributedContext(val sc: SparkContext) extends DistributedContext {
+
+  /** Engine bound to this context: the `SparkEngine` singleton. */
+  val engine: DistributedEngine = SparkEngine
+
+  /** Closes this context by stopping the wrapped Spark context. */
+  def close(): Unit = sc.stop()
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
new file mode 100644
index 0000000..0c904ab
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.math._
+import scala.reflect.ClassTag
+import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings.blas._
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import scala.Some
+import scala.collection.JavaConversions._
+import org.apache.spark.SparkContext
+
+/** Spark-specific non-drm-method operations */
+object SparkEngine extends DistributedEngine {
+
+  /**
+   * Computes column-wise sums over all rows of a checkpointed DRM.
+   *
+   * @param drm checkpointed matrix whose row vectors are added up
+   * @return the sum of all row vectors, accumulated in vectors of size `drm.ncol`
+   */
+  def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    // Evaluate the column count once, outside of the distributed closures.
+    val n = drm.ncol
+
+    // Throw away keys; only the row vectors matter for summation.
+    val rows = drm.rdd.map(_._2)
+
+    // Fold() doesn't work with kryo still. So work around it: add up every partition by hand
+    // into a freshly allocated dense accumulator.
+    val partialSums = rows.mapPartitions { iter =>
+      Iterator(iter.foldLeft(new DenseVector(n): Vector)((acc, v) => acc += v))
+    }
+
+    // Since a new accumulator vector was preallocated per partition, merging them in place must
+    // not cause any side effects now.
+    partialSums.reduce(_ += _)
+  }
+
+  /**
+   * Engine-specific colMeans implementation based on a checkpoint.
+   *
+   * @param drm checkpointed matrix
+   * @return column sums divided by the number of rows; the plain column sums if the matrix
+   *         reports zero rows
+   */
+  def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val rowCount = drm.nrow
+    val sums = drm.colSums()
+
+    // Skip the division for a matrix without rows.
+    if (rowCount == 0) sums else sums /= rowCount
+  }
+
+ /**
+ * Perform default expression rewrite. Return the rewritten expression, which can then be
+ * translated into a physical plan.
+ *
+ * A particular physical engine implementation may choose to either use or not use these rewrites
+ * as a useful basic rewriting rule. This engine adds nothing of its own: it delegates to the
+ * inherited implementation.
+ *
+ * @param action the DRM expression to rewrite
+ * @return whatever `super.optimizerRewrite` returns for `action`
+ */
+ override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
+
+
+  /**
+   * Second optimizer pass. Translate previously rewritten logical pipeline into physical engine
+   * plan and wrap the result as a Spark checkpoint.
+   *
+   * @param plan rewritten logical plan
+   * @param ch   cache hint, mapped to a Spark storage level for the checkpoint
+   * @return the result of calling `cache()` on the newly created checkpoint
+   */
+  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
+
+    // Spark-specific Physical Plan translation.
+    val physicalRdd = tr2phys(plan)
+
+    // Geometry and partitioning tag are taken over from the logical plan.
+    val checkpoint = new CheckpointedDrmSpark(
+      rdd = physicalRdd,
+      _nrow = plan.nrow,
+      _ncol = plan.ncol,
+      _cacheStorageLevel = cacheHint2Spark(ch),
+      partitioningTag = plan.partitioningTag
+    )
+
+    checkpoint.cache()
+  }
+
+ /**
+ * Broadcast support for in-core vectors.
+ *
+ * @param v vector to broadcast
+ * @param dc distributed context on which `broadcast` is invoked (implicit)
+ * @return broadcast handle for `v`
+ */
+ def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = dc.broadcast(v)
+
+ /**
+ * Broadcast support for in-core matrices.
+ *
+ * @param m matrix to broadcast
+ * @param dc distributed context on which `broadcast` is invoked (implicit)
+ * @return broadcast handle for `m`
+ */
+ def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
+
+  /**
+   * Load DRM from hdfs (as in Mahout DRM format).
+   *
+   * The Writable type of the first record's key decides how all row keys are translated:
+   * `IntWritable` becomes `Int`, `Text` becomes `String`, `LongWritable` becomes `Long`; keys of
+   * any other Writable type are kept as they are.
+   *
+   * @param path location of the sequence file data to read
+   * @param sc distributed context used to read the data (implicit)
+   *
+   * @return DRM[Any] where Any is automatically translated to value type
+   * @throws IllegalArgumentException if no records are found at `path`
+   */
+  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+    implicit val scc:SparkContext = sc
+    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
+
+    // Peek at the first key to learn which Writable type the data set is keyed by. Fail with a
+    // readable message instead of an index error if there is nothing to peek at.
+    val key = rdd.map(_._1).take(1).headOption.getOrElse(
+      throw new IllegalArgumentException("No DRM records found at %s.".format(path)))
+
+    // Writable key => plain Scala key.
+    val key2val = key match {
+      case _: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
+      case _: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
+      case _: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
+      case _: Writable => (v: AnyRef) => v
+    }
+
+    // Plain Scala key => Writable key. Long keys must be cast to Long: casting a boxed Long to
+    // Int fails with a ClassCastException.
+    val val2key = key match {
+      case _: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
+      case _: Text => (x: Any) => new Text(x.toString)
+      case _: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Long])
+      case _: Writable => (x: Any) => x.asInstanceOf[Writable]
+    }
+
+    // Class tag of the translated key type.
+    val km = key match {
+      case _: IntWritable => implicitly[ClassTag[Int]]
+      case _: Text => implicitly[ClassTag[String]]
+      case _: LongWritable => implicitly[ClassTag[Long]]
+      case _: Writable => ClassTag(classOf[Writable])
+    }
+
+    {
+      // The conversion has to be applied to the value being converted, not to an empty
+      // argument list.
+      implicit def getWritable(x: Any): Writable = val2key(x)
+      new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
+    }
+  }
+
+  /**
+   * Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data
+   * set keys.
+   *
+   * @param m             in-core matrix to distribute
+   * @param numPartitions number of partitions requested for the resulting data set
+   * @param sc            distributed context (implicit)
+   * @return Int-keyed checkpointed DRM built from the rows of `m`
+   */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext)
+  : CheckpointedDrm[Int] = {
+    val indexedRows = parallelizeInCore(m, numPartitions)
+    new CheckpointedDrmSpark(rdd = indexedRows)
+  }
+
+  /**
+   * Distributes the rows of an in-core matrix, each keyed by its row ordinal.
+   *
+   * @param m             in-core matrix to distribute
+   * @param numPartitions number of partitions requested from `parallelize`
+   * @param sc            distributed context (implicit)
+   * @return data set of (row ordinal, row vector) pairs
+   */
+  private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): DrmRdd[Int] = {
+
+    // Pair every row ordinal with the corresponding row of the matrix.
+    val indexedRows = for (i <- 0 until m.nrow) yield (i, m(i, ::))
+
+    sc.parallelize(indexedRows, numPartitions)
+  }
+
+  /**
+   * Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys.
+   *
+   * @param m             in-core matrix carrying row label bindings
+   * @param numPartitions number of partitions requested from `parallelize`
+   * @param sc            distributed context (implicit)
+   * @return String-keyed checkpointed DRM with one row per label binding of `m`
+   * @throws IllegalArgumentException if `m` has no row label bindings
+   */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext)
+  : CheckpointedDrm[String] = {
+
+    val rb = m.getRowLabelBindings
+
+    // An unlabeled matrix reports no bindings at all; fail with a readable message instead of a
+    // NullPointerException further down.
+    require(rb != null, "Matrix must have row label bindings to be parallelized with row labels.")
+
+    // Pair every label with the row it is bound to.
+    val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
+
+    new CheckpointedDrmSpark(rdd = sc.parallelize(p, numPartitions))
+  }
+
+  /**
+   * This creates an empty DRM with specified number of partitions and cardinality.
+   *
+   * @param nrow          number of rows
+   * @param ncol          number of columns
+   * @param numPartitions number of partitions; must be positive
+   * @param sc            distributed context (implicit)
+   * @return Int-keyed DRM holding one empty sparse row vector for each key in `0 until nrow`
+   * @throws IllegalArgumentException if `numPartitions` is not positive
+   */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+    require(numPartitions > 0, "numPartitions must be positive.")
+
+    // Rows per partition, rounded up so that numPartitions slices cover all nrow rows.
+    val partNRow = (nrow - 1) / numPartitions + 1
+
+    // Exactly one seed element per partition: `until`, not `to`, which would create one seed
+    // more than there are partitions.
+    val rdd = sc.parallelize(0 until numPartitions, numPartitions).flatMap(part => {
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    })
+    new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
+  }
+
+  /**
+   * Creates an empty Long-keyed DRM with specified number of partitions and cardinality.
+   *
+   * @param nrow          number of rows
+   * @param ncol          number of columns
+   * @param numPartitions number of partitions; must be positive
+   * @param sc            distributed context (implicit)
+   * @return Long-keyed DRM holding one empty sparse row vector for each key in `0 until nrow`
+   * @throws IllegalArgumentException if `numPartitions` is not positive
+   */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
+    require(numPartitions > 0, "numPartitions must be positive.")
+
+    // Rows per partition, rounded up so that numPartitions slices cover all nrow rows.
+    val partNRow = (nrow - 1) / numPartitions + 1
+
+    // Exactly one seed element per partition: `until`, not `to`, which would create one seed
+    // more than there are partitions.
+    val rdd = sc.parallelize(0 until numPartitions, numPartitions).flatMap(part => {
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    })
+    new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
+  }
+
+ /**
+ * Maps an engine-neutral cache hint onto the Spark storage level of the same name.
+ *
+ * There is no default case: a hint that is not listed below is not handled by this match.
+ *
+ * @param cacheHint cache hint to translate
+ * @return the identically named Spark `StorageLevel`
+ */
+ private def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
+ case CacheHint.NONE => StorageLevel.NONE
+ case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
+ case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
+ case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
+ case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
+ case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
+ case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
+ case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
+ case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
+ case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
+ case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
+ }
+
+ /**
+ * Translate previously optimized physical plan: recursively maps every supported logical
+ * operator onto the Spark routine that computes it.
+ *
+ * @param oper root operator of the (sub)plan to translate
+ * @return the Spark-side input produced for `oper`
+ * @throws IllegalArgumentException for `OpAtAnyKey` and for any operator without a case below
+ */
+ private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
+ // I do explicit evidence propagation here since matching via case classes seems to be losing
+ // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
+ // Hence you see explicit evidence attached to all recursive tr2phys() calls.
+ oper match {
+ // If there are any such cases, they must go away in pass1. If they were not, then it wasn't
+ // the A'A case but actual transposition intent which should be removed from consideration
+ // (we cannot do actual flip for non-int-keyed arguments)
+ case OpAtAnyKey(_) =>
+ throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
+ case op@OpAt(a) => At.at(op, tr2phys(a)(op.classTagA))
+ case op@OpABt(a, b) => ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ // The zippable flag is set when both operands carry the same partitioning tag.
+ case op@OpAtB(a, b) => AtB.atb_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB),
+ zippable = a.partitioningTag == b.partitioningTag)
+ case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
+ case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
+ case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
+ // Elementwise drm-drm operators; the operation is matched as a character literal.
+ case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ // Elementwise drm-scalar operators; the operation is matched as a string literal.
+ case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
+ case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
+ case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+ // Custom operators, we just execute them
+ case blockOp: OpMapBlock[K, _] => MapBlock.exec(
+ src = tr2phys(blockOp.A)(blockOp.classTagA),
+ ncol = blockOp.ncol,
+ bmf = blockOp.bmf
+ )
+ // A checkpoint is already physical: wrap its row-wise data together with its column count.
+ case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
+ case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
+ .format(oper))
+
+ }
+ }
+
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 8d19068..97873bd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -19,11 +19,12 @@ package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.math.scalabindings._
import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.plan.OpABt
import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.sparkbindings._
+import drm._
import org.apache.mahout.math.{Matrix, SparseRowMatrix}
import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpABt
/** Contains RDD plans for ABt operator */
object ABt {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index ec93cf7..ec6e99e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,13 +17,13 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.plan.{OpAewScalar, OpAewB}
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
/** Elementwise drm-drm operators */
object AewB {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 0383fe1..c923e62 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -1,13 +1,14 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
import RLikeOps._
-
+import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesRightMatrix
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
import scala.reflect.ClassTag
import org.apache.mahout.math.DiagonalMatrix
+import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
/** Matrix product with one of operands an in-core matrix */
object AinCoreB {
@@ -21,8 +22,8 @@ object AinCoreB {
private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
- val dg = drmBroadcast(x = op.right.viewDiagonal())
+ implicit val ctx:DistributedContext = rddA.context
+ val dg = drmBroadcast(op.right.viewDiagonal())
val rdd = rddA
// Just multiply the blocks
@@ -35,7 +36,7 @@ object AinCoreB {
private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
+ implicit val sc:DistributedContext = rddA.sparkContext
val bcastB = drmBroadcast(m = op.right)
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 38af173..56de9f4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -17,12 +17,12 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.spark.SparkContext._
import org.apache.mahout.math.{DenseVector, Vector, SequentialAccessSparseVector}
+import org.apache.mahout.math.drm.logical.OpAt
/** A' algorithms */
object At {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index 17cab62..450e836 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -18,14 +18,16 @@
package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.math._
+import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm._
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import collection._
import JavaConversions._
-import org.apache.mahout.sparkbindings.drm.plan.OpAtA
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtA
+import SparkEngine._
/**
* Collection of algorithms to compute X' times X
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index f0c3423..86aadc8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -18,13 +18,14 @@
package org.apache.mahout.sparkbindings.blas
import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
import org.apache.mahout.sparkbindings.drm._
import org.apache.spark.rdd.RDD
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.spark.SparkContext._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtB}
import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtB
object AtB {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index e6de443..94c3f06 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -1,12 +1,13 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
import RLikeOps._
-
-import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtx, OpAx, OpTimesRightMatrix}
+import drm._
+import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
/** Matrix product with one of operands an in-core matrix */
@@ -15,9 +16,9 @@ object Ax {
def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
+ implicit val sc:DistributedContext = rddA.sparkContext
- val bcastX = drmBroadcast(x = op.x)
+ val bcastX = drmBroadcast(op.x)
val rdd = rddA
// Just multiply the blocks
@@ -30,9 +31,9 @@ object Ax {
def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
+ implicit val dc:DistributedContext = rddA.sparkContext
- val bcastX = drmBroadcast(x = op.x)
+ val bcastX = drmBroadcast(op.x)
val inCoreM = rddA
// Just multiply the blocks
@@ -51,7 +52,7 @@ object Ax {
// It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
// it back as drm blockified rdd
- val rdd = sc.parallelize(Seq(inCoreM), numSlices = 1)
+ val rdd = dc.parallelize(Seq(inCoreM), numSlices = 1)
.map(block => Array.tabulate(block.nrow)(i => i) -> block)
new DrmRddInput(blockifiedSrc = Some(rdd))
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 6bb7b4b..a3caac7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -17,11 +17,11 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.DrmRdd
import scala.reflect.ClassTag
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
+import org.apache.mahout.sparkbindings.DrmRdd
class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
new file mode 100644
index 0000000..4c68c9a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.BlockMapFunc
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import scala.reflect.ClassTag
+
+/** Physical execution of the custom block-map operator. */
+object MapBlock {
+
+  /**
+   * Applies a block-mapping function to every block of the source and checks the geometry of
+   * each result.
+   *
+   * @param src  input whose blockified form is mapped
+   * @param ncol number of columns every mapped block is asserted to have
+   * @param bmf  function mapping one (keys, block) tuple to another
+   * @return input wrapping the mapped blocks
+   */
+  def exec[S, R:ClassTag](src: DrmRddInput[S], ncol:Int, bmf:BlockMapFunc[S,R]): DrmRddInput[R] = {
+
+    // Everything the closure needs arrives as a method parameter: we can't use attributes
+    // without putting the whole `this` into the closure.
+    val mappedBlocks = src.toBlockifiedDrmRdd().map { inBlock =>
+      val outBlock = bmf(inBlock)
+
+      // The mapping may change the block's width, but never its number of rows.
+      assert(outBlock._2.nrow == inBlock._2.nrow, "block mapping must return same number of rows.")
+      assert(outBlock._2.ncol == ncol, "block map must return %d number of columns.".format(ncol))
+
+      outBlock
+    }
+
+    new DrmRddInput(blockifiedSrc = Some(mappedBlocks))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index 5affd3b..d0a50b5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -1,7 +1,7 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.plan.OpRowRange
import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpRowRange
object Slicing {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 795f2e2..d2d5340 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -17,7 +17,6 @@
package org.apache.mahout.sparkbindings
-import org.apache.mahout.sparkbindings.drm.DrmRdd
import scala.reflect.ClassTag
/**
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
deleted file mode 100644
index 89d3735..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.log4j.Logger
-
-object DQR {
-
- private val log = Logger.getLogger(DQR.getClass)
-
- /**
- * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
- * controlled (<5000 or so). <P>
- *
- * It is recommended to checkpoint A since it does two passes over it. <P>
- *
- * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
- * their RDD should be able to zip successfully.
- */
- def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
-
- if (A.ncol > 5000)
- log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
-
- val AtA = (A.t %*% A).checkpoint()
- val inCoreAtA = AtA.collect
- implicit val sc = AtA.rdd.sparkContext
-
- if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
-
- val ch = chol(inCoreAtA)
- val inCoreR = (ch.getL cloned) t
-
- if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
-
- if (checkRankDeficiency && !ch.isPositiveDefinite)
- throw new IllegalArgumentException("R is rank-deficient.")
-
- val bcastAtA = sc.broadcast(inCoreAtA)
-
- // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
- // decompose A'A in the backend again.
-
- // Compute Q = A*inv(L') -- we can do it blockwise.
- val Q = A.mapBlock() {
- case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
- }
-
- Q -> inCoreR
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
deleted file mode 100644
index f3b0e3f..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
- /**
- * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
- * document of the MAHOUT-817.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations (hint: use either 0 or 1)
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
- */
- def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- val drmA = A.checkpoint()
- implicit val sc = drmA.rdd.sparkContext
-
- val m = drmA.nrow
- val n = drmA.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // Dataset mean
- val xi = drmA.colMeans
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
- val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
- // This done in front in a single-threaded fashion for now. Even though it doesn't require any
- // memory beyond that is required to keep xi around, it still might be parallelized to backs
- // for significantly big n and r. TODO
- val s_o = omega.t %*% xi
-
- val bcastS_o = drmBroadcast(s_o)
- val bcastXi = drmBroadcast(xi)
-
- var drmY = drmA.mapBlock(ncol = r) {
- case (keys, blockA) =>
- val s_o:Vector = bcastS_o
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
- keys -> blockY
- }
- // Checkpoint Y
- .checkpoint()
-
- var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
- var s_q = drmQ.colSums()
- var bcastVarS_q = drmBroadcast(s_q)
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = (drmA.t %*% drmQ).checkpoint()
-
- var s_b = (drmBt.t %*% xi).collect(::, 0)
- var bcastVarS_b = drmBroadcast(s_b)
-
- for (i <- 0 until q) {
-
- // These closures don't seem to live well with outside-scope vars. This doesn't record closure
- // attributes correctly. So we create additional set of vals for broadcast vars to properly
- // create readonly closure attributes in this very scope.
- val bcastS_q = bcastVarS_q
- val bcastS_b = bcastVarS_b
- val bcastXib = bcastXi
-
- // Fix Bt as B' -= xi cross s_q
- drmBt = drmBt.mapBlock() {
- case (keys, block) =>
- val s_q: Vector = bcastS_q
- val xi: Vector = bcastXib
- keys.zipWithIndex.foreach {
- case (key, idx) => block(idx, ::) -= s_q * xi(key)
- }
- keys -> block
- }
-
- drmY.uncache()
- drmQ.uncache()
-
- drmY = (drmA %*% drmBt)
- // Fix Y by subtracting s_b from each row of the AB'
- .mapBlock() {
- case (keys, block) =>
- val s_b: Vector = bcastS_b
- for (row <- 0 until block.nrow) block(row, ::) -= s_b
- keys -> block
- }
- // Checkpoint Y
- .checkpoint()
-
- drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
- s_q = drmQ.colSums()
- bcastVarS_q = drmBroadcast(s_q)
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = (drmA.t %*% drmQ).checkpoint()
-
- s_b = (drmBt.t %*% xi).collect(::, 0)
- bcastVarS_b = drmBroadcast(s_b)
- }
-
- val c = s_q cross s_b
- val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
- c - c.t + (s_q cross s_q) * (xi dot xi)
- val (inCoreUHat, d) = eigen(inCoreBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
- // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
deleted file mode 100644
index de15d2b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSSVD {
-
- /**
- * Distributed Stochastic Singular Value decomposition algorithm.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
- */
- def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- val drmA = A.checkpoint()
-
- val m = drmA.nrow
- val n = drmA.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
-
- // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
- // instantiate the Omega random matrix view in the backend instead. That way serialized closure
- // is much more compact.
- var drmY = drmA.mapBlock(ncol = r) {
- case (keys, blockA) =>
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- keys -> blockY
- }
-
- var drmQ = dqrThin(drmY.checkpoint())._1
- // Checkpoint Q if last iteration
- if (q == 0) drmQ = drmQ.checkpoint()
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = drmA.t %*% drmQ
- // Checkpoint B' if last iteration
- if (q == 0) drmBt = drmBt.checkpoint()
-
- for (i <- 0 until q) {
- drmY = drmA %*% drmBt
- drmQ = dqrThin(drmY.checkpoint())._1
- // Checkpoint Q if last iteration
- if (i == q - 1) drmQ = drmQ.checkpoint()
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = drmA.t %*% drmQ
- // Checkpoint B' if last iteration
- if (i == q - 1) drmBt = drmBt.checkpoint()
- }
-
- val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
- val (inCoreUHat, d) = eigen(inCoreBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
- // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
deleted file mode 100644
index faa89ef..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.mahout.sparkbindings.drm
-
-
-object CacheHint extends Enumeration {
-
- type CacheHint = Value
-
- val NONE,
- DISK_ONLY,
- DISK_ONLY_2,
- MEMORY_ONLY,
- MEMORY_ONLY_2,
- MEMORY_ONLY_SER,
- MEMORY_ONLY_SER_2,
- MEMORY_AND_DISK,
- MEMORY_AND_DISK_2,
- MEMORY_AND_DISK_SER,
- MEMORY_AND_DISK_SER_2 = Value
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
deleted file mode 100644
index 0007477..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.Matrix
-import org.apache.hadoop.io.Writable
-
-/**
- * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be
- * therefore collected or saved.
- * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
- */
-trait CheckpointedDrm[K] extends DrmLike[K] {
-
- def rdd: DrmRdd[K]
-
- def collect: Matrix
-
- def writeDRM(path: String)
-
- /** If this checkpoint is already declared cached, uncache. */
- def uncache()
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
deleted file mode 100644
index 8216881..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.{SparseMatrix, DenseMatrix, Matrix, Vector}
-import math._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import scala.collection.JavaConversions._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.SparkContext._
-import reflect._
-import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
-
-class CheckpointedDrmBase[K: ClassTag](
- val rdd: DrmRdd[K],
- private var _nrow: Long = -1L,
- private var _ncol: Int = -1,
- private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
- private[sparkbindings] val partitioningTag: Long = Random.nextLong()
-
- ) extends CheckpointedDrm[K] {
-
-
- lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
- lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
-
- private var cached: Boolean = false
-
-
- /**
- * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
- * and writing down Spark graph lineage since last checkpointed DRM.
- */
- def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
- // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
- this
-
- def cache() = {
- if (!cached) {
- rdd.persist(_cacheStorageLevel)
- cached = true
- }
- this
- }
-
-
- /**
- * if matrix was previously persisted into cache,
- * delete cached representation
- */
- def uncache() = {
- if (cached) {
- rdd.unpersist(blocking = false)
- cached = false
- }
- this
- }
-
- def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmBase[K] =
- new CheckpointedDrmBase[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
-
-
- /**
- * Collecting DRM to fron-end in-core Matrix.
- *
- * If key in DRM is Int, then matrix is collected using key as row index.
- * Otherwise, order of rows in result is undefined but key.toString is applied
- * as rowLabelBindings of the in-core matrix .
- *
- * Note that this pre-allocates target matrix and then assigns collected RDD to it
- * thus this likely would require about 2 times the RDD memory
- * @return
- */
- def collect: Matrix = {
-
- val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
-
- val cols = rdd.map(_._2.length).fold(0)(max(_, _))
- val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
-
- // since currently spark #collect() requires Serializeable support,
- // we serialize DRM vectors into byte arrays on backend and restore Vector
- // instances on the front end:
- val data = rdd.map(t => (t._1, t._2)).collect()
-
-
- val m = if (data.forall(_._2.isDense))
- new DenseMatrix(rows, cols)
-
- else
- new SparseMatrix(rows, cols)
-
- if (intRowIndices)
- data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
- else {
-
- // assign all rows sequentially
- val d = data.zipWithIndex
- d.foreach(t => m(t._2, ::) := t._1._2)
-
- // row bindings
- val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
-
- m.setRowLabelBindings(rowBindings)
- }
-
- m
- }
-
-
- /**
- * Dump matrix as computed Mahout's DRM into specified (HD)FS path
- * @param path
- */
- def writeDRM(path: String) = {
- val ktag = implicitly[ClassTag[K]]
-
- implicit val k2wFunc: (K) => Writable =
- if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
- else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
- else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
- else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
- else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
-// implicit def any2w(k: Any): Writable = k2wFunc(k)
- rdd.saveAsSequenceFile(path)
- }
-
- protected def computeNRow = {
-
- val intRowIndex = classTag[K] == classTag[Int]
-
- if (intRowIndex)
- cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
- else
- cache().rdd.count()
- }
-
- protected def computeNCol =
- cache().rdd.map(_._2.length).fold(-1)(max(_, _))
-
- protected def computeNNonZero =
- cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
new file mode 100644
index 0000000..2d80fe3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math._
+import math._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import scala.collection.JavaConversions._
+import org.apache.spark.storage.StorageLevel
+import reflect._
+import scala.util.Random
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.sparkbindings._
+import org.apache.spark.SparkContext._
+
+/** Spark-specific optimizer-checkpointed DRM. */
+class CheckpointedDrmSpark[K: ClassTag](
+ val rdd: DrmRdd[K],
+ private var _nrow: Long = -1L,
+ private var _ncol: Int = -1,
+ private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ override protected[mahout] val partitioningTag: Long = Random.nextLong()
+ ) extends CheckpointedDrm[K] {
+
+ lazy val nrow: Long = if (_nrow >= 0) _nrow else computeNRow
+ lazy val ncol: Int = if (_ncol >= 0) _ncol else computeNCol
+
+ private var cached: Boolean = false
+ override protected[mahout] val context: DistributedContext = rdd.context
+
+ /**
+ * Action operator -- does not necessarily mean a Spark action; but does mean running BLAS optimizer
+ * and writing down Spark graph lineage since last checkpointed DRM.
+ */
+ def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
+ // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
+ this
+ }
+
+ /** Persists the backing RDD at the configured storage level (first call only); returns this. */
+ def cache() = {
+ if (!cached) {
+ rdd.persist(_cacheStorageLevel)
+ cached = true
+ }
+ this
+ }
+
+ /**
+ * if matrix was previously persisted into cache,
+ * delete cached representation
+ */
+ def uncache() = {
+ if (cached) {
+ rdd.unpersist(blocking = false)
+ cached = false
+ }
+ this
+ }
+
+// def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
+// new CheckpointedDrmSpark[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
+
+ /**
+ * Collecting DRM to front-end in-core Matrix.
+ *
+ * If key in DRM is Int, then matrix is collected using key as row index.
+ * Otherwise, order of rows in result is undefined but key.toString is applied
+ * as rowLabelBindings of the in-core matrix.
+ *
+ * The RDD is collected in a single pass and the result geometry is derived from the collected
+ * rows; the front end therefore holds both the collected rows and the target matrix.
+ * @return in-core matrix holding all rows of this DRM
+ */
+ def collect: Matrix = {
+
+ val intRowIndices = classTag[K] == classTag[Int]
+
+ // Collect once. Deriving rows/cols from the collected data (instead of two extra folds over
+ // the RDD) avoids re-running the lineage and keeps geometry consistent with the data.
+ val data = rdd.collect()
+
+ val cols = data.foldLeft(0)((acc, t) => max(acc, t._2.length))
+ val rows =
+ if (intRowIndices) data.foldLeft(-1)((acc, t) => max(acc, t._1.asInstanceOf[Int])) + 1
+ else data.length
+
+ val m = if (data.forall(_._2.isDense))
+ new DenseMatrix(rows, cols)
+
+ else
+ new SparseMatrix(rows, cols)
+
+ if (intRowIndices)
+ data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
+ else {
+
+ // assign all rows sequentially
+ val d = data.zipWithIndex
+ d.foreach(t => m(t._2, ::) := t._1._2)
+
+ // row bindings
+ val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
+
+ m.setRowLabelBindings(rowBindings)
+ }
+
+ m
+ }
+
+ /**
+ * Dump matrix as computed Mahout's DRM into specified (HD)FS path
+ * @param path target path the sequence file is saved to
+ */
+ def writeDRM(path: String) = {
+ val ktag = implicitly[ClassTag[K]]
+
+ implicit val k2wFunc: (K) => Writable =
+ if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
+ else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
+ else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
+ else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
+ else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
+
+ rdd.saveAsSequenceFile(path)
+ }
+
+ /** Row count: largest Int key + 1 for Int-keyed matrices, otherwise the RDD row count. */
+ protected def computeNRow = {
+
+ val intRowIndex = classTag[K] == classTag[Int]
+
+ if (intRowIndex)
+ cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+ else
+ cache().rdd.count()
+ }
+
+ /** Column count: the largest row vector length, or -1 when there are no rows. */
+ protected def computeNCol =
+ cache().rdd.map(_._2.length).fold(-1)(max(_, _))
+
+ /** Sum of getNumNonZeroElements over all row vectors. */
+ protected def computeNNonZero =
+ cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
new file mode 100644
index 0000000..7cf6bd6
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -0,0 +1,16 @@
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math.drm.CheckpointedDrm
+import scala.reflect.ClassTag
+
+/** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
+class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+ // Guard for the downcast below: only the Spark-backed implementation is accepted.
+ assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
+
+ private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
+
+ /** The RDD backing the wrapped Spark DRM, exposed for Spark-level customization. */
+ def rdd = sparkDrm.rdd
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
deleted file mode 100644
index f891c1e..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{DenseVector, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-import org.apache.spark.SparkContext._
-
-
-/**
- * Additional experimental operations over CheckpointedDRM implementation. I will possibly move them up to
- * the DRMBase once they stabilize.
- *
- */
-class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
-
- /**
- * Reorganize every partition into a single in-core matrix
- * @return
- */
- def blockify(): BlockifiedDrmRdd[K] =
- org.apache.mahout.sparkbindings.drm.blockify(rdd = drm.rdd, blockncol = drm.ncol)
-
- /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
- def colSums(): Vector = {
- val n = drm.ncol
-
- drm.rdd
- // Throw away keys
- .map(_._2)
- // Fold() doesn't work with kryo still. So work around it.
- .mapPartitions(iter => {
- val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
- Iterator(acc)
- })
- // Since we preallocated new accumulator vector per partition, this must not cause any side
- // effects now.
- .reduce(_ += _)
-
- }
-
- def colMeans(): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
-
-}
-
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
deleted file mode 100644
index de1f9bd..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.spark.storage.StorageLevel
-
-/**
- *
- * Basic spark DRM trait.
- *
- * Since we already call the package "sparkbindings", I will not use stem "spark" with classes in
- * this package. Spark backing is already implied.
- *
- */
-trait DrmLike[K] {
-
- private[sparkbindings] def partitioningTag:Long
-
- /** R-like syntax for number of rows. */
- def nrow: Long
-
- /** R-like syntax for number of columns */
- def ncol: Int
-
- /**
- * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
- * and writing down Spark graph lineage since last checkpointed DRM.
- */
- def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
deleted file mode 100644
index ce7b867..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.hadoop.io.Writable
-import org.apache.mahout.sparkbindings.drm.plan.{OpRowRange, OpMapBlock}
-import RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-/** Common Drm ops */
-class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
-
- /**
- * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
- * matrices; or they could be completely new matrices with new keyset. In the latter case, output
- * matrix width must be specified with <code>ncol</code> parameter.<P>
- *
- * New block heights must be of the same height as the original geometry.<P>
- *
- * @param ncol new matrix' width (only needed if width changes).
- * @param bmf
- * @tparam R
- * @return
- */
- def mapBlock[R : ClassTag](ncol: Int = -1)
- (bmf: BlockMapFunc[K, R]): DrmLike[R] =
- new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
-
-
- /**
- * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P>
- *
- * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
- *
- * Row range is currently unsupported except for the all-range. When it will be fully supported,
- * the input must be Int-keyed, i.e. of DrmLike[Int] type for non-all-range specifications.
- *
- * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
- * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
- */
- def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
-
-
- val rowSrc: DrmLike[K] = if (rowRange != ::) {
-
- if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
-
- assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range")
- val intKeyed = drm.asInstanceOf[DrmLike[Int]]
-
- new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
-
- } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
-
- } else drm
-
- if (colRange != ::) {
-
- assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
-
- // Use mapBlock operator to do in-core subranging.
- rowSrc.mapBlock(ncol = colRange.length)({
- case (keys, block) => keys -> block(::, colRange)
- })
-
- } else rowSrc
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index 47cfa26..3801c77 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -20,6 +20,7 @@ package org.apache.mahout.sparkbindings.drm
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings._
/** Encapsulates either DrmRdd[K] or BlockifiedDrmRdd[K] */
class DrmRddInput[K: ClassTag](
[3/3] git commit: MAHOUT-1529 closes PR #1
Posted by dl...@apache.org.
MAHOUT-1529 closes PR #1
Squashed commit of the following:
commit e7b27280333fe12c94f3f5675c876f56e9e60728
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Tue May 27 11:01:13 2014 -0700
License, comments
commit bfca581389d91fb9c41db8c49e256ff694335fd1
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 23 16:07:01 2014 -0700
Fixing shell
commit c3b8aa4bf7d8939aa0800ce12b7f63e01f9686b9
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 23 13:36:20 2014 -0700
- MahoutContext
commit a6461ac22a46793ab0bcdfcafe827d808f9e1810
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Fri May 23 12:13:40 2014 -0700
rebasing changes in MAHOUT-1529 branch onto new git master (applied through selective patching; may, but hopefully doesn't, revert any commits in between in spark/ and math-scala/ modules)
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8714a0f7
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8714a0f7
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8714a0f7
Branch: refs/heads/master
Commit: 8714a0f722663ea5cb16c14c5b8a01e57574cd93
Parents: 5a1c2cc
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Tue May 27 12:13:29 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Tue May 27 12:13:29 2014 -0700
----------------------------------------------------------------------
math-scala/pom.xml | 1 -
.../org/apache/mahout/math/drm/BCast.scala | 23 ++
.../org/apache/mahout/math/drm/CacheHint.scala | 19 ++
.../mahout/math/drm/CheckpointedDrm.scala | 36 +++
.../mahout/math/drm/CheckpointedOps.scala | 39 +++
.../mahout/math/drm/DistributedContext.scala | 27 +++
.../mahout/math/drm/DistributedEngine.scala | 158 ++++++++++++
.../org/apache/mahout/math/drm/DrmLike.scala | 48 ++++
.../org/apache/mahout/math/drm/DrmLikeOps.scala | 84 +++++++
.../apache/mahout/math/drm/RLikeDrmOps.scala | 92 +++++++
.../mahout/math/drm/decompositions/DQR.scala | 57 +++++
.../mahout/math/drm/decompositions/DSPCA.scala | 153 ++++++++++++
.../mahout/math/drm/decompositions/DSSVD.scala | 83 +++++++
.../math/drm/logical/AbstractBinaryOp.scala | 37 +++
.../math/drm/logical/AbstractUnaryOp.scala | 36 +++
.../math/drm/logical/CheckpointAction.scala | 47 ++++
.../apache/mahout/math/drm/logical/OpAB.scala | 41 ++++
.../mahout/math/drm/logical/OpABAnyKey.scala | 41 ++++
.../apache/mahout/math/drm/logical/OpABt.scala | 42 ++++
.../apache/mahout/math/drm/logical/OpAewB.scala | 39 +++
.../mahout/math/drm/logical/OpAewScalar.scala | 38 +++
.../apache/mahout/math/drm/logical/OpAt.scala | 33 +++
.../apache/mahout/math/drm/logical/OpAtA.scala | 36 +++
.../mahout/math/drm/logical/OpAtAnyKey.scala | 34 +++
.../apache/mahout/math/drm/logical/OpAtB.scala | 42 ++++
.../apache/mahout/math/drm/logical/OpAtx.scala | 41 ++++
.../apache/mahout/math/drm/logical/OpAx.scala | 42 ++++
.../mahout/math/drm/logical/OpMapBlock.scala | 41 ++++
.../mahout/math/drm/logical/OpRowRange.scala | 36 +++
.../math/drm/logical/OpTimesLeftMatrix.scala | 43 ++++
.../math/drm/logical/OpTimesRightMatrix.scala | 46 ++++
.../org/apache/mahout/math/drm/package.scala | 125 ++++++++++
.../apache/mahout/math/scalabindings/SSVD.scala | 165 -------------
.../scalabindings/decompositions/SSVD.scala | 165 +++++++++++++
.../mahout/math/scalabindings/package.scala | 1 +
.../sparkbindings/shell/MahoutSparkILoop.scala | 33 ++-
.../mahout/sparkbindings/shell/Main.scala | 17 ++
spark-shell/src/test/mahout/simple.mscala | 29 ++-
spark/pom.xml | 95 ++++----
.../sparkbindings/SparkDistributedContext.scala | 30 +++
.../mahout/sparkbindings/SparkEngine.scala | 240 +++++++++++++++++++
.../apache/mahout/sparkbindings/blas/ABt.scala | 5 +-
.../apache/mahout/sparkbindings/blas/AewB.scala | 2 +-
.../mahout/sparkbindings/blas/AinCoreB.scala | 15 +-
.../apache/mahout/sparkbindings/blas/At.scala | 2 +-
.../apache/mahout/sparkbindings/blas/AtA.scala | 4 +-
.../apache/mahout/sparkbindings/blas/AtB.scala | 3 +-
.../apache/mahout/sparkbindings/blas/Ax.scala | 19 +-
.../mahout/sparkbindings/blas/DrmRddOps.scala | 2 +-
.../mahout/sparkbindings/blas/MapBlock.scala | 43 ++++
.../mahout/sparkbindings/blas/Slicing.scala | 2 +-
.../mahout/sparkbindings/blas/package.scala | 1 -
.../sparkbindings/decompositions/DQR.scala | 56 -----
.../sparkbindings/decompositions/DSPCA.scala | 153 ------------
.../sparkbindings/decompositions/DSSVD.scala | 83 -------
.../mahout/sparkbindings/drm/CacheHint.scala | 20 --
.../sparkbindings/drm/CheckpointedDrm.scala | 39 ---
.../sparkbindings/drm/CheckpointedDrmBase.scala | 161 -------------
.../drm/CheckpointedDrmSpark.scala | 164 +++++++++++++
.../drm/CheckpointedDrmSparkOps.scala | 16 ++
.../sparkbindings/drm/CheckpointedOps.scala | 63 -----
.../mahout/sparkbindings/drm/DrmLike.scala | 46 ----
.../mahout/sparkbindings/drm/DrmLikeOps.scala | 85 -------
.../mahout/sparkbindings/drm/DrmRddInput.scala | 1 +
.../mahout/sparkbindings/drm/RLikeDrmOps.scala | 98 --------
.../mahout/sparkbindings/drm/SparkBCast.scala | 25 ++
.../mahout/sparkbindings/drm/package.scala | 226 ++---------------
.../drm/plan/AbstractBinaryOp.scala | 37 ---
.../drm/plan/AbstractUnaryOp.scala | 34 ---
.../drm/plan/CheckpointAction.scala | 207 ----------------
.../mahout/sparkbindings/drm/plan/OpAB.scala | 43 ----
.../sparkbindings/drm/plan/OpABAnyKey.scala | 41 ----
.../mahout/sparkbindings/drm/plan/OpABt.scala | 43 ----
.../mahout/sparkbindings/drm/plan/OpAewB.scala | 39 ---
.../sparkbindings/drm/plan/OpAewScalar.scala | 38 ---
.../mahout/sparkbindings/drm/plan/OpAt.scala | 34 ---
.../mahout/sparkbindings/drm/plan/OpAtA.scala | 37 ---
.../sparkbindings/drm/plan/OpAtAnyKey.scala | 34 ---
.../mahout/sparkbindings/drm/plan/OpAtB.scala | 42 ----
.../mahout/sparkbindings/drm/plan/OpAtx.scala | 42 ----
.../mahout/sparkbindings/drm/plan/OpAx.scala | 42 ----
.../sparkbindings/drm/plan/OpMapBlock.scala | 58 -----
.../sparkbindings/drm/plan/OpRowRange.scala | 37 ---
.../drm/plan/OpTimesLeftMatrix.scala | 44 ----
.../drm/plan/OpTimesRightMatrix.scala | 46 ----
.../mahout/sparkbindings/drm/plan/package.scala | 24 --
.../io/MahoutKryoRegistrator.scala | 6 +-
.../apache/mahout/sparkbindings/package.scala | 61 ++++-
.../mahout/sparkbindings/blas/ABtSuite.scala | 7 +-
.../mahout/sparkbindings/blas/AewBSuite.scala | 14 +-
.../mahout/sparkbindings/blas/AtASuite.scala | 4 +-
.../mahout/sparkbindings/blas/AtSuite.scala | 12 +-
.../decompositions/MathSuite.scala | 7 +-
.../sparkbindings/drm/DrmLikeOpsSuite.scala | 6 +-
.../mahout/sparkbindings/drm/DrmLikeSuite.scala | 6 +-
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 30 +--
.../sparkbindings/test/MahoutLocalContext.scala | 7 +-
97 files changed, 2607 insertions(+), 2244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 4604b7d..95fe2c7 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -167,7 +167,6 @@
<artifactId>mahout-math</artifactId>
</dependency>
-
<!-- 3rd-party -->
<dependency>
<groupId>log4j</groupId>
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
new file mode 100644
index 0000000..850614457
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+/** Broadcast variable abstraction */
+trait BCast[T] {
+ def value:T
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
new file mode 100644
index 0000000..ac763f9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
@@ -0,0 +1,19 @@
+package org.apache.mahout.math.drm
+
+object CacheHint extends Enumeration {
+
+ type CacheHint = Value
+
+ val NONE,
+ DISK_ONLY,
+ DISK_ONLY_2,
+ MEMORY_ONLY,
+ MEMORY_ONLY_2,
+ MEMORY_ONLY_SER,
+ MEMORY_ONLY_SER_2,
+ MEMORY_AND_DISK,
+ MEMORY_AND_DISK_2,
+ MEMORY_AND_DISK_SER,
+ MEMORY_AND_DISK_SER_2 = Value
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
new file mode 100644
index 0000000..0266944
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.math.Matrix
+
+/**
+ * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can
+ * therefore be collected or saved.
+ * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+ */
+trait CheckpointedDrm[K] extends DrmLike[K] {
+
+ def collect: Matrix
+
+ def writeDRM(path: String)
+
+ /** If this checkpoint is already declared cached, uncache. */
+ def uncache()
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
new file mode 100644
index 0000000..fa1ccfd
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+
+
+/**
+ * Additional experimental operations over CheckpointedDRM implementation. I will possibly move them up to
+ * the DRMBase once they stabilize.
+ *
+ */
+class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
+
+
+ /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
+ def colSums(): Vector = drm.context.colSums(drm)
+
+ /** Column Means */
+ def colMeans(): Vector = drm.context.colMeans(drm)
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
new file mode 100644
index 0000000..39bab90
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import java.io.Closeable
+
+/** Distributed context (a.k.a. distributed session handle) */
+trait DistributedContext extends Closeable {
+
+ val engine:DistributedEngine
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
new file mode 100644
index 0000000..0e76d87
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import logical._
+import org.apache.mahout.math.{Matrix, Vector}
+import DistributedEngine._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+/** Abstraction of optimizer/distributed engine */
+trait DistributedEngine {
+
+ /**
+ * First optimization pass. Return physical plan that we can pass to exec(). This rewrite may
+ * introduce logical constructs (including engine-specific ones) that user DSL cannot even produce
+ * per se.
+ * <P>
+ *
+ * A particular physical engine implementation may choose to either use the default rewrites or
+ * build its own rewriting rules.
+ * <P>
+ */
+ def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
+
+ /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+ def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
+
+ /** Engine-specific colSums implementation based on a checkpoint. */
+ def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+
+ /** Engine-specific colMeans implementation based on a checkpoint. */
+ def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+
+ /** Broadcast support */
+ def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
+
+ /** Broadcast support */
+ def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
+
+ /** Load DRM from hdfs (as in Mahout DRM format) */
+ def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_]
+
+ /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+ /** Parallelize in-core matrix as spark distributed matrix, using row labels as data set keys. */
+ def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext): CheckpointedDrm[String]
+
+ /** This creates an empty DRM with specified number of partitions and cardinality. */
+ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+ (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+ /** Creates empty DRM with non-trivial height */
+ def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+ (implicit sc: DistributedContext): CheckpointedDrm[Long]
+}
+
+object DistributedEngine {
+
+ /** This is mostly multiplication operations rewrites */
+ private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+
+ action match {
+ case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
+ case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
+
+ // For now, rewrite left-multiply via transpositions, i.e.
+ // inCoreA %*% B = (B' %*% inCoreA')'
+ case op@OpTimesLeftMatrix(a, b) =>
+ OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+
+ // Stop at checkpoints
+ case cd: CheckpointedDrm[_] => action
+
+ // For everything else we just pass-thru the operator arguments to optimizer
+ case uop: AbstractUnaryOp[_, K] =>
+ uop.A = pass1(uop.A)(uop.classTagA)
+ uop
+ case bop: AbstractBinaryOp[_, _, K] =>
+ bop.A = pass1(bop.A)(bop.classTagA)
+ bop.B = pass1(bop.B)(bop.classTagB)
+ bop
+ }
+ }
+
+ /** This would remove stuff like A.t.t that previous step may have created */
+ private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+ action match {
+ // A.t.t => A
+ case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
+
+ // Stop at checkpoints
+ case cd: CheckpointedDrm[_] => action
+
+ // For everything else we just pass-thru the operator arguments to optimizer
+ case uop: AbstractUnaryOp[_, K] =>
+ uop.A = pass2(uop.A)(uop.classTagA)
+ uop
+ case bop: AbstractBinaryOp[_, _, K] =>
+ bop.A = pass2(bop.A)(bop.classTagA)
+ bop.B = pass2(bop.B)(bop.classTagB)
+ bop
+ }
+ }
+
+ /** Some further rewrites that are conditioned on A.t.t removal */
+ private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+ action match {
+
+ // matrix products.
+ case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
+
+ // AtB cases that make sense.
+ case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
+ case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
+
+ // Need some cost to choose between the following.
+
+ case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
+ // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
+ case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+ // Rewrite A'x
+ case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
+
+ // Stop at checkpoints
+ case cd: CheckpointedDrm[_] => action
+
+ // For everything else we just pass-thru the operator arguments to optimizer
+ case uop: AbstractUnaryOp[_, K] =>
+ uop.A = pass3(uop.A)(uop.classTagA)
+ uop
+ case bop: AbstractBinaryOp[_, _, K] =>
+ bop.A = pass3(bop.A)(bop.classTagA)
+ bop.B = pass3(bop.B)(bop.classTagB)
+ bop
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
new file mode 100644
index 0000000..8e0db1e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+
+/**
+ *
+ * Basic spark DRM trait.
+ *
+ * Since we already call the package "sparkbindings", I will not use stem "spark" with classes in
+ * this package. Spark backing is already implied.
+ *
+ */
+trait DrmLike[K] {
+
+ protected[mahout] def partitioningTag:Long
+
+ protected[mahout] val context:DistributedContext
+
+
+ /** R-like syntax for number of rows. */
+ def nrow: Long
+
+ /** R-like syntax for number of columns */
+ def ncol: Int
+
+ /**
+ * Action operator -- does not necessarily mean a Spark action; but does mean running BLAS optimizer
+ * and writing down Spark graph lineage since last checkpointed DRM.
+ */
+ def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
new file mode 100644
index 0000000..35e28af
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{OpMapBlock, OpRowRange}
+
+/** Common Drm ops */
+class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+ /**
+ * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
+ * matrices; or they could be completely new matrices with new keyset. In the latter case, output
+ * matrix width must be specified with <code>ncol</code> parameter.<P>
+ *
+ * New block heights must be of the same height as the original geometry.<P>
+ *
+ * @param ncol new matrix' width (only needed if width changes).
+ * @param bmf
+ * @tparam R
+ * @return
+ */
+ def mapBlock[R : ClassTag](ncol: Int = -1)
+ (bmf: BlockMapFunc[K, R]): DrmLike[R] =
+ new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
+
+
+ /**
+ * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P>
+ *
+ * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
+ *
+ * Row range is currently unsupported except for the all-range. When it will be fully supported,
+ * the input must be Int-keyed, i.e. of DrmLike[Int] type for non-all-range specifications.
+ *
+ * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
+ * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
+ */
+ def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
+
+ import RLikeDrmOps._
+ import RLikeOps._
+
+ val rowSrc: DrmLike[K] = if (rowRange != ::) {
+
+ if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
+
+ assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range")
+ val intKeyed = drm.asInstanceOf[DrmLike[Int]]
+
+ new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
+
+ } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
+
+ } else drm
+
+ if (colRange != ::) {
+
+ assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
+
+ // Use mapBlock operator to do in-core subranging.
+ rowSrc.mapBlock(ncol = colRange.length)({
+ case (keys, block) => keys -> block(::, colRange)
+ })
+
+ } else rowSrc
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
new file mode 100644
index 0000000..f46d15c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Vector, Matrix}
+import org.apache.mahout.math.drm.logical._
+
+/**
+ * R-like operator syntax over a distributed row matrix (DRM). Every operator here only builds a
+ * logical operator node (or delegates to another operator of this class) over the wrapped DRM.
+ */
+class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
+
+ import RLikeDrmOps._
+
+ // Elementwise DRM-with-DRM operators; the operation is recorded as a Char code in OpAewB.
+ def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
+
+ def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
+
+ def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
+
+ def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
+
+ // Elementwise DRM-with-scalar operators; the operation is recorded as a String code in
+ // OpAewScalar.
+ def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
+
+ def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
+
+ // Method names ending in ':' bind to their right operand in Scala, so this one is written as
+ // `scalar -: drm`. Presumably it denotes (scalar - drm) -- confirm against the code that
+ // interprets the "-:" op code.
+ def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:")
+
+ def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+ def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
+
+ // Written as `scalar /: drm` (right-operand binding, see -:). Presumably denotes
+ // (scalar / drm) -- confirm against the code that interprets the "/:" op code.
+ def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:")
+
+ /** Builds a logical OpAB with this DRM on the left and an Int-keyed DRM on the right. */
+ def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
+
+ /** Builds a logical OpABAnyKey with this DRM on the left and a DRM keyed by any type B on the right. */
+ def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that)
+
+ /** Same as :%*% for an Int-keyed right operand. */
+ def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
+
+ /** Builds a logical OpTimesRightMatrix with this DRM on the left and an in-core matrix on the right. */
+ def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
+
+ /** Same as :%*% for an in-core matrix operand. */
+ def %*%(that: Matrix): DrmLike[K] = this :%*% that
+
+ /** Builds a logical OpAx from this DRM and an in-core vector. */
+ def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
+
+ /** Same as :%*% for an in-core vector operand. */
+ def %*%(that: Vector): DrmLike[K] = :%*%(that)
+
+ /** Transposition for arbitrary row key types, as a logical OpAtAnyKey; the result is Int-keyed. */
+ def t: DrmLike[Int] = OpAtAnyKey(A = drm)
+}
+
+/** Additional R-like operators that are available only when the DRM is Int-keyed. */
+class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
+
+ /** Transposition of an Int-keyed DRM, as a logical OpAt (replaces the any-key variant). */
+ override def t: DrmLike[Int] = OpAt(A = drm)
+
+ // Method names ending in ':' bind to their right operand in Scala, so `that %*%: this` is
+ // invoked on this DRM; the argument becomes the left operand (A) of the OpAB node.
+ def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
+
+ // Same right-operand binding: `inCoreMatrix %*%: this` puts the in-core matrix on the left.
+ def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
+
+
+}
+
+/** Implicit conversions between DRMs and their operator wrappers; import RLikeDrmOps._ to use. */
+object RLikeDrmOps {
+ /** Wraps an Int-keyed DRM into the Int-specific operator set. */
+ implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
+
+ /** Wraps a DRM of any key type into the generic R-like operator set. */
+ implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
+
+ /** Unwraps the DRM held by an RLikeDrmOps wrapper. */
+ implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
+
+ /** Unwraps the DRM held by a DrmLikeOps wrapper. */
+ implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
+
+ /** Wraps an already checkpointed DRM into CheckpointedOps. */
+ implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+
+ /**
+ * Wraps any DRM into CheckpointedOps by calling checkpoint() on it first.
+ *
+ * This is probably dangerous since it triggers implicit checkpointing with default storage level
+ * setting.
+ */
+ implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
new file mode 100644
index 0000000..34ae345
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
@@ -0,0 +1,57 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+/** Distributed thin QR decomposition based on the Cholesky decomposition of A'A. */
+object DQR {
+
+ private val log = Logger.getLogger(DQR.getClass)
+
+ /**
+ * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+ * controlled (<5000 or so). <P>
+ *
+ * It is recommended to checkpoint A since it does two passes over it. <P>
+ *
+ * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+ * their RDD should be able to zip successfully.
+ *
+ * @param A input DRM; a warning is logged when it has more than 5000 columns.
+ * @param checkRankDeficiency when true, fail if the Cholesky decomposition of A'A reports that
+ * the matrix is not positive definite.
+ * @return (Q, R): Q as a DRM that is not checkpointed here, and R as an in-core matrix (the
+ * transpose of a clone of the Cholesky factor L of A'A).
+ * @throws IllegalArgumentException if checkRankDeficiency is set and A'A is not positive definite.
+ */
+ def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+ if (A.ncol > 5000)
+ log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
+
+ implicit val ctx = A.context
+
+ // First pass over A: compute A'A and bring it in-core.
+ val AtA = (A.t %*% A).checkpoint()
+ val inCoreAtA = AtA.collect
+
+ if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+ // A'A = L L', hence R = L'.
+ val ch = chol(inCoreAtA)
+ val inCoreR = (ch.getL cloned) t
+
+ if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+ if (checkRankDeficiency && !ch.isPositiveDefinite)
+ throw new IllegalArgumentException("R is rank-deficient.")
+
+ val bcastAtA = drmBroadcast(inCoreAtA)
+
+ // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
+ // decompose A'A in the backend again.
+
+ // Compute Q = A*inv(L') -- we can do it blockwise.
+ // Second pass over A; keys are passed through unchanged.
+ val Q = A.mapBlock() {
+ case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+ }
+
+ Q -> inCoreR
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
new file mode 100644
index 0000000..9e33416
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+/** Distributed stochastic PCA. */
+object DSPCA {
+
+ /**
+ * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+ * document of MAHOUT-817.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank; must not exceed min(nrow, ncol) of A
+ * @param p oversampling parameter; capped so that k + p does not exceed min(nrow, ncol)
+ * @param q number of power iterations (hint: use either 0 or 1)
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to hdfs, in order to trigger their computation).
+ */
+ def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+ implicit val ctx = A.context
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+ // Effective oversampling: min(min(m, n) - k, p) -- '-' binds tighter than the infix 'min'.
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // Dataset mean
+ val xi = drmA.colMeans
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+ val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+ // This is done in the front end in a single-threaded fashion for now. Even though it doesn't
+ // require any memory beyond what is required to keep xi around, it still might be parallelized
+ // to the backend for significantly big n and r. TODO
+ val s_o = omega.t %*% xi
+
+ val bcastS_o = drmBroadcast(s_o)
+ val bcastXi = drmBroadcast(xi)
+
+ // Y = A * Omega with s_o subtracted from every row; Omega is re-instantiated from its seed
+ // inside the closure.
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val s_o:Vector = bcastS_o
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+ keys -> blockY
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ var s_q = drmQ.colSums()
+ var bcastVarS_q = drmBroadcast(s_q)
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ var s_b = (drmBt.t %*% xi).collect(::, 0)
+ var bcastVarS_b = drmBroadcast(s_b)
+
+ // Power iterations.
+ for (i <- 0 until q) {
+
+ // These closures don't seem to live well with outside-scope vars. This doesn't record closure
+ // attributes correctly. So we create additional set of vals for broadcast vars to properly
+ // create readonly closure attributes in this very scope.
+ val bcastS_q = bcastVarS_q
+ val bcastS_b = bcastVarS_b
+ val bcastXib = bcastXi
+
+ // Fix Bt as B' -= xi cross s_q
+ drmBt = drmBt.mapBlock() {
+ case (keys, block) =>
+ val s_q: Vector = bcastS_q
+ val xi: Vector = bcastXib
+ keys.zipWithIndex.foreach {
+ case (key, idx) => block(idx, ::) -= s_q * xi(key)
+ }
+ keys -> block
+ }
+
+ // The previous Y and Q are replaced below, so drop them from the cache.
+ drmY.uncache()
+ drmQ.uncache()
+
+ drmY = (drmA %*% drmBt)
+ // Fix Y by subtracting s_b from each row of the AB'
+ .mapBlock() {
+ case (keys, block) =>
+ val s_b: Vector = bcastS_b
+ for (row <- 0 until block.nrow) block(row, ::) -= s_b
+ keys -> block
+ }
+ // Checkpoint Y
+ .checkpoint()
+
+ drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+ s_q = drmQ.colSums()
+ bcastVarS_q = drmBroadcast(s_q)
+
+ // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = (drmA.t %*% drmQ).checkpoint()
+
+ s_b = (drmBt.t %*% xi).collect(::, 0)
+ bcastVarS_b = drmBroadcast(s_b)
+ }
+
+ // In-core BB' assembled from the uncorrected B'B and the correction terms built from s_q, s_b
+ // and xi.
+ val c = s_q cross s_b
+ val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+ c - c.t + (s_q cross s_q) * (xi dot xi)
+ val (inCoreUHat, d) = eigen(inCoreBBt)
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+ // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ // Trim the oversampled rank r back to the requested rank k.
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
new file mode 100644
index 0000000..0da9ec7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
@@ -0,0 +1,83 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+/** Distributed stochastic SVD. */
+object DSSVD {
+
+ /**
+ * Distributed Stochastic Singular Value decomposition algorithm.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank; must not exceed min(nrow, ncol) of A
+ * @param p oversampling parameter; capped so that k + p does not exceed min(nrow, ncol)
+ * @param q number of power iterations
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to hdfs, in order to trigger their computation).
+ */
+ def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = {
+
+ val drmA = A.checkpoint()
+
+ val m = drmA.nrow
+ val n = drmA.ncol
+ assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+ // Effective oversampling: min(min(m, n) - k, p) -- '-' binds tighter than the infix 'min'.
+ val pfxed = safeToNonNegInt((m min n) - k min p)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ // We represent Omega by its seed.
+ val omegaSeed = RandomUtils.getRandom().nextInt()
+
+ // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
+ // instantiate the Omega random matrix view in the backend instead. That way serialized closure
+ // is much more compact.
+ var drmY = drmA.mapBlock(ncol = r) {
+ case (keys, blockA) =>
+ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+ keys -> blockY
+ }
+
+ var drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (q == 0) drmQ = drmQ.checkpoint()
+
+ // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+ // still be identically partitioned.
+ var drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (q == 0) drmBt = drmBt.checkpoint()
+
+ // Power iterations.
+ for (i <- 0 until q) {
+ drmY = drmA %*% drmBt
+ drmQ = dqrThin(drmY.checkpoint())._1
+ // Checkpoint Q if last iteration
+ if (i == q - 1) drmQ = drmQ.checkpoint()
+
+ // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+ // identically partitioned anymore.
+ drmBt = drmA.t %*% drmQ
+ // Checkpoint B' if last iteration
+ if (i == q - 1) drmBt = drmBt.checkpoint()
+ }
+
+ // BB' is collected in-core and decomposed there.
+ val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
+ val (inCoreUHat, d) = eigen(inCoreBBt)
+ val s = d.sqrt
+
+ // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+ // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+ val drmU = drmQ %*% inCoreUHat
+ val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+ // Trim the oversampled rank r back to the requested rank k.
+ (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
new file mode 100644
index 0000000..c2371d1
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+/**
+ * Abstract binary logical operator over two DRM operands.
+ *
+ * Type parameters: A is the row key type of the left operand, B is the row key type of the right
+ * operand, and K is the row key type of the result.
+ */
+abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
+ extends CheckpointAction[K] with DrmLike[K] {
+
+ // Operands; declared as vars, so they can be reassigned after construction.
+ protected[drm] var A: DrmLike[A]
+ protected[drm] var B: DrmLike[B]
+ // The distributed context is taken from the left operand, lazily on first access.
+ protected[mahout] lazy val context: DistributedContext = A.context
+
+ // These explicitly export the ClassTag evidence. Sometimes the Scala compiler fails to figure
+ // it out on its own.
+ def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
+
+ def classTagB: ClassTag[B] = implicitly[ClassTag[B]]
+
+ def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
new file mode 100644
index 0000000..eb5ef9a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+/**
+ * Abstract unary logical operator over a single DRM operand.
+ *
+ * Type parameters: A is the row key type of the operand and K is the row key type of the result.
+ */
+abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
+ extends CheckpointAction[K] with DrmLike[K] {
+
+ // Operand; declared as a var, so it can be reassigned after construction.
+ protected[drm] var A: DrmLike[A]
+
+ // The distributed context is taken from the operand, lazily on first access.
+ protected[mahout] lazy val context: DistributedContext = A.context
+
+ // Explicit export of the ClassTag evidence for the operand and result key types.
+ def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
+
+ def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
+
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
new file mode 100644
index 0000000..aa3a3b9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import scala.util.Random
+import org.apache.mahout.math.drm._
+
+/**
+ * Base of logical DRM expressions: implements checkpointing, which runs the optimizer and the
+ * physical plan translation of the distributed context, and memoizes the outcome.
+ */
+abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
+
+ // Random tag generated lazily on first access.
+ protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
+
+ // Memoized result of the first checkpoint() call, if any.
+ private[mahout] var cp: Option[CheckpointedDrm[K]] = None
+
+ /** True when this expression carries a non-zero partitioning tag equal to the tag of `other`. */
+ def isIdenticallyPartitioned(other: DrmLike[_]) = {
+ val ownTag = partitioningTag
+ ownTag != 0L && ownTag == other.partitioningTag
+ }
+
+ /**
+ * Action operator -- does not necessarily mean a Spark action; but does mean running the BLAS
+ * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
+ *
+ * The physical plan is built on the first call only; later calls return the memoized result and
+ * ignore their cacheHint argument.
+ */
+ def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
+ cp.getOrElse {
+ val planned = context.toPhysical(context.optimizerRewrite(this), cacheHint)
+ cp = Some(planned)
+ planned
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
new file mode 100644
index 0000000..804a00e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical AB, where the right operand B is Int-keyed; the result keeps the row key type of A. */
+case class OpAB[K: ClassTag ](
+ override var A: DrmLike[K],
+ override var B: DrmLike[Int])
+ extends AbstractBinaryOp[K, Int, K] {
+
+ // Checked at construction time: the column count of A must match the row count of B.
+ assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns */
+ def ncol: Int = B.ncol
+
+ /** Non-zero element count. Not implemented: always throws UnsupportedOperationException. */
+ def nNonZero: Long =
+ // TODO: for purposes of cost calculation, approximate based on operands
+ throw new UnsupportedOperationException
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
new file mode 100644
index 0000000..f131f3f
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical AB, where the right operand may be keyed by an arbitrary type B; the result keeps the
+ * row key type of A.
+ */
+case class OpABAnyKey[B:ClassTag, K: ClassTag ](
+ override var A: DrmLike[K],
+ override var B: DrmLike[B])
+ extends AbstractBinaryOp[K, B, K] {
+
+ // Checked at construction time: the column count of A must match the row count of B.
+ assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns */
+ def ncol: Int = B.ncol
+
+ /** Non-zero element count. Not implemented: always throws UnsupportedOperationException. */
+ def nNonZero: Long =
+ // TODO: for purposes of cost calculation, approximate based on operands
+ throw new UnsupportedOperationException
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
new file mode 100644
index 0000000..f6503ed
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/** Logical AB', where B is Int-keyed; the result keeps the row key type of A. */
+case class OpABt[K: ClassTag](
+ override var A: DrmLike[K],
+ override var B: DrmLike[Int])
+ extends AbstractBinaryOp[K,Int,K] {
+
+ // Checked at construction time: A and B must have the same number of columns.
+ assert(A.ncol == B.ncol, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns: the row count of B, narrowed from Long to Int. */
+ def ncol: Int = safeToNonNegInt(B.nrow)
+
+ /** Non-zero element count. Not implemented: always throws UnsupportedOperationException. */
+ def nNonZero: Long =
+ // TODO: for purposes of cost calculation, approximate based on operands
+ throw new UnsupportedOperationException
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
new file mode 100644
index 0000000..d07172a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * DRM elementwise operator over two operands with the same row key type and the same geometry.
+ * The operation itself is identified by the Char code in `op`.
+ */
+case class OpAewB[K: ClassTag](
+ override var A: DrmLike[K],
+ override var B: DrmLike[K],
+ val op: Char
+ ) extends AbstractBinaryOp[K, K, K] {
+
+ // Checked at construction time: both operands must have identical dimensions.
+ assert(A.ncol == B.ncol, "arguments must have same number of columns")
+ assert(A.nrow == B.nrow, "arguments must have same number of rows")
+
+ /** R-like syntax for number of rows. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns */
+ def ncol: Int = A.ncol
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
new file mode 100644
index 0000000..91e0dd4
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Operator denoting expressions like 5.0 - A or A * 5.6. The operation itself is identified by
+ * the String code in `op`; the result has the same geometry as A.
+ */
+case class OpAewScalar[K: ClassTag](
+ override var A: DrmLike[K],
+ val scalar: Double,
+ val op: String
+ ) extends AbstractUnaryOp[K,K] {
+
+ // The result reports the partitioning tag of its operand instead of generating its own.
+ override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+ /** R-like syntax for number of rows. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns */
+ def ncol: Int = A.ncol
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
new file mode 100644
index 0000000..3239ad2
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm._
+
+/** Logical A': the transpose of an Int-keyed DRM `A`. */
+case class OpAt(
+ override var A: DrmLike[Int])
+ extends AbstractUnaryOp[Int, Int] {
+
+ /** R-like syntax for number of rows: the operand's column count. */
+ def nrow: Long = A.ncol
+
+ /** R-like syntax for number of columns: the operand's row count, narrowed to Int by safeToNonNegInt. */
+ def ncol: Int = safeToNonNegInt(A.nrow)
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
new file mode 100644
index 0000000..c7c6046
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical A'A. The result is square: both dimensions equal the operand's column count. */
+case class OpAtA[K: ClassTag](
+ override var A: DrmLike[K]
+ ) extends AbstractUnaryOp[K, Int] {
+
+ /** R-like syntax for number of rows: the operand's column count. */
+ def nrow: Long = A.ncol
+
+ /** R-like syntax for number of columns: the operand's column count. */
+ def ncol: Int = A.ncol
+
+ /** Non-zero element count. Not implemented: always throws UnsupportedOperationException. */
+ def nNonZero: Long = throw new UnsupportedOperationException
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
new file mode 100644
index 0000000..4e1dd5c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/** Logical A' for any row key to support A'A optimizations. Note: type parameter `A` (key type) and field `A` (operand) share a name. */
+case class OpAtAnyKey[A: ClassTag](
+ override var A: DrmLike[A])
+ extends AbstractUnaryOp[A, Int] {
+
+ /** R-like syntax for number of rows: the operand's column count. */
+ def nrow: Long = A.ncol
+
+ /** R-like syntax for number of columns: the operand's row count, narrowed to Int by safeToNonNegInt. */
+ def ncol: Int = safeToNonNegInt(A.nrow)
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
new file mode 100644
index 0000000..ef3ae6b
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical A'B for two operands of the same key type. Note: type parameter `A` (key type) and field `A` (left operand) share a name. */
+case class OpAtB[A: ClassTag](
+ override var A: DrmLike[A],
+ override var B: DrmLike[A])
+ extends AbstractBinaryOp[A, A, Int] {
+ // Checked when the operator is constructed: both operands must have the same number of rows.
+ assert(A.nrow == B.nrow, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows: the column count of A. */
+ def nrow: Long = A.ncol
+
+ /** R-like syntax for number of columns: the column count of B. */
+ def ncol: Int = B.ncol
+
+ /** Non-zero element count. Not implemented: always throws UnsupportedOperationException. */
+ def nNonZero: Long =
+ // TODO: for purposes of cost calculation, approximate based on operands
+ throw new UnsupportedOperationException
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
new file mode 100644
index 0000000..36769c7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+
+/** Logical A'x: the transpose of an Int-keyed DRM `A` times an in-core vector `x`. */
+case class OpAtx(
+ override var A: DrmLike[Int],
+ val x: Vector
+ ) extends AbstractUnaryOp[Int, Int] {
+ // NOTE(review): the result has A.ncol rows (see nrow) yet reuses A's partitioning tag -- confirm this is intended.
+ override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+ assert(A.nrow == x.length, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows: the operand's column count (A.ncol is already an Int; safeToNonNegInt also asserts it is non-negative). */
+ def nrow: Long = safeToNonNegInt(A.ncol)
+
+ /** R-like syntax for number of columns: always 1, i.e. the result is a single-column matrix. */
+ def ncol: Int = 1
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
new file mode 100644
index 0000000..a726989
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical Ax: a DRM `A` of any key type times an in-core vector `x`. */
+case class OpAx[K: ClassTag](
+ override var A: DrmLike[K],
+ val x: Vector
+ ) extends AbstractUnaryOp[K, K] {
+ /** The operand's partitioning tag, computed lazily on first access. */
+ override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+ assert(A.ncol == x.length, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows: same as the operand's. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns: always 1, i.e. the result is a single-column matrix. */
+ def ncol: Int = 1
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
new file mode 100644
index 0000000..8e4362d
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+
+class OpMapBlock[S: ClassTag, R: ClassTag](
+ override var A: DrmLike[S],
+ val bmf: BlockMapFunc[S, R],
+ val _ncol: Int = -1,
+ val _nrow: Long = -1
+ ) extends AbstractUnaryOp[S, R] {
+ // Logical operator pairing operand A with block-map function bmf (presumably applied block-wise by the engine -- not shown here).
+
+ /** The operand's partitioning tag, computed lazily on first access. */
+ override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+ /** R-like syntax for number of rows: `_nrow` when it is non-negative, otherwise the operand's row count. */
+ def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
+
+ /** R-like syntax for number of columns: `_ncol` when it is non-negative, otherwise the operand's column count. */
+ def ncol: Int = if (_ncol >= 0) _ncol else A.ncol
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
new file mode 100644
index 0000000..697bbd3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical row-range slicing of an Int-keyed DRM `A` by `rowRange`. */
+case class OpRowRange(
+ override var A: DrmLike[Int],
+ val rowRange: Range
+ ) extends AbstractUnaryOp[Int, Int] {
+ // NOTE(review): head/last throw on an empty Range, and only the first and last elements are bounds-checked -- confirm callers pass non-empty ascending ranges.
+ assert(rowRange.head >= 0 && rowRange.last < A.nrow, "row range out of range")
+
+ /** R-like syntax for number of rows: the number of elements in `rowRange`. */
+ def nrow: Long = rowRange.length
+
+ /** R-like syntax for number of columns: same as the operand's. */
+ def ncol: Int = A.ncol
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
new file mode 100644
index 0000000..1ca79b3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical times-left over in-core matrix operand, i.e. the product `left * A`. */
+case class OpTimesLeftMatrix(
+ val left: Matrix,
+ override var A: DrmLike[Int]
+ ) extends AbstractUnaryOp[Int, Int] {
+ // Checked when the operator is constructed: columns of `left` must match rows of A.
+ assert(left.ncol == A.nrow, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows: the row count of `left`. */
+ def nrow: Long = left.nrow
+
+ /** R-like syntax for number of columns: the column count of A. */
+ def ncol: Int = A.ncol
+
+ /** Non-zero element count. Not implemented: always throws UnsupportedOperationException. */
+ // TODO
+ def nNonZero: Long = throw new UnsupportedOperationException
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
new file mode 100644
index 0000000..c55f7f0
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical times-right over in-core matrix operand, i.e. the product `A * right`. */
+case class OpTimesRightMatrix[K: ClassTag](
+ override var A: DrmLike[K],
+ val right: Matrix
+ ) extends AbstractUnaryOp[K, K] {
+ /** The operand's partitioning tag, computed lazily on first access. */
+ override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+ assert(A.ncol == right.nrow, "Incompatible operand geometry")
+
+ /** R-like syntax for number of rows: same as the operand's. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns: the column count of `right`. */
+ def ncol: Int = right.ncol
+
+ /** Non-zero element count. Not implemented: always throws UnsupportedOperationException. */
+ // TODO
+ def nNonZero: Long = throw new UnsupportedOperationException
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
new file mode 100644
index 0000000..768bb1c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
+
+package object drm {
+
+ /** Drm row-wise tuple: row key and row vector. */
+ type DrmTuple[K] = (K, Vector)
+
+ /** Drm block-wise tuple: Array of row keys and the matrix block. */
+ type BlockifiedDrmTuple[K] = (Array[K], _ <: Matrix)
+
+ /** Block-map function: turns one blockified tuple (keys of type S) into another (keys of type R). */
+ type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
+
+ // CacheHint type alias, currently disabled (plain comment so it is not picked up as Scaladoc for the next def):
+ // type CacheHint = CacheHint.CacheHint
+
+ /** Narrows `x` to Int, asserting that it lies in [0, Int.MaxValue] so that no significant bits are lost. */
+ def safeToNonNegInt(x: Long): Int = {
+ assert(x >= 0 && x <= Int.MaxValue, "transformation from long to Int is losing significant bits, or is a negative number")
+ x.toInt
+ }
+
+ /** Broadcast support API: broadcasts an in-core matrix through the implicit context. */
+ def drmBroadcast(m: Matrix)(implicit ctx: DistributedContext): BCast[Matrix] = ctx.drmBroadcast(m)
+
+ /** Broadcast support API: broadcasts an in-core vector through the implicit context. */
+ def drmBroadcast(v: Vector)(implicit ctx: DistributedContext): BCast[Vector] = ctx.drmBroadcast(v)
+
+ /** Load DRM from hdfs (as in Mahout DRM format). The row key type is not known statically, hence CheckpointedDrm[_]. */
+ def drmFromHDFS(path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmFromHDFS(path)
+
+ /** Shortcut for parallelizing a matrix with row indices as keys, ignoring row labels; same as drmParallelizeWithRowIndices. */
+ def drmParallelize(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext): CheckpointedDrm[Int] = drmParallelizeWithRowIndices(m, numPartitions)(sc)
+
+ /** Parallelize in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
+ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+ (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeWithRowIndices(m, numPartitions)
+
+ /** Parallelize in-core matrix as a distributed matrix, using row labels as data set keys. */
+ def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+ (implicit ctx: DistributedContext): CheckpointedDrm[String] = ctx.drmParallelizeWithRowLabels(m, numPartitions)
+
+ /** Creates an empty Int-keyed DRM with the specified cardinality (nrow x ncol) and number of partitions. */
+ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+ (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeEmpty(nrow, ncol, numPartitions)
+
+ /** Creates an empty Long-keyed DRM with non-trivial height, i.e. a row count given as a Long. */
+ def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+ (implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
+
+ /** Implicit broadcast -> value conversion. */
+ implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
+
+ /** Just throw all engine operations into context as well. */
+ implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
+
+ /** Implicitly wraps a checkpointed DRM into CheckpointedOps. */
+ implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps[K](drm)
+
+ /** Implicitly converts any DrmLike to a CheckpointedDrm; note that each application calls `checkpoint()` on it. */
+ implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
+
+ // ============== Decompositions ===================
+
+ /**
+ * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+ * controlled (<5000 or so). <P>
+ *
+ * It is recommended to checkpoint A since it does two passes over it. <P>
+ *
+ * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+ * their RDD should be able to zip successfully.
+ */
+ def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+ DQR.dqrThin(A, checkRankDeficiency)
+
+ /**
+ * Distributed Stochastic Singular Value decomposition algorithm.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to hdfs, in order to trigger their computation).
+ */
+ def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+
+ /**
+ * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+ * document of the MAHOUT-817.
+ *
+ * @param A input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations (hint: use either 0 or 1)
+ * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+ * e.g. save them to hdfs, in order to trigger their computation).
+ */
+ def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+ (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+
+}