Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/13 04:22:54 UTC
mahout git commit: Merge changes from master
Repository: mahout
Updated Branches:
refs/heads/flink-binding a168d238d -> 92a2f6c8f
Merge changes from master
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/92a2f6c8
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/92a2f6c8
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/92a2f6c8
Branch: refs/heads/flink-binding
Commit: 92a2f6c8ffa02478e7cc8a4b79ff270bf4d08c8a
Parents: a168d23
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Sat Oct 24 20:26:56 2015 -0700
Committer: smarthi <sm...@apache.org>
Committed: Sat Mar 12 22:20:29 2016 -0500
----------------------------------------------------------------------
flink/pom.xml | 4 ++--
.../apache/mahout/sparkbindings/SparkEngine.scala | 2 +-
.../apache/mahout/sparkbindings/blas/AewB.scala | 12 ++++++------
.../mahout/sparkbindings/blas/CbindAB.scala | 6 +++---
.../mahout/sparkbindings/blas/MapBlock.scala | 2 +-
.../apache/mahout/sparkbindings/blas/Par.scala | 9 +++++----
.../sparkbindings/drm/CheckpointedDrmSpark.scala | 18 +++++++++---------
.../drm/CheckpointedDrmSparkOps.scala | 2 +-
8 files changed, 28 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 1aafba5..37f1dbf 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -110,7 +110,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala</artifactId>
+ <artifactId>flink-scala_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
@@ -120,7 +120,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
+ <artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
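For context: Flink publishes a separate artifact per Scala binary version, so under
Scala 2.10 the Maven coordinates must carry the _2.10 suffix explicitly, as the hunks
above do. As an illustration only (sbt, not part of the Mahout build), sbt's %%
operator performs the same suffixing automatically; flinkVersion here is a
hypothetical stand-in for the pom's ${flink.version} property:

  // sbt sketch: %% appends the Scala binary version, resolving these to
  // flink-scala_2.10 and flink-clients_2.10 when built against Scala 2.10.
  val flinkVersion = "1.0.0"
  libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala"   % flinkVersion,
    "org.apache.flink" %% "flink-clients" % flinkVersion
  )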
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3200288..b89235d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -97,7 +97,7 @@ object SparkEngine extends DistributedEngine {
/** Optional engine-specific all reduce tensor operation. */
override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
BlockReduceFunc): Matrix = {
- drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+ drm.asBlockified(ncol = drm.ncol).map(bmf(_)).reduce(rf)
}
/**
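A hypothetical usage sketch of this all-reduce, not taken from the commit: count all
rows of a DRM by mapping each vertical block to a 1x1 matrix holding its row count
and summing the results. The bmf/rf names follow the signature above; drmA and the
nrow/+= operators assume Mahout's math-scala bindings are in scope.

  import org.apache.mahout.math.{DenseMatrix, Matrix}
  import org.apache.mahout.math.scalabindings.RLikeOps._

  val total: Matrix = SparkEngine.allreduceBlock(
    drmA,                            // some CheckpointedDrm[K], assumed in scope
    bmf = { case (_, block) =>
      val m = new DenseMatrix(1, 1)
      m.setQuick(0, 0, block.nrow)   // rows held by this vertical block
      m
    },
    rf = (m1, m2) => m1 += m2        // accumulate the per-block counts
  )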
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 309742f..92c429f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -66,8 +66,8 @@ object AewB {
case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
}
- val a = srcA.toDrmRdd()
- val b = srcB.toDrmRdd()
+ val a = srcA.asRowWise()
+ val b = srcB.asRowWise()
debug(s"A${op.op}B: #partsA=${a.partitions.length},#partsB=${b.partitions.length}.")
@@ -120,10 +120,10 @@ object AewB {
// Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
// rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
- val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+ val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
} else {
- srcA.toBlockifiedDrmRdd(op.A.ncol)
+ srcA.asBlockified(op.A.ncol)
}
val rdd = aBlockRdd.map {case (keys, block) =>
@@ -170,10 +170,10 @@ object AewB {
// Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
// rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows) {
- val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+ val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
} else {
- srcA.toBlockifiedDrmRdd(op.A.ncol)
+ srcA.asBlockified(op.A.ncol)
}
debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.length}.")
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index f7ba496..9f34b06 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -34,7 +34,7 @@ object CbindAB {
def cbindAScalar[K](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
implicit val ktag = op.keyClassTag
- val srcRdd = srcA.toDrmRdd()
+ val srcRdd = srcA.asRowWise()
val ncol = op.A.ncol
val x = op.x
@@ -63,8 +63,8 @@ object CbindAB {
def cbindAB_nograph[K](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
- val a = srcA.toDrmRdd()
- val b = srcB.toDrmRdd()
+ val a = srcA.asRowWise()
+ val b = srcB.asRowWise()
val n = op.ncol
val n1 = op.A.ncol
val n2 = n - n1
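The merge step that follows (not quoted here) conceptually joins the two row-wise
RDDs and writes A's columns into positions 0..n1-1 and B's into n1..n-1 of a width-n
row. A hypothetical sketch under those assumptions, reusing n and n1 from the hunk
above:

  import scala.collection.JavaConverters._
  import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}

  val merged: DrmRdd[K] = a.join(b).map { case (key, (rowA, rowB)) =>
    val row: Vector = new SequentialAccessSparseVector(n)
    for (e <- rowA.nonZeroes.asScala) row.setQuick(e.index, e.get)      // A: cols 0..n1-1
    for (e <- rowB.nonZeroes.asScala) row.setQuick(n1 + e.index, e.get) // B: cols n1..n-1
    key -> row
  }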
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 49de368..1caa537 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -30,7 +30,7 @@ object MapBlock {
val bmf = operator.bmf
val ncol = operator.ncol
implicit val rtag = operator.keyClassTag
- src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+ src.asBlockified(operator.A.ncol).map(blockTuple => {
val out = bmf(blockTuple)
assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
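The user-level counterpart of this physical operator is the mapBlock verb of the
math-scala DSL. A hypothetical sketch (assuming the DSL and R-like operator imports,
with drmA in scope) that adds 1.0 to every element block by block; the assert above
is what enforces that the returned block keeps the incoming row count:

  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.scalabindings.RLikeOps._

  val drmB = drmA.mapBlock(ncol = drmA.ncol) { case (keys, block) =>
    keys -> (block += 1.0)   // in-place scalar add; nrow is unchanged
  }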
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index d9d5037..974c8db 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -17,7 +17,7 @@ object Par {
implicit val ktag = op.keyClassTag
val srcBlockified = src.isBlockified
- val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+ val srcRdd = if (srcBlockified) src.asBlockified(op.ncol) else src.asRowWise()
val srcNParts = srcRdd.partitions.length
// To what size?
@@ -34,16 +34,17 @@ object Par {
if (targetParts > srcNParts) {
// Expanding. Always requires deblockified stuff. May require re-shuffling.
- val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
+ val rdd = src.asRowWise().repartition(numPartitions = targetParts)
+
rdd
} else if (targetParts < srcNParts) {
// Shrinking.
if (srcBlockified) {
- drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
+ drm.rbind(src.asBlockified(op.ncol).coalesce(numPartitions = targetParts))
} else {
- src.toDrmRdd().coalesce(numPartitions = targetParts)
+ src.asRowWise().coalesce(numPartitions = targetParts)
}
} else {
// no adjustment required.
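The branch structure leans on plain Spark behavior: repartition always shuffles and
is the only way to grow the partition count, hence the switch to the row-wise form,
while coalesce shrinks by merging existing partitions without a shuffle, which is why
blockified partitions can be coalesced and then rbind-ed back into single blocks. For
illustration (someRdd is an arbitrary RDD, assumed in scope):

  val grown  = someRdd.repartition(numPartitions = 16) // full shuffle; can expand
  val shrunk = someRdd.coalesce(numPartitions = 4)     // merges partitions; no shuffle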
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index bd95fe0..71755c5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -125,7 +125,7 @@ class CheckpointedDrmSpark[K: ClassTag](
// since currently spark #collect() requires Serializeable support,
// we serialize DRM vectors into byte arrays on backend and restore Vector
// instances on the front end:
- val data = rddInput.toDrmRdd().map(t => (t._1, t._2)).collect()
+ val data = rddInput.asRowWise().map(t => (t._1, t._2)).collect()
val m = if (data.forall(_._2.isDense))
@@ -162,13 +162,13 @@ class CheckpointedDrmSpark[K: ClassTag](
// Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save.
if (ktag.runtimeClass == classOf[Int]) {
- rddInput.toDrmRdd()
+ rddInput.asRowWise()
.map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
} else if (ktag.runtimeClass == classOf[String]){
- rddInput.toDrmRdd()
+ rddInput.asRowWise()
.map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
} else if (ktag.runtimeClass == classOf[Long]) {
- rddInput.toDrmRdd()
+ rddInput.asRowWise()
.map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
} else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
@@ -179,7 +179,7 @@ class CheckpointedDrmSpark[K: ClassTag](
val intRowIndex = classTag[K] == classTag[Int]
if (intRowIndex) {
- val rdd = cache().rddInput.toDrmRdd().asInstanceOf[DrmRdd[Int]]
+ val rdd = cache().rddInput.asRowWise().asInstanceOf[DrmRdd[Int]]
// I guess it is a suitable place to compute int keys consistency test here because we know
// that nrow can be computed lazily, which always happens when rdd is already available, cached,
@@ -192,21 +192,21 @@ class CheckpointedDrmSpark[K: ClassTag](
intFixExtra = (maxPlus1 - rowCount) max 0L
maxPlus1
} else
- cache().rddInput.toDrmRdd().count()
+ cache().rddInput.asRowWise().count()
}
protected def computeNCol = {
rddInput.isBlockified match {
- case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached"))
+ case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
.map(_._2.ncol).reduce(max)
- case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max)
+ case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
}
}
protected def computeNNonZero =
- cache().rddInput.toDrmRdd().map(_._2.getNumNonZeroElements.toLong).sum().toLong
+ cache().rddInput.asRowWise().map(_._2.getNumNonZeroElements.toLong).sum().toLong
/** Changes the number of rows in the DRM without actually touching the underlying data. Used to
* redimension a DRM after it has been created, which implies some blank, non-existent rows.
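The three dfsWrite branches above differ only in how the row key becomes a Writable.
A hypothetical refactor sketch (not part of this commit) that factors the conversion
out over the same three supported key types:

  import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
  import scala.reflect.ClassTag

  def keyToWritable(ktag: ClassTag[_]): Any => Writable = ktag.runtimeClass match {
    case c if c == classOf[Int]    => k => new IntWritable(k.asInstanceOf[Int])
    case c if c == classOf[String] => k => new Text(k.asInstanceOf[String])
    case c if c == classOf[Long]   => k => new LongWritable(k.asInstanceOf[Long])
    case c => throw new IllegalArgumentException(
      "Do not know how to convert class tag %s to Writable.".format(c))
  }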
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 60dd850..e745a24 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -11,6 +11,6 @@ class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {
private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
/** Spark matrix customization exposure */
- def rdd:DrmRdd[K] = sparkDrm.rddInput.toDrmRdd()
+ def rdd = sparkDrm.rddInput.asRowWise()
}
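A hypothetical use of the exposure above, assuming the sparkbindings package
implicits decorate a checkpointed DRM with these ops: drop to the raw row-wise RDD
and continue in plain Spark. After this commit, rdd still yields a DrmRdd[K], now via
asRowWise().

  import org.apache.mahout.sparkbindings._

  val rows = drmA.rdd       // RDD[(Int, Vector)], drmA: CheckpointedDrm[Int] assumed
  val sample = rows.take(5) // ordinary Spark from here on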