Posted to commits@mahout.apache.org by dl...@apache.org on 2014/05/27 21:14:51 UTC

[1/3] MAHOUT-1529 closes PR #1

Repository: mahout
Updated Branches:
  refs/heads/master 5a1c2ccba -> 8714a0f72


http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala
deleted file mode 100644
index 1b1bcec..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOps.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.plan._
-import org.apache.mahout.math.{Matrices, SparseColumnMatrix, Vector, Matrix}
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesLeftMatrix
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
-import org.apache.mahout.sparkbindings.drm.plan.OpAB
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesRightMatrix
-import org.apache.hadoop.io.Writable
-import org.apache.spark.SparkContext._
-
-class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
-
-  import RLikeDrmOps._
-
-  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
-
-  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
-
-  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
-
-  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
-
-  def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
-
-  def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
-
-  def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:")
-
-  def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
-
-  def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
-
-  def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:")
-
-  def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
-
-  def %*%[B:ClassTag](that:DrmLike[B]):DrmLike[K] = OpABAnyKey[B,K](A=this.drm, B=that)
-
-  def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
-
-  def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
-
-  def %*%(that: Matrix): DrmLike[K] = this :%*% that
-
-  def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
-
-  def %*%(that: Vector): DrmLike[K] = this :%*% that
-
-  def t: DrmLike[Int] = OpAtAnyKey(A = drm)
-}
-
-class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
-
-  override def t: DrmLike[Int] = OpAt(A = drm)
-
-  def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
-
-  def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
-
-
-}
-
-object RLikeDrmOps {
-  implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
-
-  implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
-
-  implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
-
-  implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
-
-  implicit def cp2cpops[K:ClassTag](cp:CheckpointedDrm[K]):CheckpointedOps[K] = new CheckpointedOps(cp)
-
-  /**
-   * This is probably dangerous since it triggers implicit checkpointing with the default storage
-   * level setting.
-   */
-  implicit def drm2cpops[K:ClassTag](drm:DrmLike[K]):CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
-}
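
For context, the R-like operators deleted here move to the engine-neutral math-scala module as
part of the MAHOUT-1529 refactoring; they compose lazily into a logical plan that only executes
at checkpoint time. A minimal usage sketch, assuming an implicit SparkContext sc is in scope and
using the pre-refactor imports shown in this file:

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.sparkbindings.drm._
    import RLikeDrmOps._

    // Sketch only: a small in-core matrix parallelized as a DRM.
    val inCoreA = dense((1, 2), (3, 4), (5, 6))
    val drmA = drmParallelize(inCoreA, numPartitions = 2)

    // Operators build logical plan nodes (OpAB, OpAt, OpAewScalar, ...); nothing runs yet.
    val drmAtA = drmA.t %*% drmA      // the optimizer rewrites this into a single A'A operator
    val drmB = (drmA + 1.0) * 2.0     // elementwise scalar operators

    // checkpoint() runs the optimizer and materializes the backing RDD.
    val result = drmAtA.checkpoint()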

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala
new file mode 100644
index 0000000..ac36f60
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/SparkBCast.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math.drm.BCast
+import org.apache.spark.broadcast.Broadcast
+
+class SparkBCast[T](val sbcast: Broadcast[T]) extends BCast[T] with Serializable {
+  def value: T = sbcast.value
+}
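
The new SparkBCast is a thin adapter: it hides Spark's Broadcast[T] behind the engine-neutral
BCast[T] trait from org.apache.mahout.math.drm, so algebra code written against the math-scala
API needs no Spark imports. A hedged usage sketch (sc and v stand for an active SparkContext
and an in-core Vector, respectively):

    import org.apache.mahout.math.Vector
    import org.apache.mahout.math.drm.BCast
    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    def broadcastVector(v: Vector)(implicit sc: SparkContext): BCast[Vector] = {
      val sbc: Broadcast[Vector] = sc.broadcast(v)  // Spark-side broadcast handle
      new SparkBCast(sbc)                           // engine-neutral wrapper
    }

    // Inside a physical operator, bcast.value recovers the payload without Spark types leaking in.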

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 6f774ba..322d67c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -27,188 +27,32 @@ import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import SparkContext._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.mahout.sparkbindings.drm.decompositions.{DSPCA, DSSVD, DQR}
+import org.apache.mahout.math.drm._
+import SparkContext._
 
 
 package object drm {
 
   private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings");
 
-  /** Drm row-wise tuple */
-  type DrmTuple[K] = (K, Vector)
-
-  /** Row-wise organized DRM rdd type */
-  type DrmRdd[K] = RDD[DrmTuple[K]]
-
-  /** Drm block-wise tuple: Array of row keys and the matrix block. */
-  type BlockifiedDrmTuple[K] = (Array[K], _ <: Matrix)
-
-  /**
-   * Blockified DRM rdd (keys of the original DRM are grouped into an array corresponding to the
-   * rows of the Matrix object value).
-   */
-  type BlockifiedDrmRdd[K] = RDD[BlockifiedDrmTuple[K]]
-
-  /** Block-map func */
-  type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
-
-  /** CacheHint type */
-//  type CacheHint = CacheHint.CacheHint
+  private[sparkbindings] implicit def input2drmRdd[K](input: DrmRddInput[K]): DrmRdd[K] = input.toDrmRdd()
 
-  implicit def input2drmRdd[K](input: DrmRddInput[K]): DrmRdd[K] = input.toDrmRdd()
+  private[sparkbindings] implicit def input2blockifiedDrmRdd[K](input: DrmRddInput[K]): BlockifiedDrmRdd[K] = input.toBlockifiedDrmRdd()
 
-  implicit def input2blockifiedDrmRdd[K](input: DrmRddInput[K]): BlockifiedDrmRdd[K] = input.toBlockifiedDrmRdd()
-
-  implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
     new DrmRddInput(rowWiseSrc = Some(cp.ncol -> cp.rdd))
 
-  implicit def drm2drmOps[K <% Writable : ClassTag](drm: CheckpointedDrmBase[K]): CheckpointedOps[K] =
-    new CheckpointedOps[K](drm)
-
-  implicit def v2Writable(v: Vector): VectorWritable = new VectorWritable(v)
-
-  implicit def m2Writable(m: Matrix): MatrixWritable = new MatrixWritable(m)
-
-  implicit def vw2v(vw: VectorWritable): Vector = vw.get()
-
-  implicit def mw2m(mw: MatrixWritable): Matrix = mw.get()
-
-  implicit def drmLike2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
-
-  implicit def bcast2Matrix(bcast: Broadcast[_ <: Matrix]): Matrix = bcast.value
+//  /** Broadcast vector (Mahout vectors are not closure-friendly; use this instead). */
+//  private[sparkbindings] def drmBroadcast(x: Vector)(implicit sc: SparkContext): Broadcast[Vector] = sc.broadcast(x)
+//
+//  /** Broadcast in-core Mahout matrix. Use this instead of a closure. */
+//  private[sparkbindings] def drmBroadcast(m: Matrix)(implicit sc: SparkContext): Broadcast[Matrix] = sc.broadcast(m)
 
-  implicit def bcast2Vector(bcast: Broadcast[_ <: Vector]): Vector = bcast.value
+  /** Implicit broadcast cast for Spark physical op implementations. */
+  private[sparkbindings] implicit def bcast2val[K](bcast:Broadcast[K]):K = bcast.value
 
-
-  /**
-   * Load a DRM from HDFS (in Mahout DRM format).
-   *
-   * @param path HDFS path of the DRM
-   * @param sc spark context (wanted to make this implicit, but that doesn't work with the type
-   *           bounds in the current version of scala, sorry)
-   *
-   * @return DRM[Any] where Any is automatically translated to the value type
-   */
-  def drmFromHDFS (path: String)(implicit sc: SparkContext): CheckpointedDrmBase[_] = {
-    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
-
-    val key = rdd.map(_._1).take(1)(0)
-    val keyWClass = key.getClass.asSubclass(classOf[Writable])
-
-    val key2val = key match {
-      case xx: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
-      case xx: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
-      case xx: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
-      case xx: Writable => (v: AnyRef) => v
-    }
-
-    val val2key = key match {
-      case xx: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
-      case xx: Text => (x: Any) => new Text(x.toString)
-      case xx: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Long])
-      case xx: Writable => (x: Any) => x.asInstanceOf[Writable]
-    }
-
-    val km = key match {
-      case xx: IntWritable => implicitly[ClassTag[Int]]
-      case xx: Text => implicitly[ClassTag[String]]
-      case xx: LongWritable => implicitly[ClassTag[Long]]
-      case xx: Writable => ClassTag(classOf[Writable])
-    }
-
-    {
-      implicit def getWritable(x: Any): Writable = val2key(x)
-      new CheckpointedDrmBase(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
-    }
-  }
-
-  /** Shortcut for parallelizing matrices with indices, ignoring row labels. */
-  def drmParallelize(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: SparkContext) =
-    drmParallelizeWithRowIndices(m, numPartitions)(sc)
-
-  /** Parallelize an in-core matrix as a Spark distributed matrix, using row ordinal indices as data set keys. */
-  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: SparkContext)
-  : CheckpointedDrm[Int] = {
-
-    new CheckpointedDrmBase(parallelizeInCore(m, numPartitions))
-  }
-
-  private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: SparkContext): DrmRdd[Int] = {
-
-    val p = (0 until m.nrow).map(i => i -> m(i, ::))
-    sc.parallelize(p, numPartitions)
-
-  }
-
-  /** Parallelize an in-core matrix as a Spark distributed matrix, using row labels as data set keys. */
-  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: SparkContext)
-  : CheckpointedDrmBase[String] = {
-
-
-    // In Spark 0.8 I have patched in the ability to parallelize Kryo objects directly, so there is
-    // no need to wrap them into a byte array anymore.
-    val rb = m.getRowLabelBindings
-    val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
-
-
-    new CheckpointedDrmBase(sc.parallelize(p, numPartitions))
-  }
-
-  /** This creates an empty DRM with the specified number of partitions and cardinality. */
-  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
-      (implicit sc: SparkContext): CheckpointedDrm[Int] = {
-    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
-      val partNRow = (nrow - 1) / numPartitions + 1
-      val partStart = partNRow * part
-      val partEnd = Math.min(partStart + partNRow, nrow)
-
-      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
-    })
-    new CheckpointedDrmBase[Int](rdd, nrow, ncol)
-  }
-
-  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
-      (implicit sc: SparkContext): CheckpointedDrmBase[Long] = {
-    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
-      val partNRow = (nrow - 1) / numPartitions + 1
-      val partStart = partNRow * part
-      val partEnd = Math.min(partStart + partNRow, nrow)
-
-      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
-    })
-    new CheckpointedDrmBase[Long](rdd, nrow, ncol)
-  }
-
-  def drmWrap[K : ClassTag](
-      rdd: DrmRdd[K],
-      nrow: Int = -1,
-      ncol: Int = -1
-      ): CheckpointedDrm[K] =
-    new CheckpointedDrmBase[K](
-      rdd = rdd,
-      _nrow = nrow,
-      _ncol = ncol
-    )
-
-
-  /** Broadcast vector (Mahout vectors are not closure-friendly; use this instead). */
-  def drmBroadcast(x: Vector)(implicit sc: SparkContext): Broadcast[Vector] = sc.broadcast(x)
-
-  /** Broadcast in-core Mahout matrix. Use this instead of a closure. */
-  def drmBroadcast(m: Matrix)(implicit sc: SparkContext): Broadcast[Matrix] = sc.broadcast(m)
-
-  def safeToNonNegInt(x: Long): Int = {
-    assert(x == x << -31 >>> -31, "conversion from Long to Int is losing significant bits, or the value is negative")
-    x.toInt
-  }
-
-  def blockify[K: ClassTag](rdd: DrmRdd[K], blockncol: Int): BlockifiedDrmRdd[K] = {
+  private[sparkbindings] def blockify[K: ClassTag](rdd: DrmRdd[K], blockncol: Int): BlockifiedDrmRdd[K] = {
 
     rdd.mapPartitions(iter => {
 
@@ -226,7 +70,7 @@ package object drm {
     })
   }
 
-  def deblockify[K: ClassTag](rdd: BlockifiedDrmRdd[K]): DrmRdd[K] =
+  private[sparkbindings] def deblockify[K: ClassTag](rdd: BlockifiedDrmRdd[K]): DrmRdd[K] =
 
   // Just flat-map rows, connect with the keys
     rdd.flatMap({
@@ -246,46 +90,4 @@ package object drm {
 
     })
 
-  // ============== Decompositions ===================
-
-  /**
-   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be fairly
-   * small (<5000 or so). <P>
-   *
-   * It is recommended to checkpoint A since the algorithm does two passes over it. <P>
-   *
-   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
-   * their RDD should be able to zip successfully.
-   */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
-    DQR.dqrThin(A, checkRankDeficiency)
-
-  /**
-   * Distributed Stochastic Singular Value decomposition algorithm.
-   *
-   * @param A input matrix A
-   * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s). Note that U and V are non-checkpointed matrices (i.e. one needs to actually
-   *         use them, e.g. save them to HDFS, in order to trigger their computation).
-   */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
-
-  /**
-   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
-   * document attached to MAHOUT-817.
-   *
-   * @param A input matrix A
-   * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations (hint: use either 0 or 1)
-   * @return (U,V,s). Note that U and V are non-checkpointed matrices (i.e. one needs to actually
-   *         use them, e.g. save them to HDFS, in order to trigger their computation).
-   */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
-
-
 }
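
One helper removed above deserves a gloss: in safeToNonNegInt, shift counts on a Long are taken
mod 64, so x << -31 >>> -31 is really x << 33 >>> 33, which clears everything above the low 31
bits; the equality therefore holds exactly when 0 <= x <= Int.MaxValue. An equivalent, more
explicit sketch (not the shipped code):

    def safeToNonNegInt(x: Long): Int = {
      require(x >= 0 && x <= Int.MaxValue,
        "conversion from Long to Int would lose significant bits, or the value is negative")
      x.toInt
    }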

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala
deleted file mode 100644
index 6498d87..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractBinaryOp.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import scala.util.Random
-
-abstract class AbstractBinaryOp[A : ClassTag, B : ClassTag, K : ClassTag]
-    extends CheckpointAction[K] with DrmLike[K] {
-
-  protected[plan] var A: DrmLike[A]
-  protected[plan] var B: DrmLike[B]
-
-  // These explicitly export the ClassTag evidence. Sometimes Scala fails to figure it out on its own.
-  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
-
-  def classTagB: ClassTag[B] = implicitly[ClassTag[B]]
-
-  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
-}
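
The evidence-export comment above is worth unpacking: when the optimizer matches a node as
AbstractBinaryOp[_, _, K], the existential wildcards erase the operands' ClassTags, so recursive
calls must re-supply them by hand from classTagA/classTagB. A distilled sketch of the pattern,
mirroring the pass bodies in CheckpointAction below:

    import scala.reflect.ClassTag

    def rewrite[K: ClassTag](op: DrmLike[K]): DrmLike[K] = op match {
      case bop: AbstractBinaryOp[_, _, K] =>
        bop.A = rewrite(bop.A)(bop.classTagA)  // evidence passed explicitly
        bop.B = rewrite(bop.B)(bop.classTagB)
        bop
      case other => other
    }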

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala
deleted file mode 100644
index a331345..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/AbstractUnaryOp.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Abstract unary operator */
-abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
-    extends CheckpointAction[K] with DrmLike[K] {
-
-  protected[plan] var A: DrmLike[A]
-
-  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
-
-  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
deleted file mode 100644
index cd935bc..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.blas._
-import org.apache.mahout.sparkbindings.drm._
-import CheckpointAction._
-import org.apache.spark.SparkContext._
-import org.apache.hadoop.io.Writable
-import org.apache.spark.storage.StorageLevel
-import scala.util.Random
-
-/** Implementation of distributed expression checkpoint and optimizer. */
-abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
-
-  private[sparkbindings] lazy val partitioningTag: Long = Random.nextLong()
-
-  private var cp:Option[CheckpointedDrm[K]] = None
-
-
-  def isIdenticallyPartitioned(other: DrmLike[_]) =
-    partitioningTag != 0L && partitioningTag == other.partitioningTag
-
-  /**
-   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
-   * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
-   */
-  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp.getOrElse({
-    // Non-zero count is only sparsely supported by logical operators for now, so assume we have no
-    // knowledge of it where it is unsupported, instead of failing.
-    val plan = optimize(this)
-    val rdd = exec(plan)
-    val newcp = new CheckpointedDrmBase(
-      rdd = rdd,
-      _nrow = nrow,
-      _ncol = ncol,
-      _cacheStorageLevel = cacheHint2Spark(cacheHint),
-      partitioningTag = plan.partitioningTag
-    )
-    cp = Some(newcp)
-    newcp.cache()
-  })
-
-}
-
-object CheckpointAction {
-
-  /** Perform expression optimization. Return physical plan that we can pass to exec() */
-  def optimize[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
-
-  private def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
-    case CacheHint.NONE => StorageLevel.NONE
-    case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
-    case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
-    case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
-    case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
-    case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
-    case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
-    case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
-    case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
-    case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
-    case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
-  }
-
-
-  /** These are mostly rewrites of multiplication operations */
-  private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
-
-    action match {
-      case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
-      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
-
-      // For now, rewrite left-multiply via transpositions, i.e.
-      // inCoreA %*% B = (B' %*% inCoreA')'
-      case op@OpTimesLeftMatrix(a, b) =>
-        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
-
-      // Stop at checkpoints
-      case cd: CheckpointedDrm[_] => action
-
-      // For everything else we just pass the operator arguments through to the optimizer
-      case uop: AbstractUnaryOp[_, K] =>
-        uop.A = pass1(uop.A)(uop.classTagA)
-        uop
-      case bop: AbstractBinaryOp[_, _, K] =>
-        bop.A = pass1(bop.A)(bop.classTagA)
-        bop.B = pass1(bop.B)(bop.classTagB)
-        bop
-    }
-  }
-
-  /** This removes artifacts like A.t.t that the previous step may have created */
-  private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
-    action match {
-      // A.t.t => A
-      case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
-
-      // A.t.t => A
-//      case OpAt(top@OpAtAnyKey(a)) => pass2(a)(top.classTagA)
-
-
-      // Stop at checkpoints
-      case cd: CheckpointedDrm[_] => action
-
-      // For everything else we just pass the operator arguments through to the optimizer
-      case uop: AbstractUnaryOp[_, K] =>
-        uop.A = pass2(uop.A)(uop.classTagA)
-        uop
-      case bop: AbstractBinaryOp[_, _, K] =>
-        bop.A = pass2(bop.A)(bop.classTagA)
-        bop.B = pass2(bop.B)(bop.classTagB)
-        bop
-    }
-  }
-
-  /** Some further rewrites that are conditioned on the A.t.t removal */
-  private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
-    action match {
-
-      // matrix products.
-      case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
-
-      // AtB cases that make sense.
-      case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
-      case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
-
-      // Need some cost to choose between the following.
-
-      case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
-      //      case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
-      case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
-      // Rewrite A'x
-      case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
-
-      // Stop at checkpoints
-      case cd: CheckpointedDrm[_] => action
-
-      // For everything else we just pass the operator arguments through to the optimizer
-      case uop: AbstractUnaryOp[_, K] =>
-        uop.A = pass3(uop.A)(uop.classTagA)
-        uop
-      case bop: AbstractBinaryOp[_, _, K] =>
-        bop.A = pass3(bop.A)(bop.classTagA)
-        bop.B = pass3(bop.B)(bop.classTagB)
-        bop
-    }
-  }
-
-
-  /** Execute previously optimized physical plan */
-  def exec[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
-    // I do explicit evidence propagation here since matching via case classes seems to be losing
-    // it, which may subsequently cause something like DrmRddInput[Any] instead of [Int] or [String].
-    // Hence you see explicit evidence attached to all recursive exec() calls.
-    oper match {
-      // Any such cases must go away in pass1. If they did not, then it wasn't the A'A case but an
-      // actual transposition intent, which should be removed from consideration (we cannot do an
-      // actual flip for non-int-keyed arguments).
-      case OpAtAnyKey(_) =>
-        throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
-      case op@OpAt(a) => At.at(op, exec(a)(op.classTagA))
-      case op@OpABt(a, b) => ABt.abt(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
-      case op@OpAtB(a, b) => AtB.atb_nograph(op, exec(a)(op.classTagA), exec(b)(op.classTagB),
-        zippable = a.partitioningTag == b.partitioningTag)
-      case op@OpAtA(a) => AtA.at_a(op, exec(a)(op.classTagA))
-      case op@OpAx(a, x) => Ax.ax_with_broadcast(op, exec(a)(op.classTagA))
-      case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, exec(a)(op.classTagA))
-      case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
-      case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
-      case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
-      case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, exec(a)(op.classTagA), exec(b)(op.classTagB))
-      case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, exec(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, exec(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, exec(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, exec(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, exec(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, exec(a)(op.classTagA), s)
-      case op@OpRowRange(a, _) => Slicing.rowRange(op, exec(a)(op.classTagA))
-      case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, exec(a)(op.classTagA))
-      // Custom operators, we just execute them
-      case blockOp: OpMapBlock[K, _] => blockOp.exec(src = exec(blockOp.A)(blockOp.classTagA))
-      case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
-      case _ => throw new IllegalArgumentException("Internal: Optimizer has no exec policy for operator %s."
-          .format(oper))
-
-    }
-  }
-
-}
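
The three optimizer passes above are plain recursive pattern-match rewriters over the logical
plan. A self-contained toy illustration of the pass2-style rule (A.t.t => A), with hypothetical
node names rather than the Mahout operator classes:

    sealed trait Expr
    case class Leaf(name: String) extends Expr
    case class Transpose(a: Expr) extends Expr

    def simplify(e: Expr): Expr = e match {
      case Transpose(Transpose(a)) => simplify(a)    // the A.t.t => A rule
      case Transpose(a)            => Transpose(simplify(a))
      case leaf: Leaf              => leaf
    }

    // simplify(Transpose(Transpose(Leaf("A")))) == Leaf("A")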

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala
deleted file mode 100644
index 4911eb2..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAB.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.hadoop.io.Writable
-import org.apache.spark.SparkContext._
-
-/** Logical AB */
-case class OpAB[K: ClassTag ](
-    override var A: DrmLike[K],
-    override var B: DrmLike[Int])
-    extends AbstractBinaryOp[K, Int, K] {
-
-  assert(A.ncol == B.nrow, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = B.ncol
-
-  /** Non-zero element count */
-  def nNonZero: Long =
-  // TODO: for purposes of cost calculation, approximate based on operands
-    throw new UnsupportedOperationException
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala
deleted file mode 100644
index 30d3f8c..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABAnyKey.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Logical AB */
-case class OpABAnyKey[B:ClassTag, K: ClassTag ](
-    override var A: DrmLike[K],
-    override var B: DrmLike[B])
-    extends AbstractBinaryOp[K, B, K] {
-
-  assert(A.ncol == B.nrow, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = B.ncol
-
-  /** Non-zero element count */
-  def nNonZero: Long =
-  // TODO: for purposes of cost calculation, approximate based on operands
-    throw new UnsupportedOperationException
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala
deleted file mode 100644
index c1bc21d..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpABt.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
-
-/** Logical AB' */
-case class OpABt[K: ClassTag](
-    override var A: DrmLike[K],
-    override var B: DrmLike[Int])
-    extends AbstractBinaryOp[K,Int,K]  {
-
-  assert(A.ncol == B.ncol, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = safeToNonNegInt(B.nrow)
-
-  /** Non-zero element count */
-  def nNonZero: Long =
-  // TODO: for purposes of cost calculation, approximate based on operands
-    throw new UnsupportedOperationException
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala
deleted file mode 100644
index a20e1af..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewB.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** DRM elementwise operator */
-case class OpAewB[K: ClassTag](
-    override var A: DrmLike[K],
-    override var B: DrmLike[K],
-    val op: Char
-    ) extends AbstractBinaryOp[K, K, K] {
-
-  assert(A.ncol == B.ncol, "arguments must have same number of columns")
-  assert(A.nrow == B.nrow, "arguments must have same number of rows")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = A.ncol
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala
deleted file mode 100644
index b35d737..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAewScalar.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Operator denoting expressions like 5.0 - A or A * 5.6 */
-case class OpAewScalar[K: ClassTag](
-    override var A: DrmLike[K],
-    val scalar: Double,
-    val op: String
-    ) extends AbstractUnaryOp[K,K] {
-
-  override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = A.ncol
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala
deleted file mode 100644
index 1c212e1..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAt.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-
-/** Logical A' */
-case class OpAt(
-    override var A: DrmLike[Int])
-    extends AbstractUnaryOp[Int, Int] {
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.ncol
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = safeToNonNegInt(A.nrow)
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala
deleted file mode 100644
index 2e3aa4c..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtA.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.spark.SparkContext._
-
-/** A'A */
-case class OpAtA[K: ClassTag](
-    override var A: DrmLike[K]
-    ) extends AbstractUnaryOp[K, Int] {
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.ncol
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = A.ncol
-
-  /** Non-zero element count */
-  def nNonZero: Long = throw new UnsupportedOperationException
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala
deleted file mode 100644
index 98e6218..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtAnyKey.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-
-/** Logical A' for any row key to support A'A optimizations */
-case class OpAtAnyKey[A:ClassTag](
-    override var A: DrmLike[A])
-    extends AbstractUnaryOp[A, Int] {
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.ncol
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = safeToNonNegInt(A.nrow)
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala
deleted file mode 100644
index 6e830f8..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtB.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm._
-import scala.reflect.ClassTag
-
-/** Logical A'B */
-case class OpAtB[A:ClassTag](
-    override var A: DrmLike[A],
-    override var B: DrmLike[A])
-    extends AbstractBinaryOp[A,A,Int]  {
-
-  assert(A.nrow == B.nrow, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.ncol
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = B.ncol
-
-  /** Non-zero element count */
-  def nNonZero: Long =
-  // TODO: for purposes of cost calculation, approximate based on operands
-    throw new UnsupportedOperationException
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala
deleted file mode 100644
index edc00f0..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAtx.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-
-/** Logical A'x. */
-case class OpAtx(
-    override var A: DrmLike[Int],
-    val x: Vector
-    ) extends AbstractUnaryOp[Int, Int] {
-
-  override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
-  assert(A.nrow == x.length, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = safeToNonNegInt(A.ncol)
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = 1
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala
deleted file mode 100644
index 7dce249..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpAx.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Vector, Matrix}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Logical Ax. */
-case class OpAx[K: ClassTag](
-    override var A: DrmLike[K],
-    val x: Vector
-    ) extends AbstractUnaryOp[K, K] {
-
-  override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
-  assert(A.ncol == x.length, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = 1
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala
deleted file mode 100644
index a0c882d..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpMapBlock.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.{DrmRddInput, BlockMapFunc, DrmLike}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-class OpMapBlock[S: ClassTag, R: ClassTag](
-    override var A: DrmLike[S],
-    val bmf: BlockMapFunc[S, R],
-    val _ncol: Int = -1,
-    val _nrow: Long = -1
-    ) extends AbstractUnaryOp[S, R] {
-
-
-  override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = if (_ncol >= 0) _ncol else A.ncol
-
-  def exec(src: DrmRddInput[S]): DrmRddInput[R] = {
-
-    // We can't reference the attributes directly, to avoid pulling the whole 'this' into the closure.
-    val bmf = this.bmf
-    val ncol = this.ncol
-
-    val rdd = src.toBlockifiedDrmRdd()
-        .map(blockTuple => {
-      val out = bmf(blockTuple)
-
-      assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return the same number of rows.")
-      assert(out._2.ncol == ncol, "block mapping must return %d columns.".format(ncol))
-
-      out
-    })
-    new DrmRddInput(blockifiedSrc = Some(rdd))
-  }
-}
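
OpMapBlock is the physical side of the block-wise map: the user closure receives one
(keys, block) pair per blockified partition and must preserve the row count and the declared
column count, which the two assertions above enforce. A hedged usage sketch, assuming the
mapBlock operator in DrmLikeOps wires up OpMapBlock and drmA is a DRM in scope:

    import org.apache.mahout.math.scalabindings.RLikeOps._

    // Scale every block by 2.0, keeping keys and geometry intact.
    val doubled = drmA.mapBlock(ncol = drmA.ncol) {
      case (keys, block) => keys -> (block * 2.0)
    }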

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala
deleted file mode 100644
index 316661b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpRowRange.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.spark.SparkContext._
-
-/** Logical row-range slicing */
-case class OpRowRange(
-  override var A:DrmLike[Int],
-  val rowRange:Range
-    ) extends AbstractUnaryOp[Int, Int] {
-
-  assert(rowRange.head>=0 && rowRange.last< A.nrow, "row range out of range")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = rowRange.length
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = A.ncol
-
-}

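Row-range slicing remains exposed through the R-like indexing DSL, and the assertion above pins
the requested range inside the matrix geometry. A minimal sketch, assuming an Int-keyed DRM drmA
with at least two rows:

// Hypothetical sketch: logical row-range slice, subject to the same bounds assertion.
val topRows = drmA(0 until 2, ::)
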
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala
deleted file mode 100644
index 94a4d0b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesLeftMatrix.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.DrmLike
-import org.apache.spark.SparkContext._
-
-/** Logical Times-left over in-core matrix operand */
-case class OpTimesLeftMatrix(
-    val left: Matrix,
-    override var A: DrmLike[Int]
-    ) extends AbstractUnaryOp[Int, Int] {
-
-  assert(left.ncol == A.nrow, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = left.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = A.ncol
-
-  /** Non-zero element count */
-  // TODO
-  def nNonZero: Long = throw new UnsupportedOperationException
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala
deleted file mode 100644
index 007672b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/OpTimesRightMatrix.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.plan
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.DrmLike
-
-/** Logical times-right over in-core matrix operand. */
-case class OpTimesRightMatrix[K: ClassTag](
-    override var A: DrmLike[K],
-    val right: Matrix
-    ) extends AbstractUnaryOp[K, K] {
-
-  override private[sparkbindings] lazy val partitioningTag: Long = A.partitioningTag
-
-  assert(A.ncol == right.nrow, "Incompatible operand geometry")
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long = A.nrow
-
-  /** R-like syntax for number of columns */
-  def ncol: Int = right.ncol
-
-  /** Non-zero element count */
-  // TODO
-  def nNonZero: Long = throw new UnsupportedOperationException
-
-}

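This operator and OpTimesLeftMatrix above cover multiplication by an in-core matrix operand on
either side, each asserting that the inner dimensions agree. A minimal sketch of the corresponding
DSL spellings (treat the right-associative left-multiply form as an assumption):

// Hypothetical sketch: in-core operand on the right and on the left of a DRM.
val drmAM = drmA %*% inCoreM     // OpTimesRightMatrix: drmA.ncol == inCoreM.nrow
val drmMA = inCoreM %*%: drmA    // OpTimesLeftMatrix: inCoreM.ncol == drmA.nrow
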
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala
deleted file mode 100644
index 6fb03f5..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-
-/** This package contains logical distributed expression operators. */
-package object plan {
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index 14fa200..b0042c9 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -20,10 +20,12 @@ package org.apache.mahout.sparkbindings.io
 import com.esotericsoftware.kryo.Kryo
 import org.apache.mahout.math._
 import org.apache.spark.serializer.KryoRegistrator
-import org.apache.mahout.sparkbindings.drm._
-
+import org.apache.mahout.sparkbindings._
 
 
+/**
+ * Registers Mahout math classes with Kryo for Spark serialization.
+ */
 class MahoutKryoRegistrator extends KryoRegistrator {
 
   override def registerClasses(kryo: Kryo) = {

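The registrator only takes effect when Spark is configured to use it. A sketch of the wiring on a
plain SparkConf, using Spark's standard serializer properties:

// Sketch: enable Kryo serialization with the Mahout registrator.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
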
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 3010442..34d81cf 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -17,20 +17,33 @@
 
 package org.apache.mahout
 
-import org.apache.spark.rdd.RDD
-import org.apache.mahout.math.{Matrix, Vector}
-import scala.reflect.ClassTag
 import org.apache.spark.{SparkConf, SparkContext}
 import java.io._
 import scala.collection.mutable.ArrayBuffer
 import org.apache.mahout.common.IOUtils
 import org.apache.log4j.Logger
-import org.apache.mahout.sparkbindings.drm.DrmLike
+import org.apache.mahout.math.drm._
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.{SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.broadcast.Broadcast
+import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix}
+import org.apache.hadoop.io.Writable
 
+/** Public API for Spark-specific operators */
 package object sparkbindings {
 
   private[sparkbindings] val log = Logger.getLogger("org.apache.mahout.sparkbindings")
 
+  /** Row-wise organized DRM rdd type */
+  type DrmRdd[K] = RDD[DrmTuple[K]]
+
+  /**
+   * Blockified DRM rdd (keys of the original DRM are grouped into an array corresponding to the
+   * rows of the Matrix object value).
+   */
+  type BlockifiedDrmRdd[K] = RDD[BlockifiedDrmTuple[K]]
+
   /**
    * Create a proper Spark context that includes local Mahout jars
    * @param masterUrl
@@ -42,7 +55,7 @@ package object sparkbindings {
       customJars: TraversableOnce[String] = Nil,
       sparkConf: SparkConf = new SparkConf(),
       addMahoutJars: Boolean = true
-      ): SparkContext = {
+      ): SparkDistributedContext = {
     val closeables = new java.util.ArrayDeque[Closeable]()
 
     try {
@@ -125,13 +138,49 @@ package object sparkbindings {
         sparkConf.setSparkHome(System.getenv("SPARK_HOME"))
       }
 
-      new SparkContext(config = sparkConf)
+      new SparkDistributedContext(new SparkContext(config = sparkConf))
 
     } finally {
       IOUtils.close(closeables)
     }
+  }
+
+  implicit def sdc2sc(sdc: SparkDistributedContext): SparkContext = sdc.sc
 
+  implicit def sc2sdc(sc: SparkContext): SparkDistributedContext = new SparkDistributedContext(sc)
+
+  implicit def dc2sc(dc:DistributedContext):SparkContext = {
+    assert (dc.isInstanceOf[SparkDistributedContext],"distributed context must be Spark-specific.")
+    sdc2sc(dc.asInstanceOf[SparkDistributedContext])
   }
 
+  /** Broadcast transforms */
+  implicit def sb2bc[T](b:Broadcast[T]):BCast[T] = new SparkBCast(b)
+
+  /** Adding Spark-specific ops */
+  implicit def cpDrm2cpDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
+    new CheckpointedDrmSparkOps[K](drm)
+
+  implicit def drm2cpDrmSparkOps[K:ClassTag](drm:DrmLike[K]):CheckpointedDrmSparkOps[K] = drm:CheckpointedDrm[K]
+
+  private[sparkbindings] implicit def m2w(m: Matrix): MatrixWritable = new MatrixWritable(m)
+
+  private[sparkbindings] implicit def w2m(w: MatrixWritable): Matrix = w.get()
+
+  private[sparkbindings] implicit def v2w(v: Vector): VectorWritable = new VectorWritable(v)
+
+  private[sparkbindings] implicit def w2v(w:VectorWritable):Vector = w.get()
+
+  def drmWrap[K : ClassTag](
+      rdd: DrmRdd[K],
+      nrow: Int = -1,
+      ncol: Int = -1
+      ): CheckpointedDrm[K] =
+    new CheckpointedDrmSpark[K](
+      rdd = rdd,
+      _nrow = nrow,
+      _ncol = ncol
+    )
+
 
 }

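Taken together, the package object above becomes the public entry point for the Spark bindings:
mahoutSparkContext now returns the engine-aware SparkDistributedContext, the implicits bridge
plain and distributed contexts both ways, and drmWrap promotes any row-wise RDD into a
checkpointed DRM. A minimal end-to-end sketch (the appName parameter and the dvec helper are
assumptions from the surrounding codebase, not shown in this hunk):

// Hypothetical sketch built on the API above.
implicit val sdc = mahoutSparkContext(masterUrl = "local", appName = "mahout-example")
val rdd: DrmRdd[Int] = sdc.parallelize(Seq(0 -> dvec(1, 2, 3), 1 -> dvec(3, 4, 5)))
val drmA = drmWrap(rdd, nrow = 2, ncol = 3)
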
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
index 3e4222a..0834145 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
@@ -20,11 +20,12 @@ package org.apache.mahout.sparkbindings.blas
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
 import org.scalatest.FunSuite
 import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings._
-import drm._
+import org.apache.mahout.sparkbindings.drm._
 import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.plan.OpABt
 import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpABt
 
 /** Tests for AB' operator algorithms */
 class ABtSuite extends FunSuite with MahoutLocalContext {
@@ -37,7 +38,7 @@ class ABtSuite extends FunSuite with MahoutLocalContext {
 
     val op = new OpABt(A, B)
 
-    val drm = new CheckpointedDrmBase(ABt.abt(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreMControl = inCoreA %*% inCoreB.t
     val inCoreM = drm.collect

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
index 3065737..2efa5ff 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
@@ -21,9 +21,11 @@ import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpAewB
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
 import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpAewB
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
 
 /** Elementwise matrix operation tests */
 class AewBSuite extends FunSuite with MahoutLocalContext {
@@ -36,7 +38,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
 
     val op = new OpAewB(A, B, '*')
 
-    val M = new CheckpointedDrmBase(AewB.a_hadamard_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_hadamard_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA * inCoreB
@@ -53,7 +55,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
 
     val op = new OpAewB(A, B, '+')
 
-    val M = new CheckpointedDrmBase(AewB.a_plus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_plus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA + inCoreB
@@ -70,7 +72,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
 
     val op = new OpAewB(A, B, '-')
 
-    val M = new CheckpointedDrmBase(AewB.a_minus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_minus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA - inCoreB
@@ -87,7 +89,7 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
 
     val op = new OpAewB(A, B, '/')
 
-    val M = new CheckpointedDrmBase(AewB.a_eldiv_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_eldiv_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA / inCoreB

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
index 34bb88a..8734b70 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
@@ -21,10 +21,10 @@ import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
+import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings._
-import drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpAtA
 import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpAtA
 
 /** Tests for {@link XtX} */
 class AtASuite extends FunSuite with MahoutLocalContext {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
index 48edbe9..a53501d 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
@@ -21,20 +21,19 @@ import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import org.apache.mahout.sparkbindings._
-import drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
-import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical.OpAt
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
 
 /** Tests for A' algorithms */
 class AtSuite extends FunSuite with MahoutLocalContext {
 
   test("At") {
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val A = drmParallelize(m=inCoreA, numPartitions = 2)
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
 
     val op = new OpAt(A)
-    val AtDrm = new CheckpointedDrmBase(rdd= At.at(op,srcA=A),_nrow=op.nrow,_ncol=op.ncol)
+    val AtDrm = new CheckpointedDrmSpark(rdd = At.at(op, srcA = A), _nrow = op.nrow, _ncol = op.ncol)
     val inCoreAt = AtDrm.collect
     val inCoreControlAt = inCoreA.t
 
@@ -42,6 +41,5 @@ class AtSuite extends FunSuite with MahoutLocalContext {
     assert((inCoreAt - inCoreControlAt).norm < 1E-5)
 
 
-
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
index e4f2d55..2369441 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
@@ -19,11 +19,12 @@ package org.apache.mahout.sparkbindings.drm.decompositions
 
 import org.scalatest.{Matchers, FunSuite}
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
 import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
 import RLikeDrmOps._
-import org.apache.mahout.math.{Matrices, SparseRowMatrix}
+import org.apache.mahout.sparkbindings._
 import org.apache.mahout.common.RandomUtils
 
 class MathSuite extends FunSuite with Matchers with MahoutLocalContext {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
index b4b6d98..6c71e11 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
@@ -18,10 +18,12 @@ package org.apache.mahout.sparkbindings.drm
  */
 
 
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
+import drm._
 import RLikeOps._
 import RLikeDrmOps._
-import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
 
 /** Tests for DrmLikeOps */

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index 5feb731..caccb70 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -18,11 +18,11 @@
 package org.apache.mahout.sparkbindings.drm
 
 import org.scalatest.FunSuite
-import org.apache.log4j.{Level, Logger, BasicConfigurator}
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
+import drm._
 import RLikeOps._
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-import org.apache.spark.SparkContext._
 
 
 /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index da38174..ac46a9e 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -18,16 +18,18 @@
 package org.apache.mahout.sparkbindings.drm
 
 import org.scalatest.{Matchers, FunSuite}
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.math._
+import scalabindings._
+import drm._
+import RLikeOps._
 import RLikeDrmOps._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtx, OpAtB, OpAtA, CheckpointAction}
-import org.apache.spark.SparkContext
+import org.apache.mahout.sparkbindings._
+import test.MahoutLocalContext
 import scala.collection.mutable.ArrayBuffer
 import org.apache.mahout.math.Matrices
-import org.apache.mahout.sparkbindings.blas
+import org.apache.mahout.sparkbindings.{SparkEngine, blas}
 import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.math.drm.logical.{OpAtx, OpAtB, OpAtA}
 
 /** R-like DRM DSL operation tests */
 class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -150,7 +152,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val C = A.t %*% B
 
-    CheckpointAction.optimize(C) should equal (OpAtB[Int](A,B))
+    SparkEngine.optimizerRewrite(C) should equal (OpAtB[Int](A,B))
 
     val inCoreC = C.collect
     val inCoreControlC = inCoreA.t %*% inCoreB
@@ -176,7 +178,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val C = A.t %*% B
 
-    CheckpointAction.optimize(C) should equal (OpAtB[String](A,B))
+    SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
 
     val inCoreC = C.collect
     val inCoreControlC = inCoreA.t %*% inCoreB
@@ -198,7 +200,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val C = A.t %*% B
 
-    CheckpointAction.optimize(C) should equal (OpAtB[String](A,B))
+    SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
 
     val inCoreC = C.collect
     val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
@@ -246,7 +248,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     val AtA = A.t %*% A
 
     // Assert optimizer detects square
-    CheckpointAction.optimize(action = AtA) should equal(OpAtA(A))
+    SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
 
     val inCoreAtA = AtA.collect
     val inCoreAtAControl = inCoreA.t %*% inCoreA
@@ -264,7 +266,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     val AtA = A.t %*% A
 
     // Assert optimizer detects square
-    CheckpointAction.optimize(action = AtA) should equal(OpAtA(A))
+    SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
 
     val inCoreAtA = AtA.collect
     val inCoreAtAControl = inCoreA.t %*% inCoreA
@@ -284,7 +286,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     val AtA = A.t %*% A
 
     // Assert optimizer detects square
-    CheckpointAction.optimize(action = AtA) should equal(OpAtA(A))
+    SparkEngine.optimizerRewrite(action = AtA) should equal(OpAtA(A))
 
     val inCoreAtA = AtA.collect
     val inCoreAtAControl = inCoreA.t %*% inCoreA
@@ -371,7 +373,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
   }
 
   test ("general side")  {
-    val sc = implicitly[SparkContext]
+    val sc = implicitly[DistributedContext]
     val k1 = sc.parallelize(Seq(ArrayBuffer(0,1,2,3)))
 //      .persist(StorageLevel.MEMORY_ONLY)   // -- this will demonstrate immutability side effect!
       .persist(StorageLevel.MEMORY_ONLY_SER)
@@ -405,7 +407,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val drmA = drmParallelize(inCoreA, numPartitions = 2)
 
-    CheckpointAction.optimize(drmA.t %*% x) should equal (OpAtx(drmA, x))
+    SparkEngine.optimizerRewrite(drmA.t %*% x) should equal (OpAtx(drmA, x))
 
     val atx = (drmA.t %*% x).collect(::, 0)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index ff7098b..2d9ed76 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -1,14 +1,15 @@
 package org.apache.mahout.sparkbindings.test
 
 import org.scalatest.Suite
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkConf
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.drm.DistributedContext
 
 trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
   this: Suite =>
 
-  protected implicit var mahoutCtx: SparkContext = _
+  protected implicit var mahoutCtx: DistributedContext = _
 
   override protected def beforeEach() {
     super.beforeEach()
@@ -26,7 +27,7 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
   override protected def afterEach() {
     if (mahoutCtx != null) {
       try {
-        mahoutCtx.stop()
+        mahoutCtx.close()
       } finally {
         mahoutCtx = null
       }


[2/3] MAHOUT-1529 closes PR #1

Posted by dl...@apache.org.
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
deleted file mode 100644
index 8e05a83..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.scalabindings
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import RLikeOps._
-import org.apache.mahout.common.RandomUtils
-import scala._
-import org.apache.log4j.Logger
-
-private[math] object SSVD {
-
-  private val log = Logger.getLogger(SSVD.getClass)
-
-  /**
-   * In-core SSVD algorithm.
-   *
-   * @param a input matrix A
- * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    var y = a %*% omega
-    var yty = y.t %*% y
-    val at = a.t
-    var ch = chol(yty)
-    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-    var bt = ch.solveRight(at %*% y)
-
-    // Power iterations
-    for (i <- 0 until q) {
-      y = a %*% bt
-      yty = y.t %*% y
-      ch = chol(yty)
-      bt = ch.solveRight(at %*% y)
-    }
-
-    val bbt = bt.t %*% bt
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = ch.solveRight(y) %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-  }
-
-  /**
- * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
- * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part,
- * supposed to save some memory for sparse inputs by removing direct mean subtraction.<P>
- *
- * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
- * If retaining distances and original scaled variances is not that important, the normalized PCA
- * space is just U.
-   *
-   * Important: data points are considered to be rows.
-   *
-   * @param a input matrix A
- * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    // Dataset mean
-    val xi = a.colMeans()
-
-    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
-
-    var y = a %*% omega
-
-    // Fixing y
-    val s_o = omega.t %*% xi
-    y := ((r,c,v) => v - s_o(c))
-
-    var yty = y.t %*% y
-    var ch = chol(yty)
-//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
-    // This is implicit Q of QR(Y)
-    var qm = ch.solveRight(y)
-    var bt = a.t %*% qm
-    var s_q = qm.colSums()
-    var s_b = bt.t %*% xi
-
-    // Power iterations
-    for (i <- 0 until q) {
-
-      // Fix bt
-      bt -= xi cross s_q
-
-      y = a %*% bt
-
-      // Fix Y again.
-      y := ((r,c,v) => v - s_b(c))
-
-      yty = y.t %*% y
-      ch = chol(yty)
-      qm = ch.solveRight(y)
-      bt = a.t %*% qm
-      s_q = qm.colSums()
-      s_b = bt.t %*% xi
-    }
-
-    val c = s_q cross s_b
-
-    // BB' computation becomes
-    val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi)
-
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = qm %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+  private val log = Logger.getLogger(SSVD.getClass)
+
+  /**
+   * In-core SSVD algorithm.
+   *
+   * @param a input matrix A
+ * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    var y = a %*% omega
+    var yty = y.t %*% y
+    val at = a.t
+    var ch = chol(yty)
+    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+    var bt = ch.solveRight(at %*% y)
+
+    // Power iterations
+    for (i <- 0 until q) {
+      y = a %*% bt
+      yty = y.t %*% y
+      ch = chol(yty)
+      bt = ch.solveRight(at %*% y)
+    }
+
+    val bbt = bt.t %*% bt
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = ch.solveRight(y) %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+  }
+
+  /**
+ * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+ * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part,
+ * supposed to save some memory for sparse inputs by removing direct mean subtraction.<P>
+ *
+ * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
+ * If retaining distances and original scaled variances is not that important, the normalized PCA
+ * space is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+ * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    // Dataset mean
+    val xi = a.colMeans()
+
+    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+    var y = a %*% omega
+
+    // Fixing y
+    val s_o = omega.t %*% xi
+    y := ((r,c,v) => v - s_o(c))
+
+    var yty = y.t %*% y
+    var ch = chol(yty)
+//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+    // This is implicit Q of QR(Y)
+    var qm = ch.solveRight(y)
+    var bt = a.t %*% qm
+    var s_q = qm.colSums()
+    var s_b = bt.t %*% xi
+
+    // Power iterations
+    for (i <- 0 until q) {
+
+      // Fix bt
+      bt -= xi cross s_q
+
+      y = a %*% bt
+
+      // Fix Y again.
+      y := ((r,c,v) => v - s_b(c))
+
+      yty = y.t %*% y
+      ch = chol(yty)
+      qm = ch.solveRight(y)
+      bt = a.t %*% qm
+      s_q = qm.colSums()
+      s_b = bt.t %*% xi
+    }
+
+    val c = s_q cross s_b
+
+    // BB' computation becomes
+    val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi)
+
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = qm %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+  }
+
+}

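For orientation, a hedged usage sketch of the relocated in-core entry points, assuming they remain
reachable through the scalabindings sugar (as the package.scala change below suggests):

// Hypothetical sketch; dense() and diagv() are scalabindings helpers.
val a = dense((1, 2, 3), (3, 4, 5), (-1, 0, 2))
val (u, v, s) = ssvd(a, k = 2, p = 1, q = 1)
val aApprox = (u %*% diagv(s)) %*% v.t   // rank-2 reconstruction of a
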
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index c9e59ba..4599146 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -19,6 +19,7 @@ package org.apache.mahout.math
 
 import org.apache.mahout.math._
 import org.apache.mahout.math.solver.EigenDecomposition
+import org.apache.mahout.math.decompositions.SSVD
 
 /**
  * Mahout matrices and vectors' scala syntactic sugar

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index a97b453..020a2f9 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -2,22 +2,20 @@ package org.apache.mahout.sparkbindings.shell
 
 import org.apache.spark.repl.SparkILoop
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.mahout.sparkbindings._
 import scala.tools.nsc.Properties
 import scala.Some
+import org.apache.mahout.sparkbindings._
 
 class MahoutSparkILoop extends SparkILoop {
 
   private val postInitScript =
-      "import org.apache.mahout.math.Vector" ::
-      "import org.apache.mahout.math.scalabindings._" ::
-      "import RLikeOps._" ::
-      "import org.apache.mahout.sparkbindings._" ::
-      "import drm._" ::
-      "import RLikeDrmOps._" ::
-      "org.apache.spark.storage.StorageLevel" ::
-      "implicit val _sc = sc" ::
-      Nil
+    "import org.apache.mahout.math._" ::
+        "import scalabindings._" ::
+        "import RLikeOps._" ::
+        "import drm._" ::
+        "import RLikeDrmOps._" ::
+        "import org.apache.mahout.sparkbindings._" ::
+        Nil
 
   override protected def postInitialization() {
     super.postInitialization()
@@ -50,10 +48,25 @@ class MahoutSparkILoop extends SparkILoop {
       customJars = jars,
       sparkConf = conf
     )
+
     echo("Created spark context..")
     sparkContext
   }
 
+  override def initializeSpark() {
+    intp.beQuietDuring {
+      command("""
+
+         @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext =
+            new org.apache.mahout.sparkbindings.SparkDistributedContext(
+            org.apache.spark.repl.Main.interp.createSparkContext())
+
+              """)
+      command("import org.apache.spark.SparkContext._")
+      echo("Mahout distributed context is available as \"implicit val sdc\".")
+    }
+  }
+
   override def prompt: String = "mahout> "
 
   override def printWelcome(): Unit = {

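With initializeSpark above, the shell binds the Mahout distributed context before the prompt
appears, so a session needs no manual setup. An illustrative interaction (values made up):

mahout> val a = dense((1, 2, 3), (3, 4, 5))
mahout> val drmA = drmParallelize(a, numPartitions = 2)
mahout> (drmA.t %*% drmA).collect
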
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
index 648f07f..9c0a51f 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.mahout.sparkbindings.shell
 
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/test/mahout/simple.mscala
----------------------------------------------------------------------
diff --git a/spark-shell/src/test/mahout/simple.mscala b/spark-shell/src/test/mahout/simple.mscala
index 385e4e8..854c482 100644
--- a/spark-shell/src/test/mahout/simple.mscala
+++ b/spark-shell/src/test/mahout/simple.mscala
@@ -1,8 +1,25 @@
-import org.apache.mahout.sparkbindings._
-import drm._
-import RLikeDrmOps._
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import org.apache.spark.storage.StorageLevel
+/*
+ To run, execute from the mahout shell:
+
+ :load spark-shell/src/test/mahout/simple.mscala
+*/
 
 val a = dense((1,2,3),(3,4,5))
 val drmA = drmParallelize(a,numPartitions = 2)
@@ -19,5 +36,5 @@ r.collect
 // local write
 r.writeDRM("file:///home/dmitriy/A")
 
-// hdfs write
-r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file
+// hdfs write -- uncomment to test
+// r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 8b89969..ac99ffd 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -220,53 +220,52 @@
         </property>
       </activation>
       <dependencies>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-client</artifactId>
-        <version>${hadoop2.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.mahout</groupId>
-        <artifactId>mahout-mrlegacy</artifactId>
-        <exclusions>
-          <exclusion>
-            <groupId>asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-core</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-common</artifactId>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-mapreduce-client-core</artifactId>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-mapreduce-client-common</artifactId>
-      </dependency>
-    </dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+          <version>${hadoop2.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>asm</groupId>
+              <artifactId>asm</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.mahout</groupId>
+          <artifactId>mahout-mrlegacy</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>asm</groupId>
+              <artifactId>asm</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hadoop</groupId>
+              <artifactId>hadoop-core</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </dependency>
+      </dependencies>
     </profile>
   </profiles>
 
   <dependencies>
-    <!-- spark stuff -->
+
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.major}</artifactId>
-      <version>${spark.version}</version>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-math-scala</artifactId>
     </dependency>
 
     <dependency>
@@ -283,18 +282,18 @@
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-math-scala</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-math-scala</artifactId>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
 
 
     <!--  3rd-party -->
-
+    <!-- spark stuff -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.major}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
 
     <!-- scala stuff -->
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
new file mode 100644
index 0000000..4d13a5a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.drm.{DistributedEngine, BCast, DistributedContext}
+import org.apache.spark.SparkContext
+
+class SparkDistributedContext(val sc: SparkContext) extends DistributedContext {
+
+  val engine: DistributedEngine = SparkEngine
+
+  def close() {
+    sc.stop()
+  }
+}

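The wrapper above carries the engine reference and delegates shutdown to the underlying
SparkContext; with the implicits added to the sparkbindings package object earlier in this commit,
the two context types convert both ways. A sketch:

// Hypothetical sketch using sdc2sc / sc2sdc from the package object.
val sc: SparkContext = sdc                 // unwrap to the plain SparkContext
val sdc2: SparkDistributedContext = sc     // re-wrap a plain SparkContext
sdc2.close()                               // stops the underlying SparkContext
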
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
new file mode 100644
index 0000000..0c904ab
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.math._
+import scala.reflect.ClassTag
+import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings.blas._
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import scala.Some
+import scala.collection.JavaConversions._
+import org.apache.spark.SparkContext
+
+/** Spark-specific non-drm-method operations */
+object SparkEngine extends DistributedEngine {
+
+  def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val n = drm.ncol
+
+    drm.rdd
+        // Throw away keys
+        .map(_._2)
+        // Fold() still doesn't work with Kryo, so work around it.
+        .mapPartitions(iter => {
+      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
+      Iterator(acc)
+    })
+        // Since we preallocate a new accumulator vector per partition, this cannot cause any
+        // side effects.
+        .reduce(_ += _)
+  }
+
+  /** Engine-specific colMeans implementation based on a checkpoint. */
+  def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+
+  /**
+   * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+   *
+   * A particular physical engine implementation may choose to either use or not use these rewrites
+   * as a useful basic rewriting rule.<P>
+   */
+  override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
+
+
+  /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
+
+    // Spark-specific Physical Plan translation.
+    val rdd = tr2phys(plan)
+
+    val newcp = new CheckpointedDrmSpark(
+      rdd = rdd,
+      _nrow = plan.nrow,
+      _ncol = plan.ncol,
+      _cacheStorageLevel = cacheHint2Spark(ch),
+      partitioningTag = plan.partitioningTag
+    )
+    newcp.cache()
+  }
+
+  /** Broadcast support */
+  def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = dc.broadcast(v)
+
+  /** Broadcast support */
+  def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
+
+  /**
+   * Load DRM from hdfs (as in Mahout DRM format)
+   *
+   * @param path
+   * @param sc spark context (we wanted to make this implicit, but that does not work with the
+   *           type bounds in the current version of Scala)
+   *
+   * @return DRM[Any] where Any is automatically translated to the value type
+   */
+  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+    implicit val scc:SparkContext = sc
+    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
+
+    val key = rdd.map(_._1).take(1)(0)
+    val keyWClass = key.getClass.asSubclass(classOf[Writable])
+
+    val key2val = key match {
+      case xx: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
+      case xx: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
+      case xx: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
+      case xx: Writable => (v: AnyRef) => v
+    }
+
+    val val2key = key match {
+      case xx: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
+      case xx: Text => (x: Any) => new Text(x.toString)
+      case xx: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Long])
+      case xx: Writable => (x: Any) => x.asInstanceOf[Writable]
+    }
+
+    val  km = key match {
+      case xx: IntWritable => implicitly[ClassTag[Int]]
+      case xx: Text => implicitly[ClassTag[String]]
+      case xx: LongWritable => implicitly[ClassTag[Long]]
+      case xx: Writable => ClassTag(classOf[Writable])
+    }
+
+    {
+      implicit def getWritable(x: Any): Writable = val2key(x)
+      new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
+    }
+  }
+
+  /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext)
+  : CheckpointedDrm[Int] = {
+    new CheckpointedDrmSpark(rdd = parallelizeInCore(m, numPartitions))
+  }
+
+  private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): DrmRdd[Int] = {
+
+    val p = (0 until m.nrow).map(i => i -> m(i, ::))
+    sc.parallelize(p, numPartitions)
+
+  }
+
+  /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext)
+  : CheckpointedDrm[String] = {
+
+    val rb = m.getRowLabelBindings
+    val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
+
+    new CheckpointedDrmSpark(rdd = sc.parallelize(p, numPartitions))
+  }
+
+  /** This creates an empty DRM with specified number of partitions and cardinality. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+      val partNRow = (nrow - 1) / numPartitions + 1
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    })
+    new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
+  }
+
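+  /** Create an empty Long-keyed DRM with the specified number of partitions and cardinality. */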
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
+    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+      val partNRow = (nrow - 1) / numPartitions + 1
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    })
+    new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
+  }
+
+  private def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
+    case CacheHint.NONE => StorageLevel.NONE
+    case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
+    case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
+    case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
+    case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
+    case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
+    case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
+    case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
+    case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
+    case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
+    case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
+  }
+
+  /** Translate the previously optimized logical plan into a physical engine plan. */
+  private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
+    // I do explicit evidence propagation here since matching via case classes seems to be losing
+    // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
+    // Hence the explicit evidence attached to all recursive tr2phys() calls.
+    oper match {
+      // Any such cases must have been eliminated in pass 1. If one is still present, it was not
+      // an A'A pattern but an actual transposition intent, which must be removed from
+      // consideration (we cannot do an actual flip for non-Int-keyed arguments).
+      case OpAtAnyKey(_) =>
+        throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
+      case op@OpAt(a) => At.at(op, tr2phys(a)(op.classTagA))
+      case op@OpABt(a, b) => ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAtB(a, b) => AtB.atb_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB),
+        zippable = a.partitioningTag == b.partitioningTag)
+      case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
+      case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
+      case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
+      case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+      // Custom operators, we just execute them
+      case blockOp: OpMapBlock[K, _] => MapBlock.exec(
+        src = tr2phys(blockOp.A)(blockOp.classTagA),
+        ncol = blockOp.ncol,
+        bmf = blockOp.bmf
+      )
+      case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
+      case _ => throw new IllegalArgumentException("Internal: Optimizer has no exec policy for operator %s."
+          .format(oper))
+
+    }
+  }
+
+
+}
+
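
A minimal usage sketch of the engine surface above, assuming an implicit
DistributedContext is in scope and using the math-scala drm package this commit
introduces (the matrix literals and `drmA` are illustrative only):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.drm._
    import RLikeDrmOps._

    val inCoreA = dense((1, 2, 3), (3, 4, 5))
    // Distribute the in-core matrix as a DRM keyed by Int row ordinals.
    val drmA = drmParallelize(inCoreA, numPartitions = 2)

    // checkpoint() runs optimizerRewrite (e.g. fusing A.t %*% A into OpAtA)
    // and then toPhysical(), producing a cached CheckpointedDrm.
    val drmAtA = (drmA.t %*% drmA).checkpoint(CacheHint.MEMORY_ONLY)
    val inCoreAtA = drmAtA.collect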

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 8d19068..97873bd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -19,11 +19,12 @@ package org.apache.mahout.sparkbindings.blas
 
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.plan.OpABt
 import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.sparkbindings._
+import drm._
 import org.apache.mahout.math.{Matrix, SparseRowMatrix}
 import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpABt
 
 /** Contains RDD plans for ABt operator */
 object ABt {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index ec93cf7..ec6e99e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,13 +17,13 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.plan.{OpAewScalar, OpAewB}
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import scala.reflect.ClassTag
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
 
 /** Elementwise drm-drm operators */
 object AewB {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 0383fe1..c923e62 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -1,13 +1,14 @@
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
 import RLikeOps._
-
+import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesRightMatrix
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import scala.reflect.ClassTag
 import org.apache.mahout.math.DiagonalMatrix
+import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
 
 /** Matrix product with one of operands an in-core matrix */
 object AinCoreB {
@@ -21,8 +22,8 @@ object AinCoreB {
 
   private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
-    val dg = drmBroadcast(x = op.right.viewDiagonal())
+    implicit val ctx:DistributedContext = rddA.context
+    val dg = drmBroadcast(op.right.viewDiagonal())
 
     val rdd = rddA
         // Just multiply the blocks
@@ -35,7 +36,7 @@ object AinCoreB {
   private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
+    implicit val sc:DistributedContext = rddA.sparkContext
 
     val bcastB = drmBroadcast(m = op.right)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 38af173..56de9f4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -17,12 +17,12 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.{DenseVector, Vector, SequentialAccessSparseVector}
+import org.apache.mahout.math.drm.logical.OpAt
 
 /** A' algorithms */
 object At {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index 17cab62..450e836 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -18,14 +18,16 @@
 package org.apache.mahout.sparkbindings.blas
 
 import org.apache.mahout.math._
+import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import collection._
 import JavaConversions._
-import org.apache.mahout.sparkbindings.drm.plan.OpAtA
 import org.apache.spark.SparkContext._
 import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtA
+import SparkEngine._
 
 /**
  * Collection of algorithms to compute X' times X

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index f0c3423..86aadc8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -18,13 +18,14 @@
 package org.apache.mahout.sparkbindings.blas
 
 import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings.drm._
 import org.apache.spark.rdd.RDD
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.spark.SparkContext._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtB}
 import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtB
 
 object AtB {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index e6de443..94c3f06 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -1,12 +1,13 @@
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
 import RLikeOps._
-
-import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtx, OpAx, OpTimesRightMatrix}
+import drm._
+import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
 
 
 /** Matrix product with one of operands an in-core matrix */
@@ -15,9 +16,9 @@ object Ax {
   def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
+    implicit val sc:DistributedContext = rddA.sparkContext
 
-    val bcastX = drmBroadcast(x = op.x)
+    val bcastX = drmBroadcast(op.x)
 
     val rdd = rddA
         // Just multiply the blocks
@@ -30,9 +31,9 @@ object Ax {
 
   def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
+    implicit val dc:DistributedContext = rddA.sparkContext
 
-    val bcastX = drmBroadcast(x = op.x)
+    val bcastX = drmBroadcast(op.x)
 
     val inCoreM = rddA
         // Just multiply the blocks
@@ -51,7 +52,7 @@ object Ax {
     // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
     // it back as drm blockified rdd
 
-    val rdd = sc.parallelize(Seq(inCoreM), numSlices = 1)
+    val rdd = dc.parallelize(Seq(inCoreM), numSlices = 1)
         .map(block => Array.tabulate(block.nrow)(i => i) -> block)
 
     new DrmRddInput(blockifiedSrc = Some(rdd))
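
As a sketch of the broadcast pattern used above: user code can call drmBroadcast
directly and dereference the result inside a block closure via the implicit
BCast-to-Vector view (here `drmA`, `x` and the implicit DistributedContext are
assumed to exist, with x.length equal to drmA.ncol):

    val x: Vector = dvec(1.0, 2.0, 3.0)
    val bcastX = drmBroadcast(x)                  // ships x once per worker

    // A %*% x itself is routed through OpAx and ax_with_broadcast above.
    val drmAx = drmA %*% x

    // Inside a custom block function, dereference the broadcast explicitly:
    val drmShifted = drmA.mapBlock() { case (keys, block) =>
      val xx: Vector = bcastX                     // implicit BCast -> Vector
      for (row <- 0 until block.nrow) block(row, ::) -= xx
      keys -> block
    }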

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 6bb7b4b..a3caac7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -17,11 +17,11 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.DrmRdd
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
+import org.apache.mahout.sparkbindings.DrmRdd
 
 class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
new file mode 100644
index 0000000..4c68c9a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.BlockMapFunc
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import scala.reflect.ClassTag
+
+object MapBlock {
+
+  def exec[S, R:ClassTag](src: DrmRddInput[S], ncol:Int, bmf:BlockMapFunc[S,R]): DrmRddInput[R] = {
+
+    // We avoid referencing instance attributes here so the whole enclosing object is not
+    // pulled into the closure.
+
+    val rdd = src.toBlockifiedDrmRdd()
+        .map(blockTuple => {
+      val out = bmf(blockTuple)
+
+      assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return the same number of rows.")
+      assert(out._2.ncol == ncol, "block map must return %d columns.".format(ncol))
+
+      out
+    })
+    new DrmRddInput(blockifiedSrc = Some(rdd))
+  }
+
+}
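
A short usage sketch of the contract exec() enforces -- the block function must
keep each block's row count and honor the declared column count (`drmA` is
assumed):

    // Square every element; geometry is unchanged, so ncol may stay default.
    val drmASquared = drmA.mapBlock() { case (keys, block) =>
      keys -> (block * block)    // elementwise (Hadamard) product
    }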

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index 5affd3b..d0a50b5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -1,7 +1,7 @@
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.plan.OpRowRange
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpRowRange
 
 object Slicing {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 795f2e2..d2d5340 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -17,7 +17,6 @@
 
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.sparkbindings.drm.DrmRdd
 import scala.reflect.ClassTag
 
 /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
deleted file mode 100644
index 89d3735..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.log4j.Logger
-
-object DQR {
-
-  private val log = Logger.getLogger(DQR.getClass)
-
-  /**
-   * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
-   * controlled (<5000 or so). <P>
-   *
-   * It is recommended to checkpoint A since it does two passes over it. <P>
-   *
-   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
-   * their RDD should be able to zip successfully.
-   */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
-
-    if (A.ncol > 5000)
-      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
-
-    val AtA = (A.t %*% A).checkpoint()
-    val inCoreAtA = AtA.collect
-    implicit val sc = AtA.rdd.sparkContext
-
-    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
-
-    val ch = chol(inCoreAtA)
-    val inCoreR = (ch.getL cloned) t
-
-    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
-
-    if (checkRankDeficiency && !ch.isPositiveDefinite)
-      throw new IllegalArgumentException("R is rank-deficient.")
-
-    val bcastAtA = sc.broadcast(inCoreAtA)
-
-    // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
-    // decompose A'A in the backend again.
-
-    // Compute Q = A*inv(L') -- we can do it blockwise.
-    val Q = A.mapBlock() {
-      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
-    }
-
-    Q -> inCoreR
-  }
-
-}
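
This file moves to math-scala (see the file list at the end of this message);
the call surface is unchanged. A hedged usage sketch, with `drmA` assumed:

    // Two passes over A, hence the checkpoint. Q keeps A's partitioning and
    // key order; R is an ncol x ncol in-core matrix.
    val (drmQ, inCoreR) = dqrThin(drmA.checkpoint())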

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
deleted file mode 100644
index f3b0e3f..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
-  /**
-   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
-   * document of the MAHOUT-817.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations (hint: use either 0 or 1)
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
-   */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-    implicit val sc = drmA.rdd.sparkContext
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // Dataset mean
-    val xi = drmA.colMeans
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
-    // This done in front in a single-threaded fashion for now. Even though it doesn't require any
-    // memory beyond that is required to keep xi around, it still might be parallelized to backs
-    // for significantly big n and r. TODO
-    val s_o = omega.t %*% xi
-
-    val bcastS_o = drmBroadcast(s_o)
-    val bcastXi = drmBroadcast(xi)
-
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val s_o:Vector = bcastS_o
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
-        keys -> blockY
-    }
-        // Checkpoint Y
-        .checkpoint()
-
-    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-    var s_q = drmQ.colSums()
-    var bcastVarS_q = drmBroadcast(s_q)
-
-    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
-    // still be identically partitioned.
-    var drmBt = (drmA.t %*% drmQ).checkpoint()
-
-    var s_b = (drmBt.t %*% xi).collect(::, 0)
-    var bcastVarS_b = drmBroadcast(s_b)
-
-    for (i <- 0 until q) {
-
-      // These closures don't seem to live well with outside-scope vars. This doesn't record closure
-      // attributes correctly. So we create additional set of vals for broadcast vars to properly 
-      // create readonly closure attributes in this very scope.
-      val bcastS_q = bcastVarS_q
-      val bcastS_b = bcastVarS_b
-      val bcastXib = bcastXi
-
-      // Fix Bt as B' -= xi cross s_q
-      drmBt = drmBt.mapBlock() {
-        case (keys, block) =>
-          val s_q: Vector = bcastS_q
-          val xi: Vector = bcastXib
-          keys.zipWithIndex.foreach {
-            case (key, idx) => block(idx, ::) -= s_q * xi(key)
-          }
-          keys -> block
-      }
-
-      drmY.uncache()
-      drmQ.uncache()
-
-      drmY = (drmA %*% drmBt)
-          // Fix Y by subtracting s_b from each row of the AB'
-          .mapBlock() {
-        case (keys, block) =>
-          val s_b: Vector = bcastS_b
-          for (row <- 0 until block.nrow) block(row, ::) -= s_b
-          keys -> block
-      }
-          // Checkpoint Y
-          .checkpoint()
-
-      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-      s_q = drmQ.colSums()
-      bcastVarS_q = drmBroadcast(s_q)
-
-      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
-      // identically partitioned anymore.
-      drmBt = (drmA.t %*% drmQ).checkpoint()
-
-      s_b = (drmBt.t %*% xi).collect(::, 0)
-      bcastVarS_b = drmBroadcast(s_b)
-    }
-
-    val c = s_q cross s_b
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
-        c - c.t + (s_q cross s_q) * (xi dot xi)
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
-    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}
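
Likewise moved to math-scala rather than removed. A usage sketch (`drmA`
assumed; parameter values illustrative):

    // 20 principal components, one power iteration. U and V remain lazy until
    // an action such as collect or a write triggers their computation.
    val (drmU, drmV, s) = dspca(drmA, k = 20, p = 15, q = 1)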

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
deleted file mode 100644
index de15d2b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSSVD {
-
-  /**
-   * Distributed Stochastic Singular Value decomposition algorithm.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
-   */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-
-    // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
-    // instantiate the Omega random matrix view in the backend instead. That way serialized closure
-    // is much more compact.
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        keys -> blockY
-    }
-
-    var drmQ = dqrThin(drmY.checkpoint())._1
-    // Checkpoint Q if last iteration
-    if (q == 0) drmQ = drmQ.checkpoint()
-
-    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
-    // still be identically partitioned.
-    var drmBt = drmA.t %*% drmQ
-    // Checkpoint B' if last iteration
-    if (q == 0) drmBt = drmBt.checkpoint()
-
-    for (i <- 0  until q) {
-      drmY = drmA %*% drmBt
-      drmQ = dqrThin(drmY.checkpoint())._1
-      // Checkpoint Q if last iteration
-      if (i == q - 1) drmQ = drmQ.checkpoint()
-
-      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
-      // identically partitioned anymore.
-      drmBt = drmA.t %*% drmQ
-      // Checkpoint B' if last iteration
-      if (i == q - 1) drmBt = drmBt.checkpoint()
-    }
-
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
-    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}
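
Also moved to math-scala. A usage sketch (`drmA` assumed):

    val (drmU, drmV, s) = dssvd(drmA, k = 40, p = 15, q = 1)
    // Materialize V when needed:
    val inCoreV = drmV.checkpoint().collect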

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
deleted file mode 100644
index faa89ef..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.mahout.sparkbindings.drm
-
-
-object CacheHint extends Enumeration {
-
-  type CacheHint = Value
-
-  val NONE,
-  DISK_ONLY,
-  DISK_ONLY_2,
-  MEMORY_ONLY,
-  MEMORY_ONLY_2,
-  MEMORY_ONLY_SER,
-  MEMORY_ONLY_SER_2,
-  MEMORY_AND_DISK,
-  MEMORY_AND_DISK_2,
-  MEMORY_AND_DISK_SER,
-  MEMORY_AND_DISK_SER_2 = Value
-
-}
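
The enumeration moves to org.apache.mahout.math.drm; its constants map
one-to-one onto Spark StorageLevels (see cacheHint2Spark in the engine code
above). For example, assuming `drmA`:

    // Ask the engine to persist the optimized lineage with spill-to-disk.
    val drmAcp = drmA.checkpoint(CacheHint.MEMORY_AND_DISK)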

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
deleted file mode 100644
index 0007477..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.Matrix
-import org.apache.hadoop.io.Writable
-
-/**
- * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be
- * therefore collected or saved.
- * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
- */
-trait CheckpointedDrm[K] extends DrmLike[K] {
-
-  def rdd: DrmRdd[K]
-
-  def collect: Matrix
-
-  def writeDRM(path: String)
-
-  /** If this checkpoint is already declared cached, uncache. */
-  def uncache()
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
deleted file mode 100644
index 8216881..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.{SparseMatrix, DenseMatrix, Matrix, Vector}
-import math._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import scala.collection.JavaConversions._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.SparkContext._
-import reflect._
-import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
-
-class CheckpointedDrmBase[K: ClassTag](
-    val rdd: DrmRdd[K],
-    private var _nrow: Long = -1L,
-    private var _ncol: Int = -1,
-    private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
-    private[sparkbindings] val partitioningTag: Long = Random.nextLong()
-
-    ) extends CheckpointedDrm[K] {
-
-
-  lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
-  lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
-
-  private var cached: Boolean = false
-
-
-  /**
-   * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
-   * and writing down Spark graph lineage since last checkpointed DRM.
-   */
-  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
-  // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
-    this
-
-  def cache() = {
-    if (!cached) {
-      rdd.persist(_cacheStorageLevel)
-      cached = true
-    }
-    this
-  }
-
-
-  /**
-   * if matrix was previously persisted into cache,
-   * delete cached representation
-   */
-  def uncache() = {
-    if (cached) {
-      rdd.unpersist(blocking = false)
-      cached = false
-    }
-    this
-  }
-
-  def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmBase[K] =
-    new CheckpointedDrmBase[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
-
-
-  /**
-   * Collecting DRM to fron-end in-core Matrix.
-   *
-   * If key in DRM is Int, then matrix is collected using key as row index.
-   * Otherwise, order of rows in result is undefined but key.toString is applied
-   * as rowLabelBindings of the in-core matrix .
-   *
-   * Note that this pre-allocates target matrix and then assigns collected RDD to it
-   * thus this likely would require about 2 times the RDD memory
-   * @return
-   */
-  def collect: Matrix = {
-
-    val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
-
-    val cols = rdd.map(_._2.length).fold(0)(max(_, _))
-    val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
-
-    // since currently spark #collect() requires Serializeable support,
-    // we serialize DRM vectors into byte arrays on backend and restore Vector
-    // instances on the front end:
-    val data = rdd.map(t => (t._1, t._2)).collect()
-
-
-    val m = if (data.forall(_._2.isDense))
-      new DenseMatrix(rows, cols)
-
-    else
-      new SparseMatrix(rows, cols)
-
-    if (intRowIndices)
-      data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
-    else {
-
-      // assign all rows sequentially
-      val d = data.zipWithIndex
-      d.foreach(t => m(t._2, ::) := t._1._2)
-
-      // row bindings
-      val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
-
-      m.setRowLabelBindings(rowBindings)
-    }
-
-    m
-  }
-
-
-  /**
-   * Dump matrix as computed Mahout's DRM into specified (HD)FS path
-   * @param path
-   */
-  def writeDRM(path: String) = {
-    val ktag = implicitly[ClassTag[K]]
-
-    implicit val k2wFunc: (K) => Writable =
-      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
-      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
-      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
-      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
-      else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
-//    implicit def any2w(k: Any): Writable = k2wFunc(k)
-    rdd.saveAsSequenceFile(path)
-  }
-
-  protected def computeNRow = {
-
-    val intRowIndex = classTag[K] == classTag[Int]
-
-    if (intRowIndex)
-      cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
-    else
-      cache().rdd.count()
-  }
-
-  protected def computeNCol =
-    cache().rdd.map(_._2.length).fold(-1)(max(_, _))
-
-  protected def computeNNonZero =
-    cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
new file mode 100644
index 0000000..2d80fe3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math._
+import math._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import scala.collection.JavaConversions._
+import org.apache.spark.storage.StorageLevel
+import reflect._
+import scala.util.Random
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.sparkbindings._
+import org.apache.spark.SparkContext._
+
+/** Spark-specific optimizer-checkpointed DRM. */
+class CheckpointedDrmSpark[K: ClassTag](
+    val rdd: DrmRdd[K],
+    private var _nrow: Long = -1L,
+    private var _ncol: Int = -1,
+    private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+    override protected[mahout] val partitioningTag: Long = Random.nextLong()
+    ) extends CheckpointedDrm[K] {
+
+  lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
+  lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
+
+  private var cached: Boolean = false
+  override protected[mahout] val context: DistributedContext = rdd.context
+
+
+  /**
+   * Action operator -- does not necessarily mean a Spark action; but it does mean running the
+   * BLAS optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
+    // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
+    this
+  }
+
+  def cache() = {
+    if (!cached) {
+      rdd.persist(_cacheStorageLevel)
+      cached = true
+    }
+    this
+  }
+
+
+  /**
+   * If the matrix was previously persisted into the cache,
+   * delete the cached representation.
+   */
+  def uncache() = {
+    if (cached) {
+      rdd.unpersist(blocking = false)
+      cached = false
+    }
+    this
+  }
+
+//  def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
+//    new CheckpointedDrmSpark[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
+
+
+  /**
+   * Collect the DRM into a front-end in-core Matrix.
+   *
+   * If the DRM key is Int, the matrix is collected using the key as the row index.
+   * Otherwise, the row order of the result is undefined, but key.toString is applied
+   * as the rowLabelBindings of the in-core matrix.
+   *
+   * Note that this pre-allocates the target matrix and then assigns the collected RDD to it,
+   * so it is likely to require about twice the RDD memory.
+   * @return
+   */
+  def collect: Matrix = {
+
+    val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
+
+    val cols = rdd.map(_._2.length).fold(0)(max(_, _))
+    val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
+
+    // Since Spark's #collect() currently requires Serializable support,
+    // we serialize DRM vectors into byte arrays on the backend and restore Vector
+    // instances on the front end:
+    val data = rdd.map(t => (t._1, t._2)).collect()
+
+
+    val m = if (data.forall(_._2.isDense))
+      new DenseMatrix(rows, cols)
+
+    else
+      new SparseMatrix(rows, cols)
+
+    if (intRowIndices)
+      data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
+    else {
+
+      // assign all rows sequentially
+      val d = data.zipWithIndex
+      d.foreach(t => m(t._2, ::) := t._1._2)
+
+      // row bindings
+      val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
+
+      m.setRowLabelBindings(rowBindings)
+    }
+
+    m
+  }
+
+
+  /**
+   * Dump the matrix as a computed Mahout DRM into the specified (HD)FS path
+   * @param path
+   */
+  def writeDRM(path: String) = {
+    val ktag = implicitly[ClassTag[K]]
+
+    implicit val k2wFunc: (K) => Writable =
+      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
+      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
+      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
+      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
+      else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
+
+    rdd.saveAsSequenceFile(path)
+  }
+
+  protected def computeNRow = {
+
+    val intRowIndex = classTag[K] == classTag[Int]
+
+    if (intRowIndex)
+      cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+    else
+      cache().rdd.count()
+  }
+
+  protected def computeNCol =
+    cache().rdd.map(_._2.length).fold(-1)(max(_, _))
+
+  protected def computeNNonZero =
+    cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
+
+}
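
A brief sketch of the two actions defined above, with `drmAcp` a checkpointed
DRM and the output path purely illustrative:

    val inCoreA: Matrix = drmAcp.collect    // pulls all rows to the driver
    drmAcp.writeDRM("/tmp/drmA")            // Mahout DRM sequence file(s)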

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
new file mode 100644
index 0000000..7cf6bd6
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -0,0 +1,16 @@
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math.drm.CheckpointedDrm
+import scala.reflect.ClassTag
+
+/** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
+class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+
+  assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
+
+  private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
+
+  /** Spark matrix customization exposure */
+  def rdd = sparkDrm.rdd
+
+}
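
Assuming the implicit decoration to CheckpointedDrmSparkOps is in scope (from
the sparkbindings package object), Spark-specific code can drop down to the raw
row-wise RDD:

    val rowRdd: DrmRdd[Int] = drmAcp.rdd    // RDD of (key, Vector) tuples
    println("partitions: " + rowRdd.partitions.length)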

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
deleted file mode 100644
index f891c1e..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{DenseVector, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-import org.apache.spark.SparkContext._
-
-
-/**
- * Additional experimental operations over CheckpointedDRM implementation. I will possibly move them up to
- * the DRMBase once they stabilize.
- *
- */
-class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
-
-  /**
-   * Reorganize every partition into a single in-core matrix
-   * @return
-   */
-  def blockify(): BlockifiedDrmRdd[K] =
-    org.apache.mahout.sparkbindings.drm.blockify(rdd = drm.rdd, blockncol = drm.ncol)
-
-  /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
-  def colSums(): Vector = {
-    val n = drm.ncol
-
-    drm.rdd
-        // Throw away keys
-        .map(_._2)
-        // Fold() doesn't work with kryo still. So work around it.
-        .mapPartitions(iter => {
-      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
-      Iterator(acc)
-    })
-        // Since we preallocated new accumulator vector per partition, this must not cause any side
-        // effects now.
-        .reduce(_ += _)
-
-  }
-
-  def colMeans(): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
-
-}
-
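
These column statistics also move to math-scala. Usage stays a one-liner,
assuming `drmA` is a checkpointed DRM with the decorator in scope:

    val sums: Vector = drmA.colSums()      // collects an in-core vector
    val means: Vector = drmA.colMeans()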

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
deleted file mode 100644
index de1f9bd..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.spark.storage.StorageLevel
-
-/**
- *
- * Basic spark DRM trait.
- *
- * Since we already call the package "sparkbindings", I will not use stem "spark" with classes in
- * this package. Spark backing is already implied.
- *
- */
-trait DrmLike[K] {
-
-  private[sparkbindings] def partitioningTag:Long
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long
-
-  /** R-like syntax for number of columns */
-  def ncol: Int
-
-  /**
-   * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
-   * and writing down Spark graph lineage since last checkpointed DRM.
-   */
-  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
deleted file mode 100644
index ce7b867..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.hadoop.io.Writable
-import org.apache.mahout.sparkbindings.drm.plan.{OpRowRange, OpMapBlock}
-import RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-/** Common Drm ops */
-class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
-
-  /**
-   * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
-   * matrices; or they could be completely new matrices with new keyset. In the latter case, output
-   * matrix width must be specified with <code>ncol</code> parameter.<P>
-   *
-   * New block heights must be of the same height as the original geometry.<P>
-   *
-   * @param ncol new matrix' width (only needed if width changes).
-   * @param bmf
-   * @tparam R
-   * @return
-   */
-  def mapBlock[R : ClassTag](ncol: Int = -1)
-      (bmf: BlockMapFunc[K, R]): DrmLike[R] =
-    new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
-
-
-  /**
-   * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P>
-   *
-   * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
-   *
-   * Row range is currently unsupported except for the all-range. When it will be fully supported,
-   * the input must be Int-keyed, i.e. of DrmLike[Int] type for non-all-range specifications.
-   *
-   * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
-   * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
-   */
-  def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
-
-
-    val rowSrc: DrmLike[K] = if (rowRange != ::) {
-
-      if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
-
-        assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range")
-        val intKeyed = drm.asInstanceOf[DrmLike[Int]]
-
-        new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
-
-      } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
-
-    } else drm
-
-    if (colRange != ::) {
-
-      assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
-
-      // Use mapBlock operator to do in-core subranging.
-      rowSrc.mapBlock(ncol = colRange.length)({
-        case (keys, block) => keys -> block(::, colRange)
-      })
-
-    } else rowSrc
-  }
-}
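
The slicing contract described in the scaladoc above, as a sketch with an
Int-keyed `drmA` assumed:

    val cols = drmA(::, 0 until 5)      // column sub-range via mapBlock
    val rows = drmA(0 until 10, ::)     // row range -> OpRowRange, Int keys only
    val sub  = drmA(0 until 10, 0 until 5)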

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index 47cfa26..3801c77 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -20,6 +20,7 @@ package org.apache.mahout.sparkbindings.drm
 import scala.reflect.ClassTag
 import org.apache.spark.SparkContext
 import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings._
 
 /** Encapsulates either DrmRdd[K] or BlockifiedDrmRdd[K] */
 class DrmRddInput[K: ClassTag](


[3/3] git commit: MAHOUT-1529 closes PR #1

Posted by dl...@apache.org.
MAHOUT-1529 closes PR #1

Squashed commit of the following:

commit e7b27280333fe12c94f3f5675c876f56e9e60728
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Tue May 27 11:01:13 2014 -0700

    License, comments

commit bfca581389d91fb9c41db8c49e256ff694335fd1
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri May 23 16:07:01 2014 -0700

    Fixing shell

commit c3b8aa4bf7d8939aa0800ce12b7f63e01f9686b9
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri May 23 13:36:20 2014 -0700

    - MahoutContext

commit a6461ac22a46793ab0bcdfcafe827d808f9e1810
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Fri May 23 12:13:40 2014 -0700

    rebasing changes in MAHOUT-1529 branch onto the new git master (applied through selective patching; this may, but hopefully doesn't, revert any commits in between in the spark/ and math-scala/ modules)


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8714a0f7
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8714a0f7
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8714a0f7

Branch: refs/heads/master
Commit: 8714a0f722663ea5cb16c14c5b8a01e57574cd93
Parents: 5a1c2cc
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Tue May 27 12:13:29 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Tue May 27 12:13:29 2014 -0700

----------------------------------------------------------------------
 math-scala/pom.xml                              |   1 -
 .../org/apache/mahout/math/drm/BCast.scala      |  23 ++
 .../org/apache/mahout/math/drm/CacheHint.scala  |  19 ++
 .../mahout/math/drm/CheckpointedDrm.scala       |  36 +++
 .../mahout/math/drm/CheckpointedOps.scala       |  39 +++
 .../mahout/math/drm/DistributedContext.scala    |  27 +++
 .../mahout/math/drm/DistributedEngine.scala     | 158 ++++++++++++
 .../org/apache/mahout/math/drm/DrmLike.scala    |  48 ++++
 .../org/apache/mahout/math/drm/DrmLikeOps.scala |  84 +++++++
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |  92 +++++++
 .../mahout/math/drm/decompositions/DQR.scala    |  57 +++++
 .../mahout/math/drm/decompositions/DSPCA.scala  | 153 ++++++++++++
 .../mahout/math/drm/decompositions/DSSVD.scala  |  83 +++++++
 .../math/drm/logical/AbstractBinaryOp.scala     |  37 +++
 .../math/drm/logical/AbstractUnaryOp.scala      |  36 +++
 .../math/drm/logical/CheckpointAction.scala     |  47 ++++
 .../apache/mahout/math/drm/logical/OpAB.scala   |  41 ++++
 .../mahout/math/drm/logical/OpABAnyKey.scala    |  41 ++++
 .../apache/mahout/math/drm/logical/OpABt.scala  |  42 ++++
 .../apache/mahout/math/drm/logical/OpAewB.scala |  39 +++
 .../mahout/math/drm/logical/OpAewScalar.scala   |  38 +++
 .../apache/mahout/math/drm/logical/OpAt.scala   |  33 +++
 .../apache/mahout/math/drm/logical/OpAtA.scala  |  36 +++
 .../mahout/math/drm/logical/OpAtAnyKey.scala    |  34 +++
 .../apache/mahout/math/drm/logical/OpAtB.scala  |  42 ++++
 .../apache/mahout/math/drm/logical/OpAtx.scala  |  41 ++++
 .../apache/mahout/math/drm/logical/OpAx.scala   |  42 ++++
 .../mahout/math/drm/logical/OpMapBlock.scala    |  41 ++++
 .../mahout/math/drm/logical/OpRowRange.scala    |  36 +++
 .../math/drm/logical/OpTimesLeftMatrix.scala    |  43 ++++
 .../math/drm/logical/OpTimesRightMatrix.scala   |  46 ++++
 .../org/apache/mahout/math/drm/package.scala    | 125 ++++++++++
 .../apache/mahout/math/scalabindings/SSVD.scala | 165 -------------
 .../scalabindings/decompositions/SSVD.scala     | 165 +++++++++++++
 .../mahout/math/scalabindings/package.scala     |   1 +
 .../sparkbindings/shell/MahoutSparkILoop.scala  |  33 ++-
 .../mahout/sparkbindings/shell/Main.scala       |  17 ++
 spark-shell/src/test/mahout/simple.mscala       |  29 ++-
 spark/pom.xml                                   |  95 ++++----
 .../sparkbindings/SparkDistributedContext.scala |  30 +++
 .../mahout/sparkbindings/SparkEngine.scala      | 240 +++++++++++++++++++
 .../apache/mahout/sparkbindings/blas/ABt.scala  |   5 +-
 .../apache/mahout/sparkbindings/blas/AewB.scala |   2 +-
 .../mahout/sparkbindings/blas/AinCoreB.scala    |  15 +-
 .../apache/mahout/sparkbindings/blas/At.scala   |   2 +-
 .../apache/mahout/sparkbindings/blas/AtA.scala  |   4 +-
 .../apache/mahout/sparkbindings/blas/AtB.scala  |   3 +-
 .../apache/mahout/sparkbindings/blas/Ax.scala   |  19 +-
 .../mahout/sparkbindings/blas/DrmRddOps.scala   |   2 +-
 .../mahout/sparkbindings/blas/MapBlock.scala    |  43 ++++
 .../mahout/sparkbindings/blas/Slicing.scala     |   2 +-
 .../mahout/sparkbindings/blas/package.scala     |   1 -
 .../sparkbindings/decompositions/DQR.scala      |  56 -----
 .../sparkbindings/decompositions/DSPCA.scala    | 153 ------------
 .../sparkbindings/decompositions/DSSVD.scala    |  83 -------
 .../mahout/sparkbindings/drm/CacheHint.scala    |  20 --
 .../sparkbindings/drm/CheckpointedDrm.scala     |  39 ---
 .../sparkbindings/drm/CheckpointedDrmBase.scala | 161 -------------
 .../drm/CheckpointedDrmSpark.scala              | 164 +++++++++++++
 .../drm/CheckpointedDrmSparkOps.scala           |  16 ++
 .../sparkbindings/drm/CheckpointedOps.scala     |  63 -----
 .../mahout/sparkbindings/drm/DrmLike.scala      |  46 ----
 .../mahout/sparkbindings/drm/DrmLikeOps.scala   |  85 -------
 .../mahout/sparkbindings/drm/DrmRddInput.scala  |   1 +
 .../mahout/sparkbindings/drm/RLikeDrmOps.scala  |  98 --------
 .../mahout/sparkbindings/drm/SparkBCast.scala   |  25 ++
 .../mahout/sparkbindings/drm/package.scala      | 226 ++---------------
 .../drm/plan/AbstractBinaryOp.scala             |  37 ---
 .../drm/plan/AbstractUnaryOp.scala              |  34 ---
 .../drm/plan/CheckpointAction.scala             | 207 ----------------
 .../mahout/sparkbindings/drm/plan/OpAB.scala    |  43 ----
 .../sparkbindings/drm/plan/OpABAnyKey.scala     |  41 ----
 .../mahout/sparkbindings/drm/plan/OpABt.scala   |  43 ----
 .../mahout/sparkbindings/drm/plan/OpAewB.scala  |  39 ---
 .../sparkbindings/drm/plan/OpAewScalar.scala    |  38 ---
 .../mahout/sparkbindings/drm/plan/OpAt.scala    |  34 ---
 .../mahout/sparkbindings/drm/plan/OpAtA.scala   |  37 ---
 .../sparkbindings/drm/plan/OpAtAnyKey.scala     |  34 ---
 .../mahout/sparkbindings/drm/plan/OpAtB.scala   |  42 ----
 .../mahout/sparkbindings/drm/plan/OpAtx.scala   |  42 ----
 .../mahout/sparkbindings/drm/plan/OpAx.scala    |  42 ----
 .../sparkbindings/drm/plan/OpMapBlock.scala     |  58 -----
 .../sparkbindings/drm/plan/OpRowRange.scala     |  37 ---
 .../drm/plan/OpTimesLeftMatrix.scala            |  44 ----
 .../drm/plan/OpTimesRightMatrix.scala           |  46 ----
 .../mahout/sparkbindings/drm/plan/package.scala |  24 --
 .../io/MahoutKryoRegistrator.scala              |   6 +-
 .../apache/mahout/sparkbindings/package.scala   |  61 ++++-
 .../mahout/sparkbindings/blas/ABtSuite.scala    |   7 +-
 .../mahout/sparkbindings/blas/AewBSuite.scala   |  14 +-
 .../mahout/sparkbindings/blas/AtASuite.scala    |   4 +-
 .../mahout/sparkbindings/blas/AtSuite.scala     |  12 +-
 .../decompositions/MathSuite.scala              |   7 +-
 .../sparkbindings/drm/DrmLikeOpsSuite.scala     |   6 +-
 .../mahout/sparkbindings/drm/DrmLikeSuite.scala |   6 +-
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    |  30 +--
 .../sparkbindings/test/MahoutLocalContext.scala |   7 +-
 97 files changed, 2607 insertions(+), 2244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 4604b7d..95fe2c7 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -167,7 +167,6 @@
       <artifactId>mahout-math</artifactId>
     </dependency>
 
-
     <!--  3rd-party -->
     <dependency>
       <groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
new file mode 100644
index 0000000..850614457
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+/** Broadcast variable abstraction */
+trait BCast[T] {
+  def value:T
+}
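
For illustration, a minimal usage sketch for this handle (assuming the drmBroadcast helper from
the new drm package object and the mapBlock operator, both added elsewhere in this commit; the
timesBroadcast name is hypothetical):

    import scala.reflect.ClassTag
    import org.apache.mahout.math.Matrix
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    // Multiply every vertical block of drmA by an in-core matrix that is shipped
    // to the workers through the engine's broadcast mechanism rather than being
    // serialized into every task closure.
    def timesBroadcast[K: ClassTag](drmA: DrmLike[K], inCoreM: Matrix)
        (implicit ctx: DistributedContext): DrmLike[K] = {
      val bcastM: BCast[Matrix] = drmBroadcast(inCoreM)
      drmA.mapBlock(ncol = inCoreM.ncol) {
        case (keys, block) => keys -> (block %*% bcastM.value)
      }
    }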

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
new file mode 100644
index 0000000..ac763f9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
@@ -0,0 +1,19 @@
+package org.apache.mahout.math.drm
+
+object CacheHint extends Enumeration {
+
+  type CacheHint = Value
+
+  val NONE,
+  DISK_ONLY,
+  DISK_ONLY_2,
+  MEMORY_ONLY,
+  MEMORY_ONLY_2,
+  MEMORY_ONLY_SER,
+  MEMORY_ONLY_SER_2,
+  MEMORY_AND_DISK,
+  MEMORY_AND_DISK_2,
+  MEMORY_AND_DISK_SER,
+  MEMORY_AND_DISK_SER_2 = Value
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
new file mode 100644
index 0000000..0266944
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.math.Matrix
+
+/**
+ * Checkpointed DRM API. This is a matrix that has an optimized RDD lineage behind it and can
+ * therefore be collected or saved.
+ * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+ */
+trait CheckpointedDrm[K] extends DrmLike[K] {
+
+  def collect: Matrix
+
+  def writeDRM(path: String)
+
+  /** If this checkpoint is already declared cached, uncache. */
+  def uncache()
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
new file mode 100644
index 0000000..fa1ccfd
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+
+
+/**
+ * Additional experimental operations over the CheckpointedDrm implementation. I will possibly
+ * move them up to the base DRM API once they stabilize.
+ *
+ */
+class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
+
+
+  /** Column sums. At this point this runs on the checkpoint and collects an in-core vector. */
+  def colSums(): Vector = drm.context.colSums(drm)
+
+  /** Column Means */
+  def colMeans(): Vector = drm.context.colMeans(drm)
+
+}
+
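
A small usage sketch for these (assuming some drmA: DrmLike[Int] in scope; note that the
drm2cpops implicit added later in this commit would otherwise trigger an implicit checkpoint):

    val cp = drmA.checkpoint()
    val sums: Vector = cp.colSums()    // runs on the checkpoint, collects in-core
    val means: Vector = cp.colMeans()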

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
new file mode 100644
index 0000000..39bab90
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import java.io.Closeable
+
+/** Distributed context (a.k.a. distributed session handle) */
+trait DistributedContext extends Closeable {
+
+  val engine:DistributedEngine
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
new file mode 100644
index 0000000..0e76d87
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import logical._
+import org.apache.mahout.math.{Matrix, Vector}
+import DistributedEngine._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+/** Abstraction of optimizer/distributed engine */
+trait DistributedEngine {
+
+  /**
+   * First optimization pass. Returns a rewritten plan that we can pass to exec(). This rewrite
+   * may introduce logical constructs (including engine-specific ones) that the user DSL cannot
+   * even produce per se.
+   * <P>
+   *   
+   * A particular physical engine implementation may choose to either use the default rewrites or
+   * build its own rewriting rules.
+   * <P>
+   */
+  def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
+
+  /** Second optimizer pass. Translates the previously rewritten logical pipeline into a physical engine plan. */
+  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
+
+  /** Engine-specific colSums implementation based on a checkpoint. */
+  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  /** Engine-specific colMeans implementation based on a checkpoint. */
+  def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  /** Broadcast support */
+  def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
+
+  /** Broadcast support */
+  def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
+
+  /** Load a DRM from HDFS (in the Mahout DRM format). */
+  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row ordinal indices as dataset keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row labels as dataset keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[String]
+
+  /** Creates an empty DRM with the specified number of partitions and cardinality. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Creates an empty DRM whose height may exceed Int range (hence the Long row keys). */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Long]
+}
+
+object DistributedEngine {
+
+  /** This pass mostly rewrites multiplication operations. */
+  private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+
+    action match {
+      case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
+      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
+
+      // For now, rewrite left-multiply via transpositions, i.e.
+      // inCoreA %*% B = (B' %*% inCoreA')'
+      case op@OpTimesLeftMatrix(a, b) =>
+        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass1(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass1(bop.A)(bop.classTagA)
+        bop.B = pass1(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** This pass removes constructs like A.t.t that the previous step may have created. */
+  private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+      // A.t.t => A
+      case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass2(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass2(bop.A)(bop.classTagA)
+        bop.B = pass2(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** Some further rewrites that are conditioned on A.t.t removal */
+  private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+
+      // matrix products.
+      case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
+
+      // AtB cases that make sense.
+      case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
+      case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
+
+      // Need some cost to choose between the following.
+
+      case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
+      //      case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
+      case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+      // Rewrite A'x
+      case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass3(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass3(bop.A)(bop.classTagA)
+        bop.B = pass3(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+}
\ No newline at end of file
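
Traced through the pattern matches above, the default passes turn familiar DSL expressions into
the fused logical operators; a sketch for Int-keyed drmA, drmB:

    // pass1: A'A is recognized and fused:
    //   drmA.t %*% drmA    =>  OpAtA(drmA)
    // pass1: in-core left-multiply is rewritten via transpositions:
    //   inCoreM %*%: drmA  =>  OpAt(OpTimesRightMatrix(OpAt(drmA), inCoreM.t))
    // pass2: double transpose is eliminated:
    //   OpAt(OpAt(drmA))   =>  drmA
    // pass3: a general product goes through the AB' path:
    //   drmA %*% drmB      =>  OpABt(drmA, OpAt(drmB))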

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
new file mode 100644
index 0000000..8e0db1e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+
+/**
+ *
+ * Basic DRM trait.
+ *
+ * (This trait has moved out of "sparkbindings" into math-scala and is engine-agnostic now, so a
+ * Spark backing is no longer implied.)
+ *
+ */
+trait DrmLike[K] {
+
+  protected[mahout] def partitioningTag:Long
+
+  protected[mahout] val context:DistributedContext
+
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long
+
+  /** R-like syntax for number of columns */
+  def ncol: Int
+
+  /**
+   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
+   * optimizer and writing down the graph lineage since the last checkpointed DRM.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
+
+}
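
A sketch of the checkpoint contract against the CheckpointedDrm API above (drmA assumed in
scope; the cache hint names mirror Spark storage levels):

    val cp = drmA.checkpoint(CacheHint.MEMORY_AND_DISK)  // run optimizer, pin lineage
    val inCore: Matrix = cp.collect                      // bring in-core
    cp.writeDRM("/tmp/drmA")                             // hypothetical output path
    cp.uncache()                                         // drop cached blocks when done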

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
new file mode 100644
index 0000000..35e28af
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{OpMapBlock, OpRowRange}
+
+/** Common Drm ops */
+class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+  /**
+   * Map matrix block-wise vertically. Blocks of the new matrix can be the modified original
+   * blocks, or they could be completely new matrices with a new keyset. In the latter case, the
+   * output matrix width must be specified with the <code>ncol</code> parameter.<P>
+   *
+   * New blocks must have the same height as the original geometry.<P>
+   *
+   * @param ncol the new matrix's width (only needed if the width changes).
+   * @param bmf block map function producing the new (keys, block) pairs.
+   * @tparam R the new row key type.
+   * @return the mapped DRM.
+   */
+  def mapBlock[R : ClassTag](ncol: Int = -1)
+      (bmf: BlockMapFunc[K, R]): DrmLike[R] =
+    new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
+
+
+  /**
+   * Slicing the DRM. Should eventually work just like in-core slicing (e.g. A(0 until 5, 5 until 15)).<P>
+   *
+   * The all-range is denoted by '::', e.g. A(::, 0 until 5).<P>
+   *
+   * Row ranges other than the all-range currently require an Int-keyed input, i.e. the DRM must
+   * be of DrmLike[Int] type for non-all-range specifications.
+   *
+   * @param rowRange row range. Must be '::' (all-range) unless matrix rows are keyed by Int.
+   * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes the all-range.
+   */
+  def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
+
+    import RLikeDrmOps._
+    import RLikeOps._
+
+    val rowSrc: DrmLike[K] = if (rowRange != ::) {
+
+      if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
+
+        assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "row range out of bounds")
+        val intKeyed = drm.asInstanceOf[DrmLike[Int]]
+
+        new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
+
+      } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
+
+    } else drm
+
+    if (colRange != ::) {
+
+      assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of bounds")
+
+      // Use mapBlock operator to do in-core subranging.
+      rowSrc.mapBlock(ncol = colRange.length)({
+        case (keys, block) => keys -> block(::, colRange)
+      })
+
+    } else rowSrc
+  }
+}
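
To make the two operators concrete, a small sketch (drmA assumed to be a DrmLike[Int] with at
least 10 rows and 5 columns; the in-core scalar product on the block comes from RLikeOps):

    // Block-wise map that keeps geometry (no ncol needed since the width is unchanged):
    val drmScaled = drmA.mapBlock() {
      case (keys, block) => keys -> (block * 2.0)
    }
    // Slicing: all rows with columns 0..4, and the first 10 rows (Int keys required):
    val drmCols = drmA(::, 0 until 5)
    val drmTopRows = drmA(0 until 10, ::)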

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
new file mode 100644
index 0000000..f46d15c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Vector, Matrix}
+import org.apache.mahout.math.drm.logical._
+
+class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
+
+  import RLikeDrmOps._
+
+  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
+
+  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
+
+  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
+
+  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
+
+  def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
+
+  def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
+
+  def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:")
+
+  def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+  def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
+
+  def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:")
+
+  def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
+
+  def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that)
+
+  def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
+
+  def %*%(that: Matrix): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
+
+  def %*%(that: Vector): DrmLike[K] = :%*%(that)
+
+  def t: DrmLike[Int] = OpAtAnyKey(A = drm)
+}
+
+class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
+
+  override def t: DrmLike[Int] = OpAt(A = drm)
+
+  def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
+
+  def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
+
+
+}
+
+object RLikeDrmOps {
+  implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
+
+  implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
+
+  implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
+
+  implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
+
+  implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+
+  /**
+   * This is probably dangerous since it triggers implicit checkpointing with default storage level
+   * setting.
+   */
+  implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
+}
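
Taken together, these implicits enable an R-like distributed DSL; a quick sketch (drmA, drmB
assumed Int-keyed and conformant, x an in-core Vector, inCoreM an in-core Matrix):

    val drmC   = (drmA + drmB) * 2.0   // elementwise DRM ops and scalar ops
    val drmAtB = drmA.t %*% drmB       // distributed A'B
    val drmAx  = drmA %*% x            // times an in-core vector (OpAx)
    val drmMA  = inCoreM %*%: drmA     // in-core matrix on the left (OpTimesLeftMatrix)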

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
new file mode 100644
index 0000000..34ae345
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
@@ -0,0 +1,57 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+  private val log = Logger.getLogger(DQR.getClass)
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be fairly
+   * small (< 5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since this does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in the same key order) as
+   * A, so their RDDs should zip successfully.
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    if (A.ncol > 5000)
+      log.warn("A is too fat. A'A must fit in memory and be easily broadcast.")
+
+    implicit val ctx = A.context
+
+    val AtA = (A.t %*% A).checkpoint()
+    val inCoreAtA = AtA.collect
+
+    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+    val ch = chol(inCoreAtA)
+    val inCoreR = (ch.getL cloned) t
+
+    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+    if (checkRankDeficiency && !ch.isPositiveDefinite)
+      throw new IllegalArgumentException("R is rank-deficient.")
+
+    val bcastAtA = drmBroadcast(inCoreAtA)
+
+    // Unfortunately, the Cholesky decomposition doesn't seem to be serializable to the backend,
+    // so we re-decompose A'A in the backend again.
+
+    // Compute Q = A*inv(L') -- we can do it blockwise.
+    val Q = A.mapBlock() {
+      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+    }
+
+    Q -> inCoreR
+  }
+
+}
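
A usage sketch for the factorization above (drmA assumed checkpointed and skinny):

    val (drmQ, inCoreR) = dqrThin(drmA)
    // Q is distributed and partitioned like A; R is a small in-core upper-triangular
    // factor, so A can be (lazily) reconstructed as:
    val drmArec = drmQ %*% inCoreR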

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
new file mode 100644
index 0000000..9e33416
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document attached to MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U, V, s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually
+   *         use them, e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+    implicit val ctx = A.context
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than the smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // Dataset mean
+    val xi = drmA.colMeans
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+    // This is done up front in a single-threaded fashion for now. Even though it doesn't require
+    // any memory beyond what is needed to keep xi around, it still might be parallelized to the
+    // backend for significantly big n and r. TODO
+    val s_o = omega.t %*% xi
+
+    val bcastS_o = drmBroadcast(s_o)
+    val bcastXi = drmBroadcast(xi)
+
+    var drmY = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val s_o:Vector = bcastS_o
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+        keys -> blockY
+    }
+        // Checkpoint Y
+        .checkpoint()
+
+    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+    var s_q = drmQ.colSums()
+    var bcastVarS_q = drmBroadcast(s_q)
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+    var s_b = (drmBt.t %*% xi).collect(::, 0)
+    var bcastVarS_b = drmBroadcast(s_b)
+
+    for (i <- 0 until q) {
+
+      // These closures don't seem to live well with outside-scope vars: closure attributes don't
+      // get recorded correctly. So we create an additional set of vals for the broadcast vars to
+      // properly create read-only closure attributes in this very scope.
+      val bcastS_q = bcastVarS_q
+      val bcastS_b = bcastVarS_b
+      val bcastXib = bcastXi
+
+      // Fix Bt as B' -= xi cross s_q
+      drmBt = drmBt.mapBlock() {
+        case (keys, block) =>
+          val s_q: Vector = bcastS_q
+          val xi: Vector = bcastXib
+          keys.zipWithIndex.foreach {
+            case (key, idx) => block(idx, ::) -= s_q * xi(key)
+          }
+          keys -> block
+      }
+
+      drmY.uncache()
+      drmQ.uncache()
+
+      drmY = (drmA %*% drmBt)
+          // Fix Y by subtracting s_b from each row of the AB'
+          .mapBlock() {
+        case (keys, block) =>
+          val s_b: Vector = bcastS_b
+          for (row <- 0 until block.nrow) block(row, ::) -= s_b
+          keys -> block
+      }
+          // Checkpoint Y
+          .checkpoint()
+
+      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+      s_q = drmQ.colSums()
+      bcastVarS_q = drmBroadcast(s_q)
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = (drmA.t %*% drmQ).checkpoint()
+
+      s_b = (drmBt.t %*% xi).collect(::, 0)
+      bcastVarS_b = drmBroadcast(s_b)
+    }
+
+    val c = s_q cross s_b
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+        c - c.t + (s_q cross s_q) * (xi dot xi)
+    val (inCoreUHat, d) = eigen(inCoreBBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV is actually computed until used, we don't need the flags
+    // instructing whether to compute either of the U, V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}
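
The broadcast of s_o and xi above implements the usual trick of decomposing the implicitly
mean-centered matrix without ever materializing it: with xi the column-mean vector and 1 a
column of m ones,

    Y = (A - 1 %*% xi') %*% Omega = A %*% Omega - 1 %*% (xi' %*% Omega)

so it suffices to form A %*% Omega block-wise and subtract s_o = Omega' %*% xi from every row,
which is exactly what the first mapBlock does.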

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
new file mode 100644
index 0000000..0da9ec7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
@@ -0,0 +1,83 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U, V, s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually
+   *         use them, e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than the smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+
+    // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
+    // instantiate the Omega random matrix view in the backend instead. That way serialized closure
+    // is much more compact.
+    var drmY = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        keys -> blockY
+    }
+
+    var drmQ = dqrThin(drmY.checkpoint())._1
+    // Checkpoint Q if last iteration
+    if (q == 0) drmQ = drmQ.checkpoint()
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = drmA.t %*% drmQ
+    // Checkpoint B' if last iteration
+    if (q == 0) drmBt = drmBt.checkpoint()
+
+    for (i <- 0  until q) {
+      drmY = drmA %*% drmBt
+      drmQ = dqrThin(drmY.checkpoint())._1
+      // Checkpoint Q if last iteration
+      if (i == q - 1) drmQ = drmQ.checkpoint()
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = drmA.t %*% drmQ
+      // Checkpoint B' if last iteration
+      if (i == q - 1) drmBt = drmBt.checkpoint()
+    }
+
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
+    val (inCoreUHat, d) = eigen(inCoreBBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV is actually computed until used, we don't need the flags
+    // instructing whether to compute either of the U, V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}
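
A calling sketch for the decomposition above (the k, p, q values are purely illustrative):

    val (drmU, drmV, s) = dssvd(drmA, k = 40, p = 15, q = 1)
    // U and V are still lazy; trigger their computation by using them, e.g.:
    val inCoreU = drmU.checkpoint(CacheHint.NONE).collect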

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
new file mode 100644
index 0000000..c2371d1
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
+    extends CheckpointAction[K] with DrmLike[K] {
+
+  protected[drm] var A: DrmLike[A]
+  protected[drm] var B: DrmLike[B]
+  protected[mahout] lazy val context: DistributedContext = A.context
+
+  // These explicitly export evidence; sometimes Scala fails to figure it out on its own.
+  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
+
+  def classTagB: ClassTag[B] = implicitly[ClassTag[B]]
+
+  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
new file mode 100644
index 0000000..eb5ef9a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+/** Abstract unary operator */
+abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
+    extends CheckpointAction[K] with DrmLike[K] {
+
+  protected[drm] var A: DrmLike[A]
+
+  protected[mahout] lazy val context: DistributedContext = A.context
+
+  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
+
+  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
new file mode 100644
index 0000000..aa3a3b9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import scala.util.Random
+import org.apache.mahout.math.drm._
+
+/** Implementation of distributed expression checkpoint and optimizer. */
+abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
+
+  protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
+
+  private[mahout] var cp:Option[CheckpointedDrm[K]] = None
+
+  def isIdenticallyPartitioned(other: DrmLike[_]) =
+    partitioningTag != 0L && partitioningTag == other.partitioningTag
+
+  /**
+   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
+   * optimizer and writing down the graph lineage since the last checkpointed DRM.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp match {
+    case None =>
+      val physPlan = context.toPhysical(context.optimizerRewrite(this), cacheHint)
+      cp = Some(physPlan)
+      physPlan
+    case Some(cp) => cp
+  }
+
+}
+
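
One consequence of the memoized cp above, as a sketch (drmX assumed to be a not-yet-checkpointed
logical expression):

    val cp1 = drmX.checkpoint(CacheHint.MEMORY_ONLY)
    val cp2 = drmX.checkpoint(CacheHint.NONE)  // returns the memoized cp1;
                                               // the new cache hint is ignored
    // cp1 eq cp2  ==> true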

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
new file mode 100644
index 0000000..804a00e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical AB */
+case class OpAB[K: ClassTag ](
+    override var A: DrmLike[K],
+    override var B: DrmLike[Int])
+    extends AbstractBinaryOp[K, Int, K] {
+
+  assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = B.ncol
+
+  /** Non-zero element count */
+  def nNonZero: Long =
+  // TODO: for purposes of cost calculation, approximate based on operands
+    throw new UnsupportedOperationException
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
new file mode 100644
index 0000000..f131f3f
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical AB with an arbitrarily-keyed right operand B */
+case class OpABAnyKey[B:ClassTag, K: ClassTag ](
+    override var A: DrmLike[K],
+    override var B: DrmLike[B])
+    extends AbstractBinaryOp[K, B, K] {
+
+  assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = B.ncol
+
+  /** Non-zero element count */
+  def nNonZero: Long =
+  // TODO: for purposes of cost calculation, approximate based on operands
+    throw new UnsupportedOperationException
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
new file mode 100644
index 0000000..f6503ed
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/** Logical AB' */
+case class OpABt[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[Int])
+    extends AbstractBinaryOp[K,Int,K]  {
+
+  assert(A.ncol == B.ncol, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = safeToNonNegInt(B.nrow)
+
+  /** Non-zero element count */
+  def nNonZero: Long =
+  // TODO: for purposes of cost calculation, approximate based on operands
+    throw new UnsupportedOperationException
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
new file mode 100644
index 0000000..d07172a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** DRM elementwise operator */
+case class OpAewB[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[K],
+    val op: Char
+    ) extends AbstractBinaryOp[K, K, K] {
+
+  assert(A.ncol == B.ncol, "arguments must have same number of columns")
+  assert(A.nrow == B.nrow, "arguments must have same number of rows")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+
+}
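
OpAewB backs the elementwise DRM-DRM operators; note that * here is the Hadamard (elementwise) product, not matrix multiplication. A sketch:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A = drmParallelize(dense((1, 2), (3, 4)))
    val B = drmParallelize(dense((10, 20), (30, 40)))
    val sum  = A + B  // op = '+'
    val prod = A * B  // op = '*': elementwise, unlike %*%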

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
new file mode 100644
index 0000000..91e0dd4
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Operator denoting expressions like 5.0 - A or A * 5.6 */
+case class OpAewScalar[K: ClassTag](
+    override var A: DrmLike[K],
+    val scalar: Double,
+    val op: String
+    ) extends AbstractUnaryOp[K,K] {
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+
+}
\ No newline at end of file
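
A sketch of the scalar forms, including a right-associative variant that puts the scalar on the left of the subtraction:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A = drmParallelize(dense((1, 2), (3, 4)))
    val B = A * 5.6   // op = "*"
    val C = 5.0 -: A  // op = "-:", i.e. 5.0 - a(i, j) elementwise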

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
new file mode 100644
index 0000000..3239ad2
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm._
+
+/** Logical A' */
+case class OpAt(
+    override var A: DrmLike[Int])
+    extends AbstractUnaryOp[Int, Int] {
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = safeToNonNegInt(A.nrow)
+
+}
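
OpAt is produced by the DSL's transpose sugar; the row and column cardinalities simply swap, with the Long-to-Int guard applied to A.nrow. A sketch:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A  = drmParallelize(dense((1, 2, 3), (4, 5, 6)))  // 2 x 3
    val At = A.t                                          // logical A', 3 x 2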

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
new file mode 100644
index 0000000..c7c6046
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** A'A */
+case class OpAtA[K: ClassTag](
+    override var A: DrmLike[K]
+    ) extends AbstractUnaryOp[K, Int] {
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+
+  /** Non-zero element count */
+  def nNonZero: Long = throw new UnsupportedOperationException
+}
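
The Gram-matrix pattern is frequent enough (it is the workhorse of the thin QR further down) that the optimizer can recognize A.t %*% A and plan a single A'A pass rather than a transpose followed by a product. A sketch:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A    = drmParallelize(dense((1, 2), (3, 4), (5, 6)))
    val gram = A.t %*% A           // typically fused into one A'A node
    val inCoreGram = gram.collect  // n x n, small whenever ncol is modest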

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
new file mode 100644
index 0000000..4e1dd5c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/** Logical A' for any row key to support A'A optimizations */
+case class OpAtAnyKey[A: ClassTag](
+    override var A: DrmLike[A])
+    extends AbstractUnaryOp[A, Int] {
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = safeToNonNegInt(A.nrow)
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
new file mode 100644
index 0000000..ef3ae6b
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical A'B */
+case class OpAtB[A: ClassTag](
+    override var A: DrmLike[A],
+    override var B: DrmLike[A])
+    extends AbstractBinaryOp[A, A, Int] {
+
+  assert(A.nrow == B.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = B.ncol
+
+  /** Non-zero element count */
+  def nNonZero: Long =
+  // TODO: for purposes of cost calculation, approximate based on operands
+    throw new UnsupportedOperationException
+
+}
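
The mixed-operand analogue: A.t %*% B over identically keyed operands can likewise be planned as a single A'B pass. A sketch:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A = drmParallelize(dense((1, 2), (3, 4)))
    val B = drmParallelize(dense((5, 6), (7, 8)))
    val C = A.t %*% B  // candidate for a fused A'B node; needs A.nrow == B.nrow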

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
new file mode 100644
index 0000000..36769c7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+
+/** Logical A'x. */
+case class OpAtx(
+    override var A: DrmLike[Int],
+    val x: Vector
+    ) extends AbstractUnaryOp[Int, Int] {
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  assert(A.nrow == x.length, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = 1
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
new file mode 100644
index 0000000..a726989
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical Ax. */
+case class OpAx[K: ClassTag](
+    override var A: DrmLike[K],
+    val x: Vector
+    ) extends AbstractUnaryOp[K, K] {
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  assert(A.ncol == x.length, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = 1
+
+}
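
Matrix-vector forms: A %*% x produces a single-column DRM through OpAx, while A.t %*% x may fuse into the OpAtx node above (dvec is the scalabindings dense-vector constructor). A sketch:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A   = drmParallelize(dense((1, 2), (3, 4)))
    val x   = dvec(10, 20)
    val Ax  = A %*% x    // n x 1 result, keyed like A
    val Atx = A.t %*% x  // may fuse into a single A'x pass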

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
new file mode 100644
index 0000000..8e4362d
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+
+class OpMapBlock[S: ClassTag, R: ClassTag](
+    override var A: DrmLike[S],
+    val bmf: BlockMapFunc[S, R],
+    val _ncol: Int = -1,
+    val _nrow: Long = -1
+    ) extends AbstractUnaryOp[S, R] {
+
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = if (_ncol >= 0) _ncol else A.ncol
+
+}
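
mapBlock is the custom-code escape hatch: the function sees a (row keys, vertical block) pair and returns a replacement pair, and _ncol/_nrow let the caller declare new geometry when the function changes it. A sketch that squares every element (geometry unchanged, so the hints are omitted; the in-core * is the elementwise RLikeOps product):

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A = drmParallelize(dense((1, 2), (3, 4)))
    val squared = A.mapBlock() { case (keys, block) =>
      keys -> (block * block)  // elementwise in-core product
    }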

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
new file mode 100644
index 0000000..697bbd3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical row-range slicing */
+case class OpRowRange(
+    override var A: DrmLike[Int],
+    val rowRange: Range
+    ) extends AbstractUnaryOp[Int, Int] {
+
+  assert(rowRange.head >= 0 && rowRange.last < A.nrow, "row range is out of bounds")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = rowRange.length
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+
+}
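
This is what the DSL's slicing sugar builds for Int-keyed DRMs. A sketch that keeps the first two rows and every column (the `::` all-columns marker comes from the scalabindings):

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A    = drmParallelize(dense((1, 2), (3, 4), (5, 6), (7, 8)))
    val top2 = A(0 until 2, ::)  // rows 0..1, all columns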

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
new file mode 100644
index 0000000..1ca79b3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical times-left over in-core matrix operand. */
+case class OpTimesLeftMatrix(
+    val left: Matrix,
+    override var A: DrmLike[Int]
+    ) extends AbstractUnaryOp[Int, Int] {
+
+  assert(left.ncol == A.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = left.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+
+  /** Non-zero element count */
+  // TODO
+  def nNonZero: Long = throw new UnsupportedOperationException
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
new file mode 100644
index 0000000..c55f7f0
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical times-right over in-core matrix operand. */
+case class OpTimesRightMatrix[K: ClassTag](
+    override var A: DrmLike[K],
+    val right: Matrix
+    ) extends AbstractUnaryOp[K, K] {
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  assert(A.ncol == right.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = right.ncol
+
+  /** Non-zero element count */
+  // TODO
+  def nNonZero: Long = throw new UnsupportedOperationException
+
+}
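
Both in-core-operand nodes come from mixing a DRM with an in-core Matrix; the right-hand variant lends itself to broadcasting the small matrix to the tasks. A sketch:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    implicit val ctx: DistributedContext = ???  // assumed: an engine-backed context

    val A       = drmParallelize(dense((1, 2), (3, 4)))
    val inCoreM = dense((1, 0), (0, 1))
    val right   = A %*% inCoreM  // times-right: needs A.ncol == inCoreM.nrow
    val left    = inCoreM %*% A  // times-left:  needs inCoreM.ncol == A.nrow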

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
new file mode 100644
index 0000000..768bb1c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
+
+package object drm {
+
+  /** Drm row-wise tuple */
+  type DrmTuple[K] = (K, Vector)
+
+  /** Drm block-wise tuple: Array of row keys and the matrix block. */
+  type BlockifiedDrmTuple[K] = (Array[K], _ <: Matrix)
+
+
+  /** Block-map func */
+  type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
+
+  /** CacheHint type */
+  //  type CacheHint = CacheHint.CacheHint
+
+  def safeToNonNegInt(x: Long): Int = {
+    assert(x == x << -31 >>> -31, "conversion from Long to Int would lose significant bits, or the number is negative")
+    x.toInt
+  }
+
+  /** Broadcast support API */
+  def drmBroadcast(m:Matrix)(implicit ctx:DistributedContext):BCast[Matrix] = ctx.drmBroadcast(m)
+
+  /** Broadcast support API */
+  def drmBroadcast(v:Vector)(implicit ctx:DistributedContext):BCast[Vector] = ctx.drmBroadcast(v)
+
+  /** Load a DRM from HDFS (in Mahout DRM format). */
+  def drmFromHDFS(path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmFromHDFS(path)
+
+  /** Shortcut to parallelizing matrices with indices, ignoring row labels. */
+  def drmParallelize(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = drmParallelizeWithRowIndices(m, numPartitions)(sc)
+
+  /** Parallelize an in-core matrix as a Spark distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeWithRowIndices(m, numPartitions)
+
+  /** Parallelize an in-core matrix as a Spark distributed matrix, using row labels as data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit ctx: DistributedContext): CheckpointedDrm[String] = ctx.drmParallelizeWithRowLabels(m, numPartitions)
+
+  /** Creates an empty DRM with the specified cardinality and number of partitions. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeEmpty(nrow, ncol, numPartitions)
+
+  /** Creates an empty DRM whose row count may exceed Int range (hence the Long row keys). */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
+
+  /** Implicit broadcast -> value conversion. */
+  implicit def bcast2val[T](bcast:BCast[T]):T = bcast.value
+
+  /** Expose all engine operations on the context as well (via implicit conversion). */
+  implicit def ctx2engine(ctx:DistributedContext):DistributedEngine = ctx.engine
+
+  implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
+    new CheckpointedOps[K](drm)
+
+  implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
+
+  // ============== Decompositions ===================
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be
+   * fairly modest (< 5000 or so). <P>
+   *
+   * It is recommended to checkpoint A, since the algorithm makes two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in the same key order) as A, so
+   * their RDDs should zip successfully.
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(A, checkRankDeficiency)
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U, V, s). Note that U and V are non-checkpointed matrices (i.e. one needs to actually
+   *         use them, e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document attached to MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U, V, s). Note that U and V are non-checkpointed matrices (i.e. one needs to actually
+   *         use them, e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+
+
+}
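
Finally, a hypothetical end-to-end sketch tying the package API together. The context construction assumes the sparkbindings module's mahoutSparkContext helper, and the matrix size, rank k, and iteration counts are illustrative only:

    import org.apache.mahout.math.Matrices
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.sparkbindings._

    implicit val ctx: DistributedContext =
      mahoutSparkContext(masterUrl = "local[2]", appName = "dssvd-sketch")

    // 500 x 40 random test matrix over 2 partitions.
    val drmA = drmParallelize(
      Matrices.symmetricUniformView(500, 40, 1234).cloned, numPartitions = 2)

    val (drmU, drmV, s) = dssvd(drmA, k = 10, p = 15, q = 1)

    // U and V are lazy: collect (or save to HDFS) to trigger the pipeline.
    val inCoreU = drmU.collect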