Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/25 04:27:22 UTC

mahout git commit: A small renaming of methods of DrmRddInput

Repository: mahout
Updated Branches:
  refs/heads/flink-binding 9831771b8 -> 9e9ec79d6


A small renaming of methods of DrmRddInput
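
In short, the commit renames DrmRddInput's two accessors: toDrmRdd() becomes asRowWise() and toBlockifiedDrmRdd(ncol) becomes asBlockified(ncol); the rest of the diff updates the call sites across the Spark bindings. A minimal before/after sketch of a call site (the drmInput value below is hypothetical; only the method names and signatures come from this commit):

    // Illustrative only: `drmInput` stands for some DrmRddInput[Int] instance.
    // Before this commit:
    //   val rows   = drmInput.toDrmRdd()               // row-wise form
    //   val blocks = drmInput.toBlockifiedDrmRdd(ncol) // blockified form
    // After this commit:
    val rows   = drmInput.asRowWise()        // row-wise RDD view
    val blocks = drmInput.asBlockified(ncol) // blockified view; ncol is only evaluated if blockification is actually needed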


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9e9ec79d
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9e9ec79d
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9e9ec79d

Branch: refs/heads/flink-binding
Commit: 9e9ec79d6ceac7928e78be1dcf48c421c851b4bb
Parents: 9831771
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Sat Oct 24 20:26:56 2015 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Sat Oct 24 20:26:56 2015 -0700

----------------------------------------------------------------------
 .../apache/mahout/sparkbindings/SparkEngine.scala |  2 +-
 .../apache/mahout/sparkbindings/blas/ABt.scala    |  8 ++++----
 .../apache/mahout/sparkbindings/blas/AewB.scala   | 12 ++++++------
 .../mahout/sparkbindings/blas/AinCoreB.scala      |  4 ++--
 .../org/apache/mahout/sparkbindings/blas/At.scala |  2 +-
 .../apache/mahout/sparkbindings/blas/AtA.scala    |  4 ++--
 .../apache/mahout/sparkbindings/blas/AtB.scala    | 12 ++++++------
 .../org/apache/mahout/sparkbindings/blas/Ax.scala |  4 ++--
 .../mahout/sparkbindings/blas/CbindAB.scala       |  6 +++---
 .../mahout/sparkbindings/blas/MapBlock.scala      |  2 +-
 .../apache/mahout/sparkbindings/blas/Par.scala    |  8 ++++----
 .../mahout/sparkbindings/blas/RbindAB.scala       |  8 ++++----
 .../mahout/sparkbindings/blas/Slicing.scala       |  2 +-
 .../sparkbindings/drm/CheckpointedDrmSpark.scala  | 18 +++++++++---------
 .../drm/CheckpointedDrmSparkOps.scala             |  2 +-
 .../mahout/sparkbindings/drm/DrmRddInput.scala    |  4 ++--
 16 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 41e966b..fdbd950 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -104,7 +104,7 @@ object SparkEngine extends DistributedEngine {
   BlockReduceFunc): Matrix = {
 
     import drm._
-    drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+    drm.asBlockified(ncol = drm.ncol).map(bmf(_)).reduce(rf)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 11e2bad..4e77739 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -73,9 +73,9 @@ object ABt {
       srcB: DrmRddInput[Int]): DrmRddInput[K] = {
 
     // Blockify everything.
-    val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+    val blocksA = srcA.asBlockified(operator.A.ncol)
 
-    val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+    val blocksB = srcB.asBlockified(operator.B.ncol)
 
     val prodNCol = operator.ncol
     val prodNRow = operator.nrow
@@ -212,7 +212,7 @@ object ABt {
       srcB: DrmRddInput[Int]): DrmRddInput[K] = {
 
     // Blockify everything.
-    val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+    val blocksA = srcA.asBlockified(operator.A.ncol)
 
         // Mark row-blocks with group id
         .mapPartitionsWithIndex((part, iter) => {
@@ -232,7 +232,7 @@ object ABt {
       }
     })
 
-    val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+    val blocksB = srcB.asBlockified(operator.B.ncol)
 
     // Final product's geometry. We want to extract that into local variables since we want to use
     // them as closure attributes.

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 8a90398..8e3a19a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -67,8 +67,8 @@ object AewB {
       case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
     }
 
-    val a = srcA.toDrmRdd()
-    val b = srcB.toDrmRdd()
+    val a = srcA.asRowWise()
+    val b = srcB.asRowWise()
 
     debug(s"A${op.op}B: #partsA=${a.partitions.size},#partsB=${b.partitions.size}.")
 
@@ -120,10 +120,10 @@ object AewB {
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
     val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
-      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
-      srcA.toBlockifiedDrmRdd(op.A.ncol)
+      srcA.asBlockified(op.A.ncol)
     }
 
     val rdd = aBlockRdd.map {case (keys, block) =>
@@ -169,10 +169,10 @@ object AewB {
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing 
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
     val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
-      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
-      srcA.toBlockifiedDrmRdd(op.A.ncol)
+      srcA.asBlockified(op.A.ncol)
     }
 
     debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.size}.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 5f9f84a..1894495 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -25,7 +25,7 @@ object AinCoreB {
   }
 
   private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
-    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+    val rddA = srcA.asBlockified(op.A.ncol)
     implicit val ctx:DistributedContext = rddA.context
     val dg = drmBroadcast(op.right.viewDiagonal())
 
@@ -41,7 +41,7 @@ object AinCoreB {
 
   private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
-    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+    val rddA = srcA.asBlockified(op.A.ncol)
     implicit val sc:DistributedContext = rddA.sparkContext
 
     debug(s"operator A %*% inCoreB. #parts=${rddA.partitions.size}.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 5789bd2..fa25b73 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -46,7 +46,7 @@ object At {
 
     debug("operator A'.")
 
-    val drmRdd = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+    val drmRdd = srcA.asBlockified(operator.A.ncol)
     val numPartitions = drmRdd.partitions.size
     val ncol = operator.ncol
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index a212878..50a4b19 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -50,7 +50,7 @@ object AtA {
 
       // If we can comfortably fit upper-triangular operator into a map memory, we will run slim
       // algorithm with upper-triangular accumulators in maps. 
-      val inCoreA = at_a_slim(srcRdd = srcRdd.toDrmRdd(), operator = operator)
+      val inCoreA = at_a_slim(srcRdd = srcRdd.asRowWise(), operator = operator)
       val drmRdd = parallelizeInCore(inCoreA, numPartitions = 1)(sc = srcRdd.sparkContext)
       drmRdd
 
@@ -58,7 +58,7 @@ object AtA {
 
       // Otherwise, we need to run a distributed, big version
       //      new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator)))
-      at_a_nongraph_mmul(srcRdd = srcRdd.toBlockifiedDrmRdd(operator.A.ncol), op = operator)
+      at_a_nongraph_mmul(srcRdd = srcRdd.asBlockified(operator.A.ncol), op = operator)
 
     }
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index 45705a9..f7ad575 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -51,8 +51,8 @@ object AtB {
   def atb_nograph[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A],
                                zippable: Boolean = false): DrmRddInput[Int] = {
 
-    val rddA = srcA.toDrmRdd()
-    val rddB = srcB.toDrmRdd()
+    val rddA = srcA.asRowWise()
+    val rddB = srcB.asRowWise()
 
 
     val prodNCol = operator.ncol
@@ -99,8 +99,8 @@ object AtB {
     val prodNRow = safeToNonNegInt(operator.nrow)
     val aNRow = safeToNonNegInt(operator.A.nrow)
 
-    val rddA = srcA.toDrmRdd()
-    val rddB = srcB.toDrmRdd()
+    val rddA = srcA.asRowWise()
+    val rddB = srcB.asRowWise()
 
     // Approximate number of final partitions. We take bigger partitions as our guide to number of
     // elements per partition. TODO: do it better.
@@ -119,8 +119,8 @@ object AtB {
 
       debug("mmul-A'B - zip: are identically distributed, performing row-wise zip.")
 
-      val blockdRddA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
-      val blockdRddB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+      val blockdRddA = srcA.asBlockified(operator.A.ncol)
+      val blockdRddB = srcB.asBlockified(operator.B.ncol)
 
       blockdRddA
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 629accd..9705838 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -15,7 +15,7 @@ object Ax {
 
   def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
-    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+    val rddA = srcA.asBlockified(op.A.ncol)
     implicit val sc: DistributedContext = rddA.sparkContext
 
     val bcastX = drmBroadcast(op.x)
@@ -30,7 +30,7 @@ object Ax {
 
   def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
 
-    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+    val rddA = srcA.asBlockified(op.A.ncol)
     implicit val dc:DistributedContext = rddA.sparkContext
 
     val bcastX = drmBroadcast(op.x)

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index 4a379ec..395ba4c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -33,7 +33,7 @@ object CbindAB {
   private val log = Logger.getLogger(CbindAB.getClass)
 
   def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
-    val srcRdd = srcA.toDrmRdd()
+    val srcRdd = srcA.asRowWise()
 
     val ncol = op.A.ncol
     val x = op.x
@@ -62,8 +62,8 @@ object CbindAB {
 
   def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
-    val a = srcA.toDrmRdd()
-    val b = srcB.toDrmRdd()
+    val a = srcA.asRowWise()
+    val b = srcB.asRowWise()
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = n - n1

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 2933ddc..b6bc961 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -31,7 +31,7 @@ object MapBlock {
     // into closure.
     val bmf = operator.bmf
     val ncol = operator.ncol
-    val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+    val rdd = src.asBlockified(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
 
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index 0434a72..d31e2f9 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -19,7 +19,7 @@ object Par {
 
     val srcBlockified = src.isBlockified
 
-    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+    val srcRdd = if (srcBlockified) src.asBlockified(op.ncol) else src.asRowWise()
     val srcNParts = srcRdd.partitions.size
 
     // To what size?
@@ -36,7 +36,7 @@ object Par {
     if (targetParts > srcNParts) {
 
       // Expanding. Always requires deblockified stuff. May require re-shuffling.
-      val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
+      val rdd = src.asRowWise().repartition(numPartitions = targetParts)
 
       rdd
 
@@ -44,9 +44,9 @@ object Par {
       // Shrinking.
 
       if (srcBlockified) {
-        drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
+        drm.rbind(src.asBlockified(op.ncol).coalesce(numPartitions = targetParts))
       } else {
-        src.toDrmRdd().coalesce(numPartitions = targetParts)
+        src.asRowWise().coalesce(numPartitions = targetParts)
       }
     } else {
       // no adjustment required.

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 62abba6..5fccde2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -31,8 +31,8 @@ object RbindAB {
 
     // If any of the inputs is blockified, use blockified inputs
     if (srcA.isBlockified || srcB.isBlockified) {
-      val a = srcA.toBlockifiedDrmRdd(op.A.ncol)
-      val b = srcB.toBlockifiedDrmRdd(op.B.ncol)
+      val a = srcA.asBlockified(op.A.ncol)
+      val b = srcB.asBlockified(op.B.ncol)
 
       // Union seems to be fine, it is indeed just do partition-level unionization, no shuffles
       a ++ b
@@ -40,8 +40,8 @@ object RbindAB {
     } else {
 
       // Otherwise, use row-wise inputs -- no reason to blockify here.
-      val a = srcA.toDrmRdd()
-      val b = srcB.toDrmRdd()
+      val a = srcA.asRowWise()
+      val b = srcB.asRowWise()
 
       a ++ b
     }

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index 0284ba2..a100443 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -8,7 +8,7 @@ object Slicing {
   def rowRange(op: OpRowRange, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
     val rowRange = op.rowRange
     val ncol = op.ncol
-    val rdd = srcA.toDrmRdd()
+    val rdd = srcA.asRowWise()
 
         // Filter the rows in the range only
         .filter({

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 797a5c2..38007e0 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -123,7 +123,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     // since currently spark #collect() requires Serializeable support,
     // we serialize DRM vectors into byte arrays on backend and restore Vector
     // instances on the front end:
-    val data = rddInput.toDrmRdd().map(t => (t._1, t._2)).collect()
+    val data = rddInput.asRowWise().map(t => (t._1, t._2)).collect()
 
 
     val m = if (data.forall(_._2.isDense))
@@ -159,13 +159,13 @@ class CheckpointedDrmSpark[K: ClassTag](
 
     // Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save.
     if (ktag.runtimeClass == classOf[Int]) {
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[String]){
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[Long]) {
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
 
@@ -176,7 +176,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     val intRowIndex = classTag[K] == classTag[Int]
 
     if (intRowIndex) {
-      val rdd = cache().rddInput.toDrmRdd().asInstanceOf[DrmRdd[Int]]
+      val rdd = cache().rddInput.asRowWise().asInstanceOf[DrmRdd[Int]]
 
       // I guess it is a suitable place to compute int keys consistency test here because we know
       // that nrow can be computed lazily, which always happens when rdd is already available, cached,
@@ -189,21 +189,21 @@ class CheckpointedDrmSpark[K: ClassTag](
       intFixExtra = (maxPlus1 - rowCount) max 0L
       maxPlus1
     } else
-      cache().rddInput.toDrmRdd().count()
+      cache().rddInput.asRowWise().count()
   }
 
 
 
   protected def computeNCol = {
     rddInput.isBlockified match {
-      case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached"))
+      case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
         .map(_._2.ncol).reduce(max(_, _))
-      case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max(_, _))
+      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
     }
   }
 
   protected def computeNNonZero =
-    cache().rddInput.toDrmRdd().map(_._2.getNumNonZeroElements.toLong).sum().toLong
+    cache().rddInput.asRowWise().map(_._2.getNumNonZeroElements.toLong).sum().toLong
 
   /** Changes the number of rows in the DRM without actually touching the underlying data. Used to
     * redimension a DRM after it has been created, which implies some blank, non-existent rows.

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index abcfc64..25953e1 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -11,6 +11,6 @@ class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
   private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
 
   /** Spark matrix customization exposure */
-  def rdd = sparkDrm.rddInput.toDrmRdd()
+  def rdd = sparkDrm.rddInput.asRowWise()
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index d9dbada..5c9319a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -31,10 +31,10 @@ class DrmRddInput[K: ClassTag](private val input: Either[DrmRdd[K], BlockifiedDr
 
   def isRowWise: Boolean = input.isLeft
 
-  def toDrmRdd(): DrmRdd[K] = input.left.getOrElse(deblockify(rdd = input.right.get))
+  def asRowWise(): DrmRdd[K] = input.left.getOrElse(deblockify(rdd = input.right.get))
 
   /** Use late binding for this. It may or may not be needed, depending on current config. */
-  def toBlockifiedDrmRdd(ncol: ⇒ Int) = input.right.getOrElse(blockify(rdd = input.left.get, blockncol = ncol))
+  def asBlockified(ncol: ⇒ Int) = input.right.getOrElse(blockify(rdd = input.left.get, blockncol = ncol))
 
   def sparkContext: SparkContext = backingRdd.sparkContext
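
A side note on the signature preserved by the rename: asBlockified takes ncol by name (ncol: ⇒ Int), so the argument is evaluated only when the input is still row-wise and genuinely has to be blockified. That is what lets computeNCol in CheckpointedDrmSpark above pass `throw new AssertionError("not reached")` for an already-blockified input. A minimal standalone sketch of the same by-name pattern, with hypothetical names not taken from Mahout:

    // Hypothetical illustration of the by-name trick used by asBlockified.
    object ByNameDemo extends App {

      // `ncol` is by-name: it is evaluated only if `cached` is empty,
      // mirroring how DrmRddInput.asBlockified only blockifies on demand.
      def blockifiedOrCached(cached: Option[String], ncol: => Int): String =
        cached.getOrElse(s"blockified with ncol=$ncol")

      // The second argument is never evaluated here, so no error is thrown.
      println(blockifiedOrCached(Some("already blockified"), throw new AssertionError("not reached")))

      // Here the input is "row-wise", so ncol is evaluated and used.
      println(blockifiedOrCached(None, 42))
    }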