Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/13 04:22:54 UTC

mahout git commit: Merge changes from master

Repository: mahout
Updated Branches:
  refs/heads/flink-binding a168d238d -> 92a2f6c8f


Merge changes from master


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/92a2f6c8
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/92a2f6c8
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/92a2f6c8

Branch: refs/heads/flink-binding
Commit: 92a2f6c8ffa02478e7cc8a4b79ff270bf4d08c8a
Parents: a168d23
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Sat Oct 24 20:26:56 2015 -0700
Committer: smarthi <sm...@apache.org>
Committed: Sat Mar 12 22:20:29 2016 -0500

----------------------------------------------------------------------
 flink/pom.xml                                     |  4 ++--
 .../apache/mahout/sparkbindings/SparkEngine.scala |  2 +-
 .../apache/mahout/sparkbindings/blas/AewB.scala   | 12 ++++++------
 .../mahout/sparkbindings/blas/CbindAB.scala       |  6 +++---
 .../mahout/sparkbindings/blas/MapBlock.scala      |  2 +-
 .../apache/mahout/sparkbindings/blas/Par.scala    |  9 +++++----
 .../sparkbindings/drm/CheckpointedDrmSpark.scala  | 18 +++++++++---------
 .../drm/CheckpointedDrmSparkOps.scala             |  2 +-
 8 files changed, 28 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 1aafba5..37f1dbf 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -110,7 +110,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala</artifactId>
+      <artifactId>flink-scala_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
@@ -120,7 +120,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients</artifactId>
+      <artifactId>flink-clients_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3200288..b89235d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -97,7 +97,7 @@ object SparkEngine extends DistributedEngine {
   /** Optional engine-specific all reduce tensor operation. */
   override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
   BlockReduceFunc): Matrix = {
-    drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+    drm.asBlockified(ncol = drm.ncol).map(bmf(_)).reduce(rf)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 309742f..92c429f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -66,8 +66,8 @@ object AewB {
       case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
     }
 
-    val a = srcA.toDrmRdd()
-    val b = srcB.toDrmRdd()
+    val a = srcA.asRowWise()
+    val b = srcB.asRowWise()
 
     debug(s"A${op.op}B: #partsA=${a.partitions.length},#partsB=${b.partitions.length}.")
 
@@ -120,10 +120,10 @@ object AewB {
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
     val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
-      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
-      srcA.toBlockifiedDrmRdd(op.A.ncol)
+      srcA.asBlockified(op.A.ncol)
     }
 
     val rdd = aBlockRdd.map {case (keys, block) =>
@@ -170,10 +170,10 @@ object AewB {
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing 
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
     val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows) {
-      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
-      srcA.toBlockifiedDrmRdd(op.A.ncol)
+      srcA.asBlockified(op.A.ncol)
     }
 
     debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.length}.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index f7ba496..9f34b06 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -34,7 +34,7 @@ object CbindAB {
   def cbindAScalar[K](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
 
     implicit val ktag = op.keyClassTag
-    val srcRdd = srcA.toDrmRdd()
+    val srcRdd = srcA.asRowWise()
 
     val ncol = op.A.ncol
     val x = op.x
@@ -63,8 +63,8 @@ object CbindAB {
 
   def cbindAB_nograph[K](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
-    val a = srcA.toDrmRdd()
-    val b = srcB.toDrmRdd()
+    val a = srcA.asRowWise()
+    val b = srcB.asRowWise()
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = n - n1

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 49de368..1caa537 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -30,7 +30,7 @@ object MapBlock {
     val bmf = operator.bmf
     val ncol = operator.ncol
     implicit val rtag = operator.keyClassTag
-    src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+    src.asBlockified(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
 
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index d9d5037..974c8db 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -17,7 +17,7 @@ object Par {
     implicit val ktag = op.keyClassTag
     val srcBlockified = src.isBlockified
 
-    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+    val srcRdd = if (srcBlockified) src.asBlockified(op.ncol) else src.asRowWise()
     val srcNParts = srcRdd.partitions.length
 
     // To what size?
@@ -34,16 +34,17 @@ object Par {
     if (targetParts > srcNParts) {
 
       // Expanding. Always requires deblockified stuff. May require re-shuffling.
-      val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
+      val rdd = src.asRowWise().repartition(numPartitions = targetParts)
+
       rdd
 
     } else if (targetParts < srcNParts) {
       // Shrinking.
 
       if (srcBlockified) {
-        drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
+        drm.rbind(src.asBlockified(op.ncol).coalesce(numPartitions = targetParts))
       } else {
-        src.toDrmRdd().coalesce(numPartitions = targetParts)
+        src.asRowWise().coalesce(numPartitions = targetParts)
       }
     } else {
       // no adjustment required.

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index bd95fe0..71755c5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -125,7 +125,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     // since currently spark #collect() requires Serializeable support,
     // we serialize DRM vectors into byte arrays on backend and restore Vector
     // instances on the front end:
-    val data = rddInput.toDrmRdd().map(t => (t._1, t._2)).collect()
+    val data = rddInput.asRowWise().map(t => (t._1, t._2)).collect()
 
 
     val m = if (data.forall(_._2.isDense))
@@ -162,13 +162,13 @@ class CheckpointedDrmSpark[K: ClassTag](
 
     // Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save.
     if (ktag.runtimeClass == classOf[Int]) {
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[String]){
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[Long]) {
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
 
@@ -179,7 +179,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     val intRowIndex = classTag[K] == classTag[Int]
 
     if (intRowIndex) {
-      val rdd = cache().rddInput.toDrmRdd().asInstanceOf[DrmRdd[Int]]
+      val rdd = cache().rddInput.asRowWise().asInstanceOf[DrmRdd[Int]]
 
       // I guess it is a suitable place to compute int keys consistency test here because we know
       // that nrow can be computed lazily, which always happens when rdd is already available, cached,
@@ -192,21 +192,21 @@ class CheckpointedDrmSpark[K: ClassTag](
       intFixExtra = (maxPlus1 - rowCount) max 0L
       maxPlus1
     } else
-      cache().rddInput.toDrmRdd().count()
+      cache().rddInput.asRowWise().count()
   }
 
 
 
   protected def computeNCol = {
     rddInput.isBlockified match {
-      case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached"))
+      case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
         .map(_._2.ncol).reduce(max)
-      case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max)
+      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
     }
   }
 
   protected def computeNNonZero =
-    cache().rddInput.toDrmRdd().map(_._2.getNumNonZeroElements.toLong).sum().toLong
+    cache().rddInput.asRowWise().map(_._2.getNumNonZeroElements.toLong).sum().toLong
 
   /** Changes the number of rows in the DRM without actually touching the underlying data. Used to
     * redimension a DRM after it has been created, which implies some blank, non-existent rows.

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 60dd850..e745a24 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -11,6 +11,6 @@ class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {
   private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
 
   /** Spark matrix customization exposure */
-  def rdd:DrmRdd[K] = sparkDrm.rddInput.toDrmRdd()
+  def rdd = sparkDrm.rddInput.asRowWise()
 
 }