Posted to commits@mahout.apache.org by dl...@apache.org on 2014/05/27 21:14:52 UTC

[2/3] MAHOUT-1529 closes PR #1

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
deleted file mode 100644
index 8e05a83..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.scalabindings
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import RLikeOps._
-import org.apache.mahout.common.RandomUtils
-import scala._
-import org.apache.log4j.Logger
-
-private[math] object SSVD {
-
-  private val log = Logger.getLogger(SSVD.getClass)
-
-  /**
-   * In-core SSVD algorithm.
-   *
-   * @param a input matrix A
-   * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    var y = a %*% omega
-    var yty = y.t %*% y
-    val at = a.t
-    var ch = chol(yty)
-    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-    var bt = ch.solveRight(at %*% y)
-
-    // Power iterations
-    for (i <- 0 until q) {
-      y = a %*% bt
-      yty = y.t %*% y
-      ch = chol(yty)
-      bt = ch.solveRight(at %*% y)
-    }
-
-    val bbt = bt.t %*% bt
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = ch.solveRight(y) %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-  }
-
-  /**
-   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
-   * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part,
-   * meant to save some memory for sparse inputs by avoiding direct mean subtraction.<P>
-   *
-   * Hint: usually one wants to use AV, which is approximately U*Sigma, i.e. <code>u %*%: diagv(s)</code>.
-   * If retaining distances and original scaled variances is not that important, the normalized PCA
-   * space is just U.
-   *
-   * Important: data points are considered to be rows.
-   *
-   * @param a input matrix A
-   * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    // Dataset mean
-    val xi = a.colMeans()
-
-    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
-
-    var y = a %*% omega
-
-    // Fixing y
-    val s_o = omega.t %*% xi
-    y := ((r,c,v) => v - s_o(c))
-
-    var yty = y.t %*% y
-    var ch = chol(yty)
-//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
-    // This is implicit Q of QR(Y)
-    var qm = ch.solveRight(y)
-    var bt = a.t %*% qm
-    var s_q = qm.colSums()
-    var s_b = bt.t %*% xi
-
-    // Power iterations
-    for (i <- 0 until q) {
-
-      // Fix bt
-      bt -= xi cross s_q
-
-      y = a %*% bt
-
-      // Fix Y again.
-      y := ((r,c,v) => v - s_b(c))
-
-      yty = y.t %*% y
-      ch = chol(yty)
-      qm = ch.solveRight(y)
-      bt = a.t %*% qm
-      s_q = qm.colSums()
-      s_b = bt.t %*% xi
-    }
-
-    val c = s_q cross s_b
-
-    // BB' computation becomes
-    val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi)
-
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = qm %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+  private val log = Logger.getLogger(SSVD.getClass)
+
+  /**
+   * In-core SSVD algorithm.
+   *
+   * @param a input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    var y = a %*% omega
+    var yty = y.t %*% y
+    val at = a.t
+    var ch = chol(yty)
+    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+    var bt = ch.solveRight(at %*% y)
+
+    // Power iterations
+    for (i <- 0 until q) {
+      y = a %*% bt
+      yty = y.t %*% y
+      ch = chol(yty)
+      bt = ch.solveRight(at %*% y)
+    }
+
+    val bbt = bt.t %*% bt
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = ch.solveRight(y) %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+  }
+
+  /**
+   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+   * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part,
+   * meant to save some memory for sparse inputs by avoiding direct mean subtraction.<P>
+   *
+   * Hint: usually one wants to use AV, which is approximately U*Sigma, i.e. <code>u %*%: diagv(s)</code>.
+   * If retaining distances and original scaled variances is not that important, the normalized PCA
+   * space is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    // Dataset mean
+    val xi = a.colMeans()
+
+    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+    var y = a %*% omega
+
+    // Fixing y
+    val s_o = omega.t %*% xi
+    y := ((r,c,v) => v - s_o(c))
+
+    var yty = y.t %*% y
+    var ch = chol(yty)
+//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+    // This is implicit Q of QR(Y)
+    var qm = ch.solveRight(y)
+    var bt = a.t %*% qm
+    var s_q = qm.colSums()
+    var s_b = bt.t %*% xi
+
+    // Power iterations
+    for (i <- 0 until q) {
+
+      // Fix bt
+      bt -= xi cross s_q
+
+      y = a %*% bt
+
+      // Fix Y again.
+      y := ((r,c,v) => v - s_b(c))
+
+      yty = y.t %*% y
+      ch = chol(yty)
+      qm = ch.solveRight(y)
+      bt = a.t %*% qm
+      s_q = qm.colSums()
+      s_b = bt.t %*% xi
+    }
+
+    val c = s_q cross s_b
+
+    // BB' computation becomes
+    val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi)
+
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = qm %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+  }
+
+}

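For reference, a minimal in-core usage sketch of the relocated methods. It assumes the scalabindings package object continues to re-export ssvd()/spca() after the move (the package.scala hunk below only shows the new import), so treat the entry-point names as illustrative rather than part of this patch:

  import org.apache.mahout.math.scalabindings._
  import RLikeOps._

  val a = dense((1, 2, 3), (3, 4, 5), (5, 6, 8))

  // Rank-2 stochastic SVD; p and q as described in the scaladoc above.
  val (u, v, s) = ssvd(a, k = 2, p = 1, q = 1)

  // A is approximately reconstructed by U * diag(s) * V'.
  val aHat = u %*% diagv(s) %*% v.t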
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index c9e59ba..4599146 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -19,6 +19,7 @@ package org.apache.mahout.math
 
 import org.apache.mahout.math._
 import org.apache.mahout.math.solver.EigenDecomposition
+import org.apache.mahout.math.decompositions.SSVD
 
 /**
  * Mahout matrices and vectors' scala syntactic sugar

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index a97b453..020a2f9 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -2,22 +2,20 @@ package org.apache.mahout.sparkbindings.shell
 
 import org.apache.spark.repl.SparkILoop
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.mahout.sparkbindings._
 import scala.tools.nsc.Properties
 import scala.Some
+import org.apache.mahout.sparkbindings._
 
 class MahoutSparkILoop extends SparkILoop {
 
   private val postInitScript =
-      "import org.apache.mahout.math.Vector" ::
-      "import org.apache.mahout.math.scalabindings._" ::
-      "import RLikeOps._" ::
-      "import org.apache.mahout.sparkbindings._" ::
-      "import drm._" ::
-      "import RLikeDrmOps._" ::
-      "org.apache.spark.storage.StorageLevel" ::
-      "implicit val _sc = sc" ::
-      Nil
+    "import org.apache.mahout.math._" ::
+        "import scalabindings._" ::
+        "import RLikeOps._" ::
+        "import drm._" ::
+        "import RLikeDrmOps._" ::
+        "import org.apache.mahout.sparkbindings._" ::
+        Nil
 
   override protected def postInitialization() {
     super.postInitialization()
@@ -50,10 +48,25 @@ class MahoutSparkILoop extends SparkILoop {
       customJars = jars,
       sparkConf = conf
     )
+
     echo("Created spark context..")
     sparkContext
   }
 
+  override def initializeSpark() {
+    intp.beQuietDuring {
+      command("""
+
+         @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext =
+            new org.apache.mahout.sparkbindings.SparkDistributedContext(
+            org.apache.spark.repl.Main.interp.createSparkContext())
+
+              """)
+      command("import org.apache.spark.SparkContext._")
+      echo("Mahout distributed context is available as \"implicit val sdc\".")
+    }
+  }
+
   override def prompt: String = "mahout> "
 
   override def printWelcome(): Unit = {

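With the reworked postInitScript and the new initializeSpark override, a shell session is expected to look roughly like the following (a sketch only; it assumes drmParallelize and friends resolve their distributed context from the implicit sdc created above):

  mahout> val a = dense((1, 2, 3), (3, 4, 5))
  mahout> val drmA = drmParallelize(a, numPartitions = 2)   // picks up the implicit sdc
  mahout> (drmA.t %*% drmA).collect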
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
index 648f07f..9c0a51f 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.mahout.sparkbindings.shell
 
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/test/mahout/simple.mscala
----------------------------------------------------------------------
diff --git a/spark-shell/src/test/mahout/simple.mscala b/spark-shell/src/test/mahout/simple.mscala
index 385e4e8..854c482 100644
--- a/spark-shell/src/test/mahout/simple.mscala
+++ b/spark-shell/src/test/mahout/simple.mscala
@@ -1,8 +1,25 @@
-import org.apache.mahout.sparkbindings._
-import drm._
-import RLikeDrmOps._
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import org.apache.spark.storage.StorageLevel
+/*
+ To run, execute from mahout shell:
+
+ :load spark-shell/src/test/mahout/simple.mscala
+*/
 
 val a = dense((1,2,3),(3,4,5))
 val drmA = drmParallelize(a,numPartitions = 2)
@@ -19,5 +36,5 @@ r.collect
 // local write
 r.writeDRM("file:///home/dmitriy/A")
 
-// hdfs write
-r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file
+// hdfs write -- uncomment to test
+// r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 8b89969..ac99ffd 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -220,53 +220,52 @@
         </property>
       </activation>
       <dependencies>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-client</artifactId>
-        <version>${hadoop2.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.mahout</groupId>
-        <artifactId>mahout-mrlegacy</artifactId>
-        <exclusions>
-          <exclusion>
-            <groupId>asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-core</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-common</artifactId>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-mapreduce-client-core</artifactId>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-mapreduce-client-common</artifactId>
-      </dependency>
-    </dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+          <version>${hadoop2.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>asm</groupId>
+              <artifactId>asm</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.mahout</groupId>
+          <artifactId>mahout-mrlegacy</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>asm</groupId>
+              <artifactId>asm</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hadoop</groupId>
+              <artifactId>hadoop-core</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </dependency>
+      </dependencies>
     </profile>
   </profiles>
 
   <dependencies>
-    <!-- spark stuff -->
+
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.major}</artifactId>
-      <version>${spark.version}</version>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-math-scala</artifactId>
     </dependency>
 
     <dependency>
@@ -283,18 +282,18 @@
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-math-scala</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-math-scala</artifactId>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
 
 
     <!--  3rd-party -->
-
+    <!-- spark stuff -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.major}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
 
     <!-- scala stuff -->
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
new file mode 100644
index 0000000..4d13a5a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.drm.{DistributedEngine, BCast, DistributedContext}
+import org.apache.spark.SparkContext
+
+class SparkDistributedContext(val sc: SparkContext) extends DistributedContext {
+
+  val engine: DistributedEngine = SparkEngine
+
+  def close() {
+    sc.stop()
+  }
+}

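Outside the shell, the wrapper is presumably instantiated around an existing SparkContext; a minimal sketch (the SparkConf settings are illustrative only, not mandated by the patch):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.mahout.math.drm.DistributedContext
  import org.apache.mahout.sparkbindings.SparkDistributedContext

  val conf = new SparkConf().setMaster("local[2]").setAppName("mahout-sketch")
  implicit val dc: DistributedContext = new SparkDistributedContext(new SparkContext(conf))

  // ... run DRM expressions against the implicit context ...

  dc.close()   // stops the wrapped SparkContext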
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
new file mode 100644
index 0000000..0c904ab
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.math._
+import scala.reflect.ClassTag
+import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings.blas._
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import scala.Some
+import scala.collection.JavaConversions._
+import org.apache.spark.SparkContext
+
+/** Spark-specific non-drm-method operations */
+object SparkEngine extends DistributedEngine {
+
+  def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val n = drm.ncol
+
+    drm.rdd
+        // Throw away keys
+        .map(_._2)
+        // Fold() still doesn't work with Kryo, so we work around it.
+        .mapPartitions(iter => {
+      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
+      Iterator(acc)
+    })
+        // Since we preallocated new accumulator vector per partition, this must not cause any side
+        // effects now.
+        .reduce(_ += _)
+  }
+
+  /** Engine-specific colMeans implementation based on a checkpoint. */
+  def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+
+  /**
+   * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+   *
+   * A particular physical engine implementation may choose to either use or not use these rewrites
+   * as a useful basic rewriting rule.<P>
+   */
+  override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
+
+
+  /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
+
+    // Spark-specific Physical Plan translation.
+    val rdd = tr2phys(plan)
+
+    val newcp = new CheckpointedDrmSpark(
+      rdd = rdd,
+      _nrow = plan.nrow,
+      _ncol = plan.ncol,
+      _cacheStorageLevel = cacheHint2Spark(ch),
+      partitioningTag = plan.partitioningTag
+    )
+    newcp.cache()
+  }
+
+  /** Broadcast support */
+  def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = dc.broadcast(v)
+
+  /** Broadcast support */
+  def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
+
+  /**
+   * Load DRM from hdfs (as in Mahout DRM format)
+   *
+   * @param path
+   * @param sc spark context (wanted to make that implicit, doesn't work in current version of
+   *           scala with the type bounds, sorry)
+   *
+   * @return DRM[Any] where Any is automatically translated to value type
+   */
+  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+    implicit val scc:SparkContext = sc
+    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
+
+    val key = rdd.map(_._1).take(1)(0)
+    val keyWClass = key.getClass.asSubclass(classOf[Writable])
+
+    val key2val = key match {
+      case xx: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
+      case xx: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
+      case xx: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
+      case xx: Writable => (v: AnyRef) => v
+    }
+
+    val val2key = key match {
+      case xx: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
+      case xx: Text => (x: Any) => new Text(x.toString)
+      case xx: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Long])
+      case xx: Writable => (x: Any) => x.asInstanceOf[Writable]
+    }
+
+    val  km = key match {
+      case xx: IntWritable => implicitly[ClassTag[Int]]
+      case xx: Text => implicitly[ClassTag[String]]
+      case xx: LongWritable => implicitly[ClassTag[Long]]
+      case xx: Writable => ClassTag(classOf[Writable])
+    }
+
+    {
+      implicit def getWritable(x: Any): Writable = val2key(x)
+      new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
+    }
+  }
+
+  /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext)
+  : CheckpointedDrm[Int] = {
+    new CheckpointedDrmSpark(rdd = parallelizeInCore(m, numPartitions))
+  }
+
+  private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): DrmRdd[Int] = {
+
+    val p = (0 until m.nrow).map(i => i -> m(i, ::))
+    sc.parallelize(p, numPartitions)
+
+  }
+
+  /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext)
+  : CheckpointedDrm[String] = {
+
+    val rb = m.getRowLabelBindings
+    val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
+
+    new CheckpointedDrmSpark(rdd = sc.parallelize(p, numPartitions))
+  }
+
+  /** This creates an empty DRM with specified number of partitions and cardinality. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+      val partNRow = (nrow - 1) / numPartitions + 1
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    })
+    new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
+  }
+
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
+    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+      val partNRow = (nrow - 1) / numPartitions + 1
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    })
+    new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
+  }
+
+  private def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
+    case CacheHint.NONE => StorageLevel.NONE
+    case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
+    case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
+    case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
+    case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
+    case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
+    case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
+    case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
+    case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
+    case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
+    case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
+  }
+
+  /** Translate previously optimized physical plan */
+  private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
+    // I do explicit evidence propagation here since matching via case classes seems to be losing
+    // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
+    // Hence you see explicit evidence attached to all recursive exec() calls.
+    oper match {
+      // If there are any such cases, they must go away in pass1. If they did not, then it wasn't
+      // the A'A case but an actual transposition intent, which should be removed from consideration
+      // (we cannot do an actual flip for non-int-keyed arguments).
+      case OpAtAnyKey(_) =>
+        throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
+      case op@OpAt(a) => At.at(op, tr2phys(a)(op.classTagA))
+      case op@OpABt(a, b) => ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAtB(a, b) => AtB.atb_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB),
+        zippable = a.partitioningTag == b.partitioningTag)
+      case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
+      case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
+      case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
+      case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+      // Custom operators, we just execute them
+      case blockOp: OpMapBlock[K, _] => MapBlock.exec(
+        src = tr2phys(blockOp.A)(blockOp.classTagA),
+        ncol = blockOp.ncol,
+        bmf = blockOp.bmf
+      )
+      case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
+      case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
+          .format(oper))
+
+    }
+  }
+
+
+}
+

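One detail worth a worked check: drmParallelizeEmpty / drmParallelizeEmptyLong split the row range with ceiling arithmetic over `0 to numPartitions`, so the last seed value simply yields an empty range. A small standalone sketch of just that formula from the hunk above (values are illustrative):

  // partNRow = ceil(nrow / numPartitions), written as (nrow - 1) / numPartitions + 1
  val nrow = 10
  val numPartitions = 3
  val partNRow = (nrow - 1) / numPartitions + 1        // = 4
  val ranges = (0 to numPartitions).map { part =>
    val partStart = partNRow * part
    val partEnd = math.min(partStart + partNRow, nrow)
    partStart until partEnd                            // 0..3, 4..7, 8..9, then empty
  }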
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 8d19068..97873bd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -19,11 +19,12 @@ package org.apache.mahout.sparkbindings.blas
 
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.plan.OpABt
 import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.sparkbindings._
+import drm._
 import org.apache.mahout.math.{Matrix, SparseRowMatrix}
 import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpABt
 
 /** Contains RDD plans for ABt operator */
 object ABt {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index ec93cf7..ec6e99e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,13 +17,13 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.plan.{OpAewScalar, OpAewB}
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import scala.reflect.ClassTag
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
 
 /** Elementwise drm-drm operators */
 object AewB {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 0383fe1..c923e62 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -1,13 +1,14 @@
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
 import RLikeOps._
-
+import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesRightMatrix
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import scala.reflect.ClassTag
 import org.apache.mahout.math.DiagonalMatrix
+import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
 
 /** Matrix product with one of operands an in-core matrix */
 object AinCoreB {
@@ -21,8 +22,8 @@ object AinCoreB {
 
   private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
-    val dg = drmBroadcast(x = op.right.viewDiagonal())
+    implicit val ctx:DistributedContext = rddA.context
+    val dg = drmBroadcast(op.right.viewDiagonal())
 
     val rdd = rddA
         // Just multiply the blocks
@@ -35,7 +36,7 @@ object AinCoreB {
   private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
+    implicit val sc:DistributedContext = rddA.sparkContext
 
     val bcastB = drmBroadcast(m = op.right)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 38af173..56de9f4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -17,12 +17,12 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.{DenseVector, Vector, SequentialAccessSparseVector}
+import org.apache.mahout.math.drm.logical.OpAt
 
 /** A' algorithms */
 object At {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index 17cab62..450e836 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -18,14 +18,16 @@
 package org.apache.mahout.sparkbindings.blas
 
 import org.apache.mahout.math._
+import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import collection._
 import JavaConversions._
-import org.apache.mahout.sparkbindings.drm.plan.OpAtA
 import org.apache.spark.SparkContext._
 import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtA
+import SparkEngine._
 
 /**
  * Collection of algorithms to compute X' times X

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index f0c3423..86aadc8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -18,13 +18,14 @@
 package org.apache.mahout.sparkbindings.blas
 
 import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings.drm._
 import org.apache.spark.rdd.RDD
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.spark.SparkContext._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtB}
 import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtB
 
 object AtB {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index e6de443..94c3f06 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -1,12 +1,13 @@
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
 import RLikeOps._
-
-import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtx, OpAx, OpTimesRightMatrix}
+import drm._
+import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
 
 
 /** Matrix product with one of operands an in-core matrix */
@@ -15,9 +16,9 @@ object Ax {
   def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
+    implicit val sc:DistributedContext = rddA.sparkContext
 
-    val bcastX = drmBroadcast(x = op.x)
+    val bcastX = drmBroadcast(op.x)
 
     val rdd = rddA
         // Just multiply the blocks
@@ -30,9 +31,9 @@ object Ax {
 
   def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
     val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc = rddA.sparkContext
+    implicit val dc:DistributedContext = rddA.sparkContext
 
-    val bcastX = drmBroadcast(x = op.x)
+    val bcastX = drmBroadcast(op.x)
 
     val inCoreM = rddA
         // Just multiply the blocks
@@ -51,7 +52,7 @@ object Ax {
     // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
     // it back as drm blockified rdd
 
-    val rdd = sc.parallelize(Seq(inCoreM), numSlices = 1)
+    val rdd = dc.parallelize(Seq(inCoreM), numSlices = 1)
         .map(block => Array.tabulate(block.nrow)(i => i) -> block)
 
     new DrmRddInput(blockifiedSrc = Some(rdd))

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 6bb7b4b..a3caac7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -17,11 +17,11 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.DrmRdd
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
+import org.apache.mahout.sparkbindings.DrmRdd
 
 class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
new file mode 100644
index 0000000..4c68c9a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.BlockMapFunc
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import scala.reflect.ClassTag
+
+object MapBlock {
+
+  def exec[S, R:ClassTag](src: DrmRddInput[S], ncol:Int, bmf:BlockMapFunc[S,R]): DrmRddInput[R] = {
+
+    // We don't use instance attributes here, to avoid pulling the whole enclosing object into the closure.
+
+    val rdd = src.toBlockifiedDrmRdd()
+        .map(blockTuple => {
+      val out = bmf(blockTuple)
+
+      assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return the same number of rows.")
+      assert(out._2.ncol == ncol, "block mapping must return %d columns.".format(ncol))
+
+      out
+    })
+    new DrmRddInput(blockifiedSrc = Some(rdd))
+  }
+
+}

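The contract enforced here (same row count per block, the declared column count) is the one the user-level mapBlock API relies on; a sketch, reusing drmA from the shell example earlier:

  // Double every value; keys and geometry are unchanged, so both asserts above hold.
  val drmB = drmA.mapBlock(ncol = drmA.ncol) {
    case (keys, block) => keys -> (block * 2)
  }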
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index 5affd3b..d0a50b5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -1,7 +1,7 @@
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.plan.OpRowRange
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpRowRange
 
 object Slicing {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 795f2e2..d2d5340 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -17,7 +17,6 @@
 
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.sparkbindings.drm.DrmRdd
 import scala.reflect.ClassTag
 
 /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
deleted file mode 100644
index 89d3735..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.log4j.Logger
-
-object DQR {
-
-  private val log = Logger.getLogger(DQR.getClass)
-
-  /**
-   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
-   * controlled (<5000 or so). <P>
-   *
-   * It is recommended to checkpoint A since it does two passes over it. <P>
-   *
-   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
-   * their RDD should be able to zip successfully.
-   */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
-
-    if (A.ncol > 5000)
-      log.warn("A is too fat. A'A must fit in memory and be easily broadcast.")
-
-    val AtA = (A.t %*% A).checkpoint()
-    val inCoreAtA = AtA.collect
-    implicit val sc = AtA.rdd.sparkContext
-
-    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
-
-    val ch = chol(inCoreAtA)
-    val inCoreR = (ch.getL cloned) t
-
-    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
-
-    if (checkRankDeficiency && !ch.isPositiveDefinite)
-      throw new IllegalArgumentException("R is rank-deficient.")
-
-    val bcastAtA = sc.broadcast(inCoreAtA)
-
-    // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
-    // decompose A'A in the backend again.
-
-    // Compute Q = A*inv(L') -- we can do it blockwise.
-    val Q = A.mapBlock() {
-      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
-    }
-
-    Q -> inCoreR
-  }
-
-}

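The call pattern for the thin QR being moved out of sparkbindings stays the same; a sketch, assuming dqrThin remains re-exported wherever the decompositions land, with drmA any DRM whose column count is modest:

  // Q keeps A's partitioning; R is a small in-core upper-triangular factor.
  val (drmQ, inCoreR) = dqrThin(drmA)

  // For a small A this can be sanity-checked in-core: A ~= Q %*% R.
  val aApprox = drmQ.collect %*% inCoreR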
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
deleted file mode 100644
index f3b0e3f..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
-  /**
-   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
-   * document of the MAHOUT-817.
-   *
-   * @param A input matrix A
-   * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations (hint: use either 0 or 1)
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
-   *         e.g. save them to hdfs, in order to trigger their computation).
-   */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-    implicit val sc = drmA.rdd.sparkContext
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // Dataset mean
-    val xi = drmA.colMeans
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
-    // This is done up front in a single-threaded fashion for now. Even though it doesn't require any
-    // memory beyond what is required to keep xi around, it still might be parallelized to the backends
-    // for significantly big n and r. TODO
-    val s_o = omega.t %*% xi
-
-    val bcastS_o = drmBroadcast(s_o)
-    val bcastXi = drmBroadcast(xi)
-
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val s_o:Vector = bcastS_o
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
-        keys -> blockY
-    }
-        // Checkpoint Y
-        .checkpoint()
-
-    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-    var s_q = drmQ.colSums()
-    var bcastVarS_q = drmBroadcast(s_q)
-
-    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
-    // still be identically partitioned.
-    var drmBt = (drmA.t %*% drmQ).checkpoint()
-
-    var s_b = (drmBt.t %*% xi).collect(::, 0)
-    var bcastVarS_b = drmBroadcast(s_b)
-
-    for (i <- 0 until q) {
-
-      // These closures don't seem to live well with outside-scope vars: closure attributes don't get
-      // recorded correctly. So we create an additional set of vals for the broadcast vars to properly
-      // create read-only closure attributes in this very scope.
-      val bcastS_q = bcastVarS_q
-      val bcastS_b = bcastVarS_b
-      val bcastXib = bcastXi
-
-      // Fix Bt as B' -= xi cross s_q
-      drmBt = drmBt.mapBlock() {
-        case (keys, block) =>
-          val s_q: Vector = bcastS_q
-          val xi: Vector = bcastXib
-          keys.zipWithIndex.foreach {
-            case (key, idx) => block(idx, ::) -= s_q * xi(key)
-          }
-          keys -> block
-      }
-
-      drmY.uncache()
-      drmQ.uncache()
-
-      drmY = (drmA %*% drmBt)
-          // Fix Y by subtracting s_b from each row of the AB'
-          .mapBlock() {
-        case (keys, block) =>
-          val s_b: Vector = bcastS_b
-          for (row <- 0 until block.nrow) block(row, ::) -= s_b
-          keys -> block
-      }
-          // Checkpoint Y
-          .checkpoint()
-
-      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-      s_q = drmQ.colSums()
-      bcastVarS_q = drmBroadcast(s_q)
-
-      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
-      // identically partitioned anymore.
-      drmBt = (drmA.t %*% drmQ).checkpoint()
-
-      s_b = (drmBt.t %*% xi).collect(::, 0)
-      bcastVarS_b = drmBroadcast(s_b)
-    }
-
-    val c = s_q cross s_b
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
-        c - c.t + (s_q cross s_q) * (xi dot xi)
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV is actually computed until used, we no longer need the flags
-    // instructing whether or not to compute either of the U, V outputs. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
deleted file mode 100644
index de15d2b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSSVD {
-
-  /**
-   * Distributed Stochastic Singular Value decomposition algorithm.
-   *
-   * @param A input matrix A
-   * @param k requested SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
-   *         e.g. save them to HDFS, in order to trigger their computation).
-   */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-
-    // Compute Y = A*Omega. Instead of redistributing the view, we redistribute only the Omega seed
-    // and instantiate the Omega random matrix view in the backend. That way the serialized closure
-    // is much more compact.
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        keys -> blockY
-    }
-
-    var drmQ = dqrThin(drmY.checkpoint())._1
-    // Checkpoint Q if last iteration
-    if (q == 0) drmQ = drmQ.checkpoint()
-
-    // This should actually be optimized as an identically partitioned map-side A'B product, since A
-    // and Q should still be identically partitioned.
-    var drmBt = drmA.t %*% drmQ
-    // Checkpoint B' if last iteration
-    if (q == 0) drmBt = drmBt.checkpoint()
-
-    for (i <- 0 until q) {
-      drmY = drmA %*% drmBt
-      drmQ = dqrThin(drmY.checkpoint())._1
-      // Checkpoint Q if last iteration
-      if (i == q - 1) drmQ = drmQ.checkpoint()
-
-      // This, on the other hand, should be an inner-join-and-map A'B optimization since A and Q_i
-      // are not identically partitioned anymore.
-      drmBt = drmA.t %*% drmQ
-      // Checkpoint B' if last iteration
-      if (i == q - 1) drmBt = drmBt.checkpoint()
-    }
-
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV is actually computed until used, we no longer need the flags
-    // instructing whether or not to compute either of the U, V outputs. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
deleted file mode 100644
index faa89ef..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.mahout.sparkbindings.drm
-
-
-object CacheHint extends Enumeration {
-
-  type CacheHint = Value
-
-  val NONE,
-  DISK_ONLY,
-  DISK_ONLY_2,
-  MEMORY_ONLY,
-  MEMORY_ONLY_2,
-  MEMORY_ONLY_SER,
-  MEMORY_ONLY_SER_2,
-  MEMORY_AND_DISK,
-  MEMORY_AND_DISK_2,
-  MEMORY_AND_DISK_SER,
-  MEMORY_AND_DISK_SER_2 = Value
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
deleted file mode 100644
index 0007477..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.Matrix
-import org.apache.hadoop.io.Writable
-
-/**
- * Checkpointed DRM API. This is a matrix that has an optimized RDD lineage behind it and can
- * therefore be collected or saved.
- * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
- */
-trait CheckpointedDrm[K] extends DrmLike[K] {
-
-  def rdd: DrmRdd[K]
-
-  def collect: Matrix
-
-  def writeDRM(path: String)
-
-  /** If this checkpoint is already declared cached, uncache. */
-  def uncache()
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
deleted file mode 100644
index 8216881..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.{SparseMatrix, DenseMatrix, Matrix, Vector}
-import math._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import scala.collection.JavaConversions._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.SparkContext._
-import reflect._
-import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
-
-class CheckpointedDrmBase[K: ClassTag](
-    val rdd: DrmRdd[K],
-    private var _nrow: Long = -1L,
-    private var _ncol: Int = -1,
-    private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
-    private[sparkbindings] val partitioningTag: Long = Random.nextLong()
-
-    ) extends CheckpointedDrm[K] {
-
-
-  lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
-  lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
-
-  private var cached: Boolean = false
-
-
-  /**
-   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
-   * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
-   */
-  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
-  // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
-    this
-
-  def cache() = {
-    if (!cached) {
-      rdd.persist(_cacheStorageLevel)
-      cached = true
-    }
-    this
-  }
-
-
-  /**
-   * If the matrix was previously persisted into cache,
-   * delete the cached representation.
-   */
-  def uncache() = {
-    if (cached) {
-      rdd.unpersist(blocking = false)
-      cached = false
-    }
-    this
-  }
-
-  def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmBase[K] =
-    new CheckpointedDrmBase[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
-
-
-  /**
-   * Collects the DRM to a front-end in-core Matrix.
-   *
-   * If the DRM key is Int, then the matrix is collected using the key as the row index.
-   * Otherwise, the order of rows in the result is undefined, but key.toString is applied
-   * as the rowLabelBindings of the in-core matrix.
-   *
-   * Note that this pre-allocates the target matrix and then assigns the collected RDD to it,
-   * thus it will likely require about 2 times the RDD memory.
-   * @return
-   */
-  def collect: Matrix = {
-
-    val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
-
-    val cols = rdd.map(_._2.length).fold(0)(max(_, _))
-    val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
-
-    // Since Spark's #collect() currently requires Serializable support,
-    // we serialize DRM vectors into byte arrays on the backend and restore Vector
-    // instances on the front end:
-    val data = rdd.map(t => (t._1, t._2)).collect()
-
-
-    val m = if (data.forall(_._2.isDense))
-      new DenseMatrix(rows, cols)
-
-    else
-      new SparseMatrix(rows, cols)
-
-    if (intRowIndices)
-      data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
-    else {
-
-      // assign all rows sequentially
-      val d = data.zipWithIndex
-      d.foreach(t => m(t._2, ::) := t._1._2)
-
-      // row bindings
-      val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
-
-      m.setRowLabelBindings(rowBindings)
-    }
-
-    m
-  }
-
-
-  /**
-   * Dumps the matrix as a computed Mahout DRM into the specified (HD)FS path.
-   * @param path
-   */
-  def writeDRM(path: String) = {
-    val ktag = implicitly[ClassTag[K]]
-
-    implicit val k2wFunc: (K) => Writable =
-      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
-      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
-      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
-      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
-      else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
-//    implicit def any2w(k: Any): Writable = k2wFunc(k)
-    rdd.saveAsSequenceFile(path)
-  }
-
-  protected def computeNRow = {
-
-    val intRowIndex = classTag[K] == classTag[Int]
-
-    if (intRowIndex)
-      cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
-    else
-      cache().rdd.count()
-  }
-
-  protected def computeNCol =
-    cache().rdd.map(_._2.length).fold(-1)(max(_, _))
-
-  protected def computeNNonZero =
-    cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
new file mode 100644
index 0000000..2d80fe3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math._
+import math._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import scala.collection.JavaConversions._
+import org.apache.spark.storage.StorageLevel
+import reflect._
+import scala.util.Random
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.sparkbindings._
+import org.apache.spark.SparkContext._
+
+/** Spark-specific optimizer-checkpointed DRM. */
+class CheckpointedDrmSpark[K: ClassTag](
+    val rdd: DrmRdd[K],
+    private var _nrow: Long = -1L,
+    private var _ncol: Int = -1,
+    private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+    override protected[mahout] val partitioningTag: Long = Random.nextLong()
+    ) extends CheckpointedDrm[K] {
+
+  lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
+  lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
+
+  private var cached: Boolean = false
+  override protected[mahout] val context: DistributedContext = rdd.context
+
+
+  /**
+   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
+   * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
+    // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
+    this
+  }
+
+  def cache() = {
+    if (!cached) {
+      rdd.persist(_cacheStorageLevel)
+      cached = true
+    }
+    this
+  }
+
+
+  /**
+   * If the matrix was previously persisted into cache,
+   * delete the cached representation.
+   */
+  def uncache() = {
+    if (cached) {
+      rdd.unpersist(blocking = false)
+      cached = false
+    }
+    this
+  }
+
+//  def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
+//    new CheckpointedDrmSpark[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
+
+
+  /**
+   * Collects the DRM to a front-end in-core Matrix.
+   *
+   * If the DRM key is Int, then the matrix is collected using the key as the row index.
+   * Otherwise, the order of rows in the result is undefined, but key.toString is applied
+   * as the rowLabelBindings of the in-core matrix.
+   *
+   * Note that this pre-allocates the target matrix and then assigns the collected RDD to it,
+   * thus it will likely require about 2 times the RDD memory.
+   * @return
+   */
+  def collect: Matrix = {
+
+    val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
+
+    val cols = rdd.map(_._2.length).fold(0)(max(_, _))
+    val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
+
+    // Since Spark's #collect() currently requires Serializable support,
+    // we serialize DRM vectors into byte arrays on the backend and restore Vector
+    // instances on the front end:
+    val data = rdd.map(t => (t._1, t._2)).collect()
+
+
+    val m = if (data.forall(_._2.isDense))
+      new DenseMatrix(rows, cols)
+
+    else
+      new SparseMatrix(rows, cols)
+
+    if (intRowIndices)
+      data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
+    else {
+
+      // assign all rows sequentially
+      val d = data.zipWithIndex
+      d.foreach(t => m(t._2, ::) := t._1._2)
+
+      // row bindings
+      val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
+
+      m.setRowLabelBindings(rowBindings)
+    }
+
+    m
+  }
+
+
+  /**
+   * Dumps the matrix as a computed Mahout DRM into the specified (HD)FS path.
+   * @param path
+   */
+  def writeDRM(path: String) = {
+    val ktag = implicitly[ClassTag[K]]
+
+    implicit val k2wFunc: (K) => Writable =
+      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
+      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
+      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
+      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
+      else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
+
+    rdd.saveAsSequenceFile(path)
+  }
+
+  protected def computeNRow = {
+
+    val intRowIndex = classTag[K] == classTag[Int]
+
+    if (intRowIndex)
+      cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+    else
+      cache().rdd.count()
+  }
+
+  protected def computeNCol =
+    cache().rdd.map(_._2.length).fold(-1)(max(_, _))
+
+  protected def computeNNonZero =
+    cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
new file mode 100644
index 0000000..7cf6bd6
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -0,0 +1,16 @@
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math.drm.CheckpointedDrm
+import scala.reflect.ClassTag
+
+/** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
+class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+
+  assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
+
+  private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
+
+  /** Exposes the underlying Spark RDD for Spark-specific customizations. */
+  def rdd = sparkDrm.rdd
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
deleted file mode 100644
index f891c1e..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{DenseVector, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-import org.apache.spark.SparkContext._
-
-
-/**
- * Additional experimental operations over the CheckpointedDRM implementation. I will possibly move them
- * up to DRMBase once they stabilize.
- *
- */
-class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
-
-  /**
-   * Reorganize every partition into a single in-core matrix
-   * @return
-   */
-  def blockify(): BlockifiedDrmRdd[K] =
-    org.apache.mahout.sparkbindings.drm.blockify(rdd = drm.rdd, blockncol = drm.ncol)
-
-  /** Column sums. At this point this runs on the checkpoint and collects an in-core vector. */
-  def colSums(): Vector = {
-    val n = drm.ncol
-
-    drm.rdd
-        // Throw away keys
-        .map(_._2)
-        // Fold() still doesn't work with Kryo, so work around it.
-        .mapPartitions(iter => {
-      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
-      Iterator(acc)
-    })
-        // Since we preallocated a new accumulator vector per partition, this must not cause any
-        // side effects now.
-        .reduce(_ += _)
-
-  }
-
-  def colMeans(): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
-
-}
-

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
deleted file mode 100644
index de1f9bd..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.spark.storage.StorageLevel
-
-/**
- *
- * Basic spark DRM trait.
- *
- * Since we already call the package "sparkbindings", I will not use the stem "spark" in class names
- * in this package. Spark backing is already implied.
- *
- */
-trait DrmLike[K] {
-
-  private[sparkbindings] def partitioningTag:Long
-
-  /** R-like syntax for number of rows. */
-  def nrow: Long
-
-  /** R-like syntax for number of columns. */
-  def ncol: Int
-
-  /**
-   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
-   * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
-   */
-  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
deleted file mode 100644
index ce7b867..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.hadoop.io.Writable
-import org.apache.mahout.sparkbindings.drm.plan.{OpRowRange, OpMapBlock}
-import RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-/** Common Drm ops */
-class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
-
-  /**
-   * Map the matrix block-wise vertically. Blocks of the new matrix can be modified original block
-   * matrices, or they can be completely new matrices with a new keyset. In the latter case, the
-   * output matrix width must be specified with the <code>ncol</code> parameter.<P>
-   *
-   * New blocks must have the same height as in the original geometry.<P>
-   *
-   * @param ncol new matrix's width (only needed if the width changes).
-   * @param bmf
-   * @tparam R
-   * @return
-   */
-  def mapBlock[R : ClassTag](ncol: Int = -1)
-      (bmf: BlockMapFunc[K, R]): DrmLike[R] =
-    new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
-
-
-  /**
-   * Slicing the DRM. Should eventually work just like in-core matrix slicing (e.g. A(0 until 5, 5 until 15)).<P>
-   *
-   * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
-   *
-   * Row range is currently unsupported except for the all-range. When it is fully supported,
-   * the input must be Int-keyed, i.e. of DrmLike[Int] type, for non-all-range specifications.
-   *
-   * @param rowRange Row range. This must be '::' (all-range) unless the matrix rows are Int-keyed.
-   * @param colRange Column range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes the all-range.
-   */
-  def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
-
-
-    val rowSrc: DrmLike[K] = if (rowRange != ::) {
-
-      if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
-
-        assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "row range out of range")
-        val intKeyed = drm.asInstanceOf[DrmLike[Int]]
-
-        new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
-
-      } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
-
-    } else drm
-
-    if (colRange != ::) {
-
-      assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
-
-      // Use mapBlock operator to do in-core subranging.
-      rowSrc.mapBlock(ncol = colRange.length)({
-        case (keys, block) => keys -> block(::, colRange)
-      })
-
-    } else rowSrc
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index 47cfa26..3801c77 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -20,6 +20,7 @@ package org.apache.mahout.sparkbindings.drm
 import scala.reflect.ClassTag
 import org.apache.spark.SparkContext
 import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings._
 
 /** Encapsulates either DrmRdd[K] or BlockifiedDrmRdd[K] */
 class DrmRddInput[K: ClassTag](