You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/25 04:27:22 UTC
mahout git commit: A small renaming of methods of DrmRddInput
Repository: mahout
Updated Branches:
refs/heads/flink-binding 9831771b8 -> 9e9ec79d6
A small renaming of methods of DrmRddInput
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9e9ec79d
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9e9ec79d
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9e9ec79d
Branch: refs/heads/flink-binding
Commit: 9e9ec79d6ceac7928e78be1dcf48c421c851b4bb
Parents: 9831771
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Sat Oct 24 20:26:56 2015 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Sat Oct 24 20:26:56 2015 -0700
----------------------------------------------------------------------
.../apache/mahout/sparkbindings/SparkEngine.scala | 2 +-
.../apache/mahout/sparkbindings/blas/ABt.scala | 8 ++++----
.../apache/mahout/sparkbindings/blas/AewB.scala | 12 ++++++------
.../mahout/sparkbindings/blas/AinCoreB.scala | 4 ++--
.../org/apache/mahout/sparkbindings/blas/At.scala | 2 +-
.../apache/mahout/sparkbindings/blas/AtA.scala | 4 ++--
.../apache/mahout/sparkbindings/blas/AtB.scala | 12 ++++++------
.../org/apache/mahout/sparkbindings/blas/Ax.scala | 4 ++--
.../mahout/sparkbindings/blas/CbindAB.scala | 6 +++---
.../mahout/sparkbindings/blas/MapBlock.scala | 2 +-
.../apache/mahout/sparkbindings/blas/Par.scala | 8 ++++----
.../mahout/sparkbindings/blas/RbindAB.scala | 8 ++++----
.../mahout/sparkbindings/blas/Slicing.scala | 2 +-
.../sparkbindings/drm/CheckpointedDrmSpark.scala | 18 +++++++++---------
.../drm/CheckpointedDrmSparkOps.scala | 2 +-
.../mahout/sparkbindings/drm/DrmRddInput.scala | 4 ++--
16 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 41e966b..fdbd950 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -104,7 +104,7 @@ object SparkEngine extends DistributedEngine {
BlockReduceFunc): Matrix = {
import drm._
- drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+ drm.asBlockified(ncol = drm.ncol).map(bmf(_)).reduce(rf)
}
/**
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 11e2bad..4e77739 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -73,9 +73,9 @@ object ABt {
srcB: DrmRddInput[Int]): DrmRddInput[K] = {
// Blockify everything.
- val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+ val blocksA = srcA.asBlockified(operator.A.ncol)
- val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+ val blocksB = srcB.asBlockified(operator.B.ncol)
val prodNCol = operator.ncol
val prodNRow = operator.nrow
@@ -212,7 +212,7 @@ object ABt {
srcB: DrmRddInput[Int]): DrmRddInput[K] = {
// Blockify everything.
- val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+ val blocksA = srcA.asBlockified(operator.A.ncol)
// Mark row-blocks with group id
.mapPartitionsWithIndex((part, iter) => {
@@ -232,7 +232,7 @@ object ABt {
}
})
- val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+ val blocksB = srcB.asBlockified(operator.B.ncol)
// Final product's geometry. We want to extract that into local variables since we want to use
// them as closure attributes.
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 8a90398..8e3a19a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -67,8 +67,8 @@ object AewB {
case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
}
- val a = srcA.toDrmRdd()
- val b = srcB.toDrmRdd()
+ val a = srcA.asRowWise()
+ val b = srcB.asRowWise()
debug(s"A${op.op}B: #partsA=${a.partitions.size},#partsB=${b.partitions.size}.")
@@ -120,10 +120,10 @@ object AewB {
// Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
// rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
- val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+ val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
} else {
- srcA.toBlockifiedDrmRdd(op.A.ncol)
+ srcA.asBlockified(op.A.ncol)
}
val rdd = aBlockRdd.map {case (keys, block) =>
@@ -169,10 +169,10 @@ object AewB {
// Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
// rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
- val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+ val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
} else {
- srcA.toBlockifiedDrmRdd(op.A.ncol)
+ srcA.asBlockified(op.A.ncol)
}
debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.size}.")
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 5f9f84a..1894495 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -25,7 +25,7 @@ object AinCoreB {
}
private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
- val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+ val rddA = srcA.asBlockified(op.A.ncol)
implicit val ctx:DistributedContext = rddA.context
val dg = drmBroadcast(op.right.viewDiagonal())
@@ -41,7 +41,7 @@ object AinCoreB {
private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
- val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+ val rddA = srcA.asBlockified(op.A.ncol)
implicit val sc:DistributedContext = rddA.sparkContext
debug(s"operator A %*% inCoreB. #parts=${rddA.partitions.size}.")
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 5789bd2..fa25b73 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -46,7 +46,7 @@ object At {
debug("operator A'.")
- val drmRdd = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+ val drmRdd = srcA.asBlockified(operator.A.ncol)
val numPartitions = drmRdd.partitions.size
val ncol = operator.ncol
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index a212878..50a4b19 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -50,7 +50,7 @@ object AtA {
// If we can comfortably fit upper-triangular operator into a map memory, we will run slim
// algorithm with upper-triangular accumulators in maps.
- val inCoreA = at_a_slim(srcRdd = srcRdd.toDrmRdd(), operator = operator)
+ val inCoreA = at_a_slim(srcRdd = srcRdd.asRowWise(), operator = operator)
val drmRdd = parallelizeInCore(inCoreA, numPartitions = 1)(sc = srcRdd.sparkContext)
drmRdd
@@ -58,7 +58,7 @@ object AtA {
// Otherwise, we need to run a distributed, big version
// new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator)))
- at_a_nongraph_mmul(srcRdd = srcRdd.toBlockifiedDrmRdd(operator.A.ncol), op = operator)
+ at_a_nongraph_mmul(srcRdd = srcRdd.asBlockified(operator.A.ncol), op = operator)
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index 45705a9..f7ad575 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -51,8 +51,8 @@ object AtB {
def atb_nograph[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A],
zippable: Boolean = false): DrmRddInput[Int] = {
- val rddA = srcA.toDrmRdd()
- val rddB = srcB.toDrmRdd()
+ val rddA = srcA.asRowWise()
+ val rddB = srcB.asRowWise()
val prodNCol = operator.ncol
@@ -99,8 +99,8 @@ object AtB {
val prodNRow = safeToNonNegInt(operator.nrow)
val aNRow = safeToNonNegInt(operator.A.nrow)
- val rddA = srcA.toDrmRdd()
- val rddB = srcB.toDrmRdd()
+ val rddA = srcA.asRowWise()
+ val rddB = srcB.asRowWise()
// Approximate number of final partitions. We take bigger partitions as our guide to number of
// elements per partition. TODO: do it better.
@@ -119,8 +119,8 @@ object AtB {
debug("mmul-A'B - zip: are identically distributed, performing row-wise zip.")
- val blockdRddA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
- val blockdRddB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+ val blockdRddA = srcA.asBlockified(operator.A.ncol)
+ val blockdRddB = srcB.asBlockified(operator.B.ncol)
blockdRddA
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 629accd..9705838 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -15,7 +15,7 @@ object Ax {
def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
- val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+ val rddA = srcA.asBlockified(op.A.ncol)
implicit val sc: DistributedContext = rddA.sparkContext
val bcastX = drmBroadcast(op.x)
@@ -30,7 +30,7 @@ object Ax {
def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
- val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+ val rddA = srcA.asBlockified(op.A.ncol)
implicit val dc:DistributedContext = rddA.sparkContext
val bcastX = drmBroadcast(op.x)
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index 4a379ec..395ba4c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -33,7 +33,7 @@ object CbindAB {
private val log = Logger.getLogger(CbindAB.getClass)
def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
- val srcRdd = srcA.toDrmRdd()
+ val srcRdd = srcA.asRowWise()
val ncol = op.A.ncol
val x = op.x
@@ -62,8 +62,8 @@ object CbindAB {
def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
- val a = srcA.toDrmRdd()
- val b = srcB.toDrmRdd()
+ val a = srcA.asRowWise()
+ val b = srcB.asRowWise()
val n = op.ncol
val n1 = op.A.ncol
val n2 = n - n1
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 2933ddc..b6bc961 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -31,7 +31,7 @@ object MapBlock {
// into closure.
val bmf = operator.bmf
val ncol = operator.ncol
- val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+ val rdd = src.asBlockified(operator.A.ncol).map(blockTuple => {
val out = bmf(blockTuple)
assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index 0434a72..d31e2f9 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -19,7 +19,7 @@ object Par {
val srcBlockified = src.isBlockified
- val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+ val srcRdd = if (srcBlockified) src.asBlockified(op.ncol) else src.asRowWise()
val srcNParts = srcRdd.partitions.size
// To what size?
@@ -36,7 +36,7 @@ object Par {
if (targetParts > srcNParts) {
// Expanding. Always requires deblockified stuff. May require re-shuffling.
- val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
+ val rdd = src.asRowWise().repartition(numPartitions = targetParts)
rdd
@@ -44,9 +44,9 @@ object Par {
// Shrinking.
if (srcBlockified) {
- drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
+ drm.rbind(src.asBlockified(op.ncol).coalesce(numPartitions = targetParts))
} else {
- src.toDrmRdd().coalesce(numPartitions = targetParts)
+ src.asRowWise().coalesce(numPartitions = targetParts)
}
} else {
// no adjustment required.
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 62abba6..5fccde2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -31,8 +31,8 @@ object RbindAB {
// If any of the inputs is blockified, use blockified inputs
if (srcA.isBlockified || srcB.isBlockified) {
- val a = srcA.toBlockifiedDrmRdd(op.A.ncol)
- val b = srcB.toBlockifiedDrmRdd(op.B.ncol)
+ val a = srcA.asBlockified(op.A.ncol)
+ val b = srcB.asBlockified(op.B.ncol)
// Union seems to be fine, it is indeed just do partition-level unionization, no shuffles
a ++ b
@@ -40,8 +40,8 @@ object RbindAB {
} else {
// Otherwise, use row-wise inputs -- no reason to blockify here.
- val a = srcA.toDrmRdd()
- val b = srcB.toDrmRdd()
+ val a = srcA.asRowWise()
+ val b = srcB.asRowWise()
a ++ b
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index 0284ba2..a100443 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -8,7 +8,7 @@ object Slicing {
def rowRange(op: OpRowRange, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
val rowRange = op.rowRange
val ncol = op.ncol
- val rdd = srcA.toDrmRdd()
+ val rdd = srcA.asRowWise()
// Filter the rows in the range only
.filter({
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 797a5c2..38007e0 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -123,7 +123,7 @@ class CheckpointedDrmSpark[K: ClassTag](
// since currently spark #collect() requires Serializeable support,
// we serialize DRM vectors into byte arrays on backend and restore Vector
// instances on the front end:
- val data = rddInput.toDrmRdd().map(t => (t._1, t._2)).collect()
+ val data = rddInput.asRowWise().map(t => (t._1, t._2)).collect()
val m = if (data.forall(_._2.isDense))
@@ -159,13 +159,13 @@ class CheckpointedDrmSpark[K: ClassTag](
// Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save.
if (ktag.runtimeClass == classOf[Int]) {
- rddInput.toDrmRdd()
+ rddInput.asRowWise()
.map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
} else if (ktag.runtimeClass == classOf[String]){
- rddInput.toDrmRdd()
+ rddInput.asRowWise()
.map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
} else if (ktag.runtimeClass == classOf[Long]) {
- rddInput.toDrmRdd()
+ rddInput.asRowWise()
.map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
} else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
@@ -176,7 +176,7 @@ class CheckpointedDrmSpark[K: ClassTag](
val intRowIndex = classTag[K] == classTag[Int]
if (intRowIndex) {
- val rdd = cache().rddInput.toDrmRdd().asInstanceOf[DrmRdd[Int]]
+ val rdd = cache().rddInput.asRowWise().asInstanceOf[DrmRdd[Int]]
// I guess it is a suitable place to compute int keys consistency test here because we know
// that nrow can be computed lazily, which always happens when rdd is already available, cached,
@@ -189,21 +189,21 @@ class CheckpointedDrmSpark[K: ClassTag](
intFixExtra = (maxPlus1 - rowCount) max 0L
maxPlus1
} else
- cache().rddInput.toDrmRdd().count()
+ cache().rddInput.asRowWise().count()
}
protected def computeNCol = {
rddInput.isBlockified match {
- case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached"))
+ case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
.map(_._2.ncol).reduce(max(_, _))
- case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max(_, _))
+ case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
}
}
protected def computeNNonZero =
- cache().rddInput.toDrmRdd().map(_._2.getNumNonZeroElements.toLong).sum().toLong
+ cache().rddInput.asRowWise().map(_._2.getNumNonZeroElements.toLong).sum().toLong
/** Changes the number of rows in the DRM without actually touching the underlying data. Used to
* redimension a DRM after it has been created, which implies some blank, non-existent rows.
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index abcfc64..25953e1 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -11,6 +11,6 @@ class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
/** Spark matrix customization exposure */
- def rdd = sparkDrm.rddInput.toDrmRdd()
+ def rdd = sparkDrm.rddInput.asRowWise()
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index d9dbada..5c9319a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -31,10 +31,10 @@ class DrmRddInput[K: ClassTag](private val input: Either[DrmRdd[K], BlockifiedDr
def isRowWise: Boolean = input.isLeft
- def toDrmRdd(): DrmRdd[K] = input.left.getOrElse(deblockify(rdd = input.right.get))
+ def asRowWise(): DrmRdd[K] = input.left.getOrElse(deblockify(rdd = input.right.get))
/** Use late binding for this. It may or may not be needed, depending on current config. */
- def toBlockifiedDrmRdd(ncol: ⇒ Int) = input.right.getOrElse(blockify(rdd = input.left.get, blockncol = ncol))
+ def asBlockified(ncol: ⇒ Int) = input.right.getOrElse(blockify(rdd = input.left.get, blockncol = ncol))
def sparkContext: SparkContext = backingRdd.sparkContext