Posted to commits@mahout.apache.org by sm...@apache.org on 2015/10/25 04:44:16 UTC

[1/2] mahout git commit: 1. Reworked FlinkEngine.drmDfsRead 2. small renaming of methods in FlinkDrm.scala

Repository: mahout
Updated Branches:
  refs/heads/flink-binding 9e9ec79d6 -> 1d9b6322e


1. Reworked FlinkEngine.drmDfsRead
2. small renaming of methods in FlinkDrm.scala


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/41dcb425
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/41dcb425
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/41dcb425

Branch: refs/heads/flink-binding
Commit: 41dcb4253af7fdccee81b2f2acf22e6a904c2b9d
Parents: 9e9ec79
Author: smarthi <sm...@apache.org>
Authored: Sat Oct 24 23:37:37 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Oct 24 23:38:15 2015 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 28 +++++++++++---------
 .../mahout/flinkbindings/blas/FlinkOpAewB.scala |  4 +--
 .../flinkbindings/blas/FlinkOpAewScalar.scala   |  6 ++---
 .../mahout/flinkbindings/blas/FlinkOpAt.scala   |  2 +-
 .../mahout/flinkbindings/blas/FlinkOpAtA.scala  |  4 +--
 .../mahout/flinkbindings/blas/FlinkOpAtB.scala  |  4 +--
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   |  2 +-
 .../flinkbindings/blas/FlinkOpCBind.scala       |  8 +++---
 .../flinkbindings/blas/FlinkOpMapBlock.scala    |  2 +-
 .../flinkbindings/blas/FlinkOpRBind.scala       |  2 +-
 .../flinkbindings/blas/FlinkOpRowRange.scala    |  2 +-
 .../blas/FlinkOpTimesRightMatrix.scala          |  2 +-
 .../mahout/flinkbindings/drm/FlinkDrm.scala     | 12 ++++-----
 .../mahout/flinkbindings/blas/LATestSuite.scala | 22 +++++++--------
 .../mahout/sparkbindings/SparkEngine.scala      |  3 ---
 15 files changed, 52 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index fee3d73..269a928 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -19,6 +19,7 @@
 package org.apache.mahout.flinkbindings
 
 import java.util.Collection
+
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -30,7 +31,6 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.blas._
 import org.apache.mahout.flinkbindings.drm._
 import org.apache.mahout.flinkbindings.io.HDFSUtil
@@ -58,16 +58,20 @@ object FlinkEngine extends DistributedEngine {
    */
   override def drmDfsRead(path: String, parMin: Int = 0)
                          (implicit dc: DistributedContext): CheckpointedDrm[_] = {
+
+    // Require that context is actually Flink context.
+    require(dc.isInstanceOf[FlinkDistributedContext], "Supplied context must be for the Flink backend.")
+
+    // Extract the Flink Environment variable
+    implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
+
     val metadata = hdfsUtils.readDrmHeader(path)
     val unwrapKey = metadata.unwrapKeyFunction
 
-    val job = new JobConf
-    val hadoopInput = new SequenceFileInputFormat[Writable, VectorWritable]
-    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path))
-
-    val writables = dc.env.createHadoopInput(hadoopInput, classOf[Writable], classOf[VectorWritable], job)
+    val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable],
+      classOf[Writable], classOf[VectorWritable], path)
 
-    val res = writables.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
+    val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
       def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {
         (unwrapKey(tuple.f0), tuple.f1)
       }
@@ -89,7 +93,7 @@ object FlinkEngine extends DistributedEngine {
   override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
     // Flink-specific Physical Plan translation.
     val drm = flinkTranslate(plan)
-    val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol)
+    val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
     newcp.cache()
   }
 
@@ -101,7 +105,7 @@ object FlinkEngine extends DistributedEngine {
       // TODO: create specific implementation of Atx, see MAHOUT-1749 
       val opAt = OpAt(a)
       val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
-      val atCast = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
+      val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
       val opAx = OpAx(atCast, x)
       FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA))
     }
@@ -112,11 +116,11 @@ object FlinkEngine extends DistributedEngine {
       // TODO: create specific implementation of ABt, see MAHOUT-1750 
       val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
       val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
-      val c = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
+      val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
 
       val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
       val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
-      val d = new CheckpointedFlinkDrm(bt.deblockify.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
+      val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
 
       FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
                 .asInstanceOf[FlinkDrm[K]]
@@ -167,7 +171,7 @@ object FlinkEngine extends DistributedEngine {
   override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
-    val result = drm.blockify.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
+    val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
       def map(tuple: (Array[K], Matrix)): Vector = {
         val (_, block) = tuple
         val acc = block(0, ::).like()
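
For reference, here is a minimal usage sketch of the reworked drmDfsRead. The
HDFS path is hypothetical, and the FlinkDistributedContext constructor is
assumed to wrap a Flink ExecutionEnvironment (consistent with the
dc.asInstanceOf[FlinkDistributedContext].env access above); none of this is
part of the commit:

    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._

    object DrmDfsReadSketch {
      def main(args: Array[String]): Unit = {
        // Assumption: FlinkDistributedContext(env) wraps Flink's environment.
        implicit val dc: DistributedContext =
          new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

        // drmDfsRead first reads the DRM header to recover the key type,
        // then loads the sequence file via env.readHadoopFile.
        val drm = FlinkEngine.drmDfsRead("hdfs://namenode:8020/tmp/drmA")

        println(s"nrow = ${drm.nrow}, ncol = ${drm.ncol}")
      }
    }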

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index 460199e..38fe312 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -29,8 +29,8 @@ object FlinkOpAewB {
     val classTag = extractRealClassTag(op.A)
     val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
 
-    val rowsA = A.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
+    val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
+    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
 
     val res: DataSet[(Any, Vector)] = 
       rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index a97f8c8..ab434bb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -44,7 +44,7 @@ object FlinkOpAewScalar {
   def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
     val function = EWOpsCloning.strToFunction(op.op)
 
-    val res = A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+    val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
       def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
         case (keys, mat) => (keys, function(mat, scalar))
       }
@@ -58,7 +58,7 @@ object FlinkOpAewScalar {
     val inplace = isInplace
 
     val res = if (op.evalZeros) {
-      A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+      A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
         def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
           val (keys, block) = tuple
           val newBlock = if (inplace) block else block.cloned
@@ -67,7 +67,7 @@ object FlinkOpAewScalar {
         }
       })
     } else {
-      A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+      A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
         def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
           val (keys, block) = tuple
           val newBlock = if (inplace) block else block.cloned

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index b859e1f..274b1ca 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -51,7 +51,7 @@ object FlinkOpAt {
   def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = {
     val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T
 
-    val sparseParts = A.blockify.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
+    val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
       def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match {
         case (keys, block) => {
           (0 until block.ncol).map(columnIdx => {

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index 0bda805..0e30eff 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -45,7 +45,7 @@ object FlinkOpAtA {
   }
 
   def slim(op: OpAtA[_], A: FlinkDrm[_]): Matrix = {
-    val ds = A.blockify.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]]
+    val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]]
 
     val res = ds.map(new MapFunction[(Array[Any], Matrix), Matrix] {
       // TODO: optimize it: use upper-triangle matrices like in Spark
@@ -62,7 +62,7 @@ object FlinkOpAtA {
   def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = {
     val nrow = op.A.nrow
     val ncol = op.A.ncol
-    val ds = A.blockify.ds
+    val ds = A.asBlockified.ds
 
     val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[Any], Matrix), Int] {
       def map(a: (Array[Any], Matrix)): Int = 1

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index 362c62f..0dd0dd2 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -52,8 +52,8 @@ object FlinkOpAtB {
     val classTag = extractRealClassTag(op.A)
     val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
 
-    val rowsAt = At.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
+    val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
+    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
     val joined = rowsAt.join(rowsB).where(joiner).equalTo(joiner)
 
     val ncol = op.ncol

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index b473a4c..503ab17 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -44,7 +44,7 @@ object FlinkOpAx {
 
     val singletonDataSetX = ctx.env.fromElements(op.x)
 
-    val out = A.blockify.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+    val out = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
       var x: Vector = null
 
       override def open(params: Configuration): Unit = {
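
The hunk above follows Flink's broadcast-variable pattern: a singleton DataSet
(op.x) is broadcast to a RichMapFunction and fetched by name in open(). A
self-contained sketch of that pattern on plain doubles (the names and data are
illustrative, not taken from the commit):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    object BroadcastSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val factor = env.fromElements(10.0)        // singleton DataSet, like op.x
        val data = env.fromElements(1.0, 2.0, 3.0)

        val scaled = data.map(new RichMapFunction[Double, Double] {
          private var f = 0.0
          override def open(params: Configuration): Unit =
            // broadcast variables are fetched by name once per task instance
            f = getRuntimeContext.getBroadcastVariable[Double]("factor").get(0)
          override def map(x: Double): Double = x * f
        }).withBroadcastSet(factor, "factor")

        scaled.print()
      }
    }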

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index 234937b..49ca7d5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -50,8 +50,8 @@ object FlinkOpCBind {
     val classTag = extractRealClassTag(op.A)
     val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
 
-    val rowsA = A.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
+    val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
+    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
 
     val res: DataSet[(Any, Vector)] = 
       rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
@@ -102,9 +102,9 @@ object FlinkOpCBind {
 
   def cbindScalar[K: ClassTag](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
     val left = op.leftBind
-    val ds = A.blockify.ds
+    val ds = A.asBlockified.ds
 
-    val out = A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+    val out = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
       def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
         case (keys, mat) => (keys, cbind(mat, x, left))
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
index cd745e4..9530d43 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
@@ -34,7 +34,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
 object FlinkOpMapBlock {
 
   def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = {
-    val res = src.blockify.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] {
+    val res = src.asBlockified.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] {
       def map(block: (Array[S], Matrix)): (Array[R], Matrix) =  {
         val out = function(block)
         assert(out._2.nrow == block._2.nrow, "block mapping must return same number of rows.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index f8fbea0..9ebff51 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -30,7 +30,7 @@ object FlinkOpRBind {
 
   def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
     // note that indexes of B are already re-arranged prior to executing this code
-    val res = A.deblockify.ds.union(B.deblockify.ds)
+    val res = A.asRowWise.ds.union(B.asRowWise.ds)
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol)
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
index f720125..6e11892 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
@@ -35,7 +35,7 @@ object FlinkOpRowRange {
     val rowRange = op.rowRange
     val firstIdx = rowRange.head
 
-    val filtered = A.deblockify.ds.filter(new FilterFunction[(Int, Vector)] {
+    val filtered = A.asRowWise.ds.filter(new FilterFunction[(Int, Vector)] {
       def filter(tuple: (Int, Vector)): Boolean = tuple match {
         case (idx, vec) => rowRange.contains(idx)
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index 92724d8..af3854d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -39,7 +39,7 @@ object FlinkOpTimesRightMatrix {
 
     val singletonDataSetB = ctx.env.fromElements(inCoreB)
 
-    val res = A.blockify.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+    val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
       var inCoreB: Matrix = null
 
       override def open(params: Configuration): Unit = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index 27eac4e..d00a335 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -43,8 +43,8 @@ trait FlinkDrm[K] {
   def context: FlinkDistributedContext
   def isBlockified: Boolean
 
-  def blockify: BlockifiedFlinkDrm[K]
-  def deblockify: RowsFlinkDrm[K]
+  def asBlockified: BlockifiedFlinkDrm[K]
+  def asRowWise: RowsFlinkDrm[K]
 
   def classTag: ClassTag[K]
 }
@@ -56,7 +56,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
 
   def isBlockified = false
 
-  def blockify(): BlockifiedFlinkDrm[K] = {
+  def asBlockified(): BlockifiedFlinkDrm[K] = {
     val ncolLocal = ncol
     val classTag = implicitly[ClassTag[K]]
 
@@ -81,7 +81,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
     new BlockifiedFlinkDrm(parts, ncol)
   }
 
-  def deblockify = this
+  def asRowWise = this
 
   def classTag = implicitly[ClassTag[K]]
 
@@ -94,9 +94,9 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol:
 
   def isBlockified = true
 
-  def blockify = this
+  def asBlockified = this
 
-  def deblockify = {
+  def asRowWise = {
     val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
       def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = typle match {
         case (keys, block) => keys.view.zipWithIndex.foreach {
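
After the rename, operator code blockifies a DRM with asBlockified and
flattens it back to (key, row-vector) form with asRowWise. A hedged sketch of
a block-wise operator written in the new style, mirroring FlinkOpAewScalar
above (the scaleBlocks helper is illustrative and not part of the commit; ncol
is passed explicitly rather than read off the trait):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.mahout.flinkbindings.drm._
    import org.apache.mahout.math.Matrix
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import scala.reflect.ClassTag

    object BlockOpSketch {
      // Scale every block of a DRM by a constant, without side effects.
      def scaleBlocks[K: ClassTag](A: FlinkDrm[K], ncol: Int, s: Double): FlinkDrm[K] = {
        val scalar = s // capture into a local val so the closure serializes cleanly
        val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
          def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
            // clone the block before mutating, as the no-side-effect operators above do
            case (keys, block) => (keys, block.cloned *= scalar)
          }
        })
        new BlockifiedFlinkDrm(res, ncol)
      }
    }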

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
index 786ab5f..a766146 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
@@ -41,7 +41,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
 
     val opAx = new OpAx(A, x)
     val res = FlinkOpAx.blockifiedBroadcastAx(opAx, A)
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds)
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds)
     val output = drm.collect
 
     val b = output(::, 0)
@@ -54,7 +54,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
 
     val opAt = new OpAt(A)
     val res = FlinkOpAt.sparseTrick(opAt, A)
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow)
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow)
     val output = drm.collect
 
     assert((output - inCoreA.t).norm < 1e-6)
@@ -71,7 +71,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val opAtB = new OpAtB(At, B)
     val res = FlinkOpAtB.notZippable(opAtB, At, B)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol)
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol)
     val output = drm.collect
 
     val expected = inCoreAt.t %*% inCoreB
@@ -86,7 +86,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpAewScalar(A, scalar, "*") 
     val res = FlinkOpAewScalar.opScalarNoSideEffect(op, A, scalar)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
     val output = drm.collect
 
     val expected = inCoreA  * scalar
@@ -100,7 +100,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpAewB(A, A, "*")
     val res = FlinkOpAewB.rowWiseJoinNoSideEffect(op, A, A)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
     val output = drm.collect
 
     assert((output - (inCoreA  * inCoreA)).norm < 1e-6)
@@ -115,7 +115,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpCbind(A, B)
     val res = FlinkOpCBind.cbind(op, A, B)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, 
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
         _ncol=(inCoreA.ncol + inCoreB.ncol))
     val output = drm.collect
 
@@ -130,7 +130,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpCbindScalar(A, 1, true)
     val res = FlinkOpCBind.cbindScalar(op, A, 1)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, 
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
         _ncol=(inCoreA.ncol + 1))
     val output = drm.collect
 
@@ -145,7 +145,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpCbindScalar(A, 1, false)
     val res = FlinkOpCBind.cbindScalar(op, A, 1)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, 
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
         _ncol=(inCoreA.ncol + 1))
     val output = drm.collect
 
@@ -161,7 +161,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpRowRange(A, range)
     val res = FlinkOpRowRange.slice(op, A)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow, 
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow,
         _ncol=inCoreA.ncol)
     val output = drm.collect
 
@@ -177,7 +177,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpTimesRightMatrix(A, inCoreB)
     val res = FlinkOpTimesRightMatrix.drmTimesInCore(op, A, inCoreB)
 
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow, 
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow,
         _ncol=op.ncol)
     val output = drm.collect
 
@@ -204,7 +204,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
     val op = new OpAtA(Aany)
 
     val res = FlinkOpAtA.fat(op, Aany)
-    val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow, _ncol=op.ncol)
+    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, _ncol=op.ncol)
     val output = drm.collect
     println(output)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index fdbd950..d89a8de 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -26,7 +26,6 @@ import RLikeOps._
 import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
-import scala.Predef
 import scala.reflect.ClassTag
 import scala.reflect.classTag
 import org.apache.spark.storage.StorageLevel
@@ -35,8 +34,6 @@ import org.apache.hadoop.io._
 import collection._
 import JavaConversions._
 import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.spark.rdd.RDD
 import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
 
 


[2/2] mahout git commit: Refactored FlinkEngine.drmDfsRead(), closes apache/mahout #165

Posted by sm...@apache.org.
Refactored FlinkEngine.drmDfsRead(), closes apache/mahout #165


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/1d9b6322
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/1d9b6322
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/1d9b6322

Branch: refs/heads/flink-binding
Commit: 1d9b6322e4db2cfd0b681e27591fe81206a915f5
Parents: 41dcb42
Author: smarthi <sm...@apache.org>
Authored: Sat Oct 24 23:43:53 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Oct 24 23:43:53 2015 -0400

----------------------------------------------------------------------
 .../main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/1d9b6322/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 269a928..9820b86 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -66,6 +66,7 @@ object FlinkEngine extends DistributedEngine {
     implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
 
     val metadata = hdfsUtils.readDrmHeader(path)
+
     val unwrapKey = metadata.unwrapKeyFunction
 
     val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable],