You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2015/10/25 04:44:16 UTC
[1/2] mahout git commit: 1. Reworked FlinkEngine.drmDfsRead 2. small
renaming of methods in FlinkDrm.scala
Repository: mahout
Updated Branches:
refs/heads/flink-binding 9e9ec79d6 -> 1d9b6322e
1. Reworked FlinkEngine.drmDfsRead
2. small renaming of methods in FlinkDrm.scala
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/41dcb425
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/41dcb425
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/41dcb425
Branch: refs/heads/flink-binding
Commit: 41dcb4253af7fdccee81b2f2acf22e6a904c2b9d
Parents: 9e9ec79
Author: smarthi <sm...@apache.org>
Authored: Sat Oct 24 23:37:37 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Oct 24 23:38:15 2015 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 28 +++++++++++---------
.../mahout/flinkbindings/blas/FlinkOpAewB.scala | 4 +--
.../flinkbindings/blas/FlinkOpAewScalar.scala | 6 ++---
.../mahout/flinkbindings/blas/FlinkOpAt.scala | 2 +-
.../mahout/flinkbindings/blas/FlinkOpAtA.scala | 4 +--
.../mahout/flinkbindings/blas/FlinkOpAtB.scala | 4 +--
.../mahout/flinkbindings/blas/FlinkOpAx.scala | 2 +-
.../flinkbindings/blas/FlinkOpCBind.scala | 8 +++---
.../flinkbindings/blas/FlinkOpMapBlock.scala | 2 +-
.../flinkbindings/blas/FlinkOpRBind.scala | 2 +-
.../flinkbindings/blas/FlinkOpRowRange.scala | 2 +-
.../blas/FlinkOpTimesRightMatrix.scala | 2 +-
.../mahout/flinkbindings/drm/FlinkDrm.scala | 12 ++++-----
.../mahout/flinkbindings/blas/LATestSuite.scala | 22 +++++++--------
.../mahout/sparkbindings/SparkEngine.scala | 3 ---
15 files changed, 52 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index fee3d73..269a928 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -19,6 +19,7 @@
package org.apache.mahout.flinkbindings
import java.util.Collection
+
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -30,7 +31,6 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.blas._
import org.apache.mahout.flinkbindings.drm._
import org.apache.mahout.flinkbindings.io.HDFSUtil
@@ -58,16 +58,20 @@ object FlinkEngine extends DistributedEngine {
*/
override def drmDfsRead(path: String, parMin: Int = 0)
(implicit dc: DistributedContext): CheckpointedDrm[_] = {
+
+ // Require that context is actually Flink context.
+ require(dc.isInstanceOf[FlinkDistributedContext], "Supplied context must be for the Flink backend.")
+
+ // Extract the Flink Environment variable
+ implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
+
val metadata = hdfsUtils.readDrmHeader(path)
val unwrapKey = metadata.unwrapKeyFunction
- val job = new JobConf
- val hadoopInput = new SequenceFileInputFormat[Writable, VectorWritable]
- FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path))
-
- val writables = dc.env.createHadoopInput(hadoopInput, classOf[Writable], classOf[VectorWritable], job)
+ val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable],
+ classOf[Writable], classOf[VectorWritable], path)
- val res = writables.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
+ val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {
(unwrapKey(tuple.f0), tuple.f1)
}
@@ -89,7 +93,7 @@ object FlinkEngine extends DistributedEngine {
override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
// Flink-specific Physical Plan translation.
val drm = flinkTranslate(plan)
- val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol)
+ val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
newcp.cache()
}
@@ -101,7 +105,7 @@ object FlinkEngine extends DistributedEngine {
// TODO: create specific implementation of Atx, see MAHOUT-1749
val opAt = OpAt(a)
val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
- val atCast = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
+ val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
val opAx = OpAx(atCast, x)
FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA))
}
@@ -112,11 +116,11 @@ object FlinkEngine extends DistributedEngine {
// TODO: create specific implementation of ABt, see MAHOUT-1750
val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
- val c = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
+ val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
- val d = new CheckpointedFlinkDrm(bt.deblockify.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
+ val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
.asInstanceOf[FlinkDrm[K]]
@@ -167,7 +171,7 @@ object FlinkEngine extends DistributedEngine {
override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
val n = drm.ncol
- val result = drm.blockify.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
+ val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
def map(tuple: (Array[K], Matrix)): Vector = {
val (_, block) = tuple
val acc = block(0, ::).like()
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index 460199e..38fe312 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -29,8 +29,8 @@ object FlinkOpAewB {
val classTag = extractRealClassTag(op.A)
val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]])
- val rowsA = A.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
- val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
+ val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
+ val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
val res: DataSet[(Any, Vector)] =
rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index a97f8c8..ab434bb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -44,7 +44,7 @@ object FlinkOpAewScalar {
def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
val function = EWOpsCloning.strToFunction(op.op)
- val res = A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+ val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
case (keys, mat) => (keys, function(mat, scalar))
}
@@ -58,7 +58,7 @@ object FlinkOpAewScalar {
val inplace = isInplace
val res = if (op.evalZeros) {
- A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+ A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
val (keys, block) = tuple
val newBlock = if (inplace) block else block.cloned
@@ -67,7 +67,7 @@ object FlinkOpAewScalar {
}
})
} else {
- A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+ A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
val (keys, block) = tuple
val newBlock = if (inplace) block else block.cloned
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index b859e1f..274b1ca 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -51,7 +51,7 @@ object FlinkOpAt {
def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = {
val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T
- val sparseParts = A.blockify.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
+ val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match {
case (keys, block) => {
(0 until block.ncol).map(columnIdx => {
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index 0bda805..0e30eff 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -45,7 +45,7 @@ object FlinkOpAtA {
}
def slim(op: OpAtA[_], A: FlinkDrm[_]): Matrix = {
- val ds = A.blockify.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]]
+ val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]]
val res = ds.map(new MapFunction[(Array[Any], Matrix), Matrix] {
// TODO: optimize it: use upper-triangle matrices like in Spark
@@ -62,7 +62,7 @@ object FlinkOpAtA {
def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = {
val nrow = op.A.nrow
val ncol = op.A.ncol
- val ds = A.blockify.ds
+ val ds = A.asBlockified.ds
val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[Any], Matrix), Int] {
def map(a: (Array[Any], Matrix)): Int = 1
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index 362c62f..0dd0dd2 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -52,8 +52,8 @@ object FlinkOpAtB {
val classTag = extractRealClassTag(op.A)
val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]])
- val rowsAt = At.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
- val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
+ val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
+ val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
val joined = rowsAt.join(rowsB).where(joiner).equalTo(joiner)
val ncol = op.ncol
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index b473a4c..503ab17 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -44,7 +44,7 @@ object FlinkOpAx {
val singletonDataSetX = ctx.env.fromElements(op.x)
- val out = A.blockify.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+ val out = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
var x: Vector = null
override def open(params: Configuration): Unit = {
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index 234937b..49ca7d5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -50,8 +50,8 @@ object FlinkOpCBind {
val classTag = extractRealClassTag(op.A)
val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]])
- val rowsA = A.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
- val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]]
+ val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
+ val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
val res: DataSet[(Any, Vector)] =
rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
@@ -102,9 +102,9 @@ object FlinkOpCBind {
def cbindScalar[K: ClassTag](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
val left = op.leftBind
- val ds = A.blockify.ds
+ val ds = A.asBlockified.ds
- val out = A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+ val out = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
case (keys, mat) => (keys, cbind(mat, x, left))
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
index cd745e4..9530d43 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
@@ -34,7 +34,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
object FlinkOpMapBlock {
def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = {
- val res = src.blockify.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] {
+ val res = src.asBlockified.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] {
def map(block: (Array[S], Matrix)): (Array[R], Matrix) = {
val out = function(block)
assert(out._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index f8fbea0..9ebff51 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -30,7 +30,7 @@ object FlinkOpRBind {
def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
// note that indexes of B are already re-arranged prior to executing this code
- val res = A.deblockify.ds.union(B.deblockify.ds)
+ val res = A.asRowWise.ds.union(B.asRowWise.ds)
new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
index f720125..6e11892 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
@@ -35,7 +35,7 @@ object FlinkOpRowRange {
val rowRange = op.rowRange
val firstIdx = rowRange.head
- val filtered = A.deblockify.ds.filter(new FilterFunction[(Int, Vector)] {
+ val filtered = A.asRowWise.ds.filter(new FilterFunction[(Int, Vector)] {
def filter(tuple: (Int, Vector)): Boolean = tuple match {
case (idx, vec) => rowRange.contains(idx)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index 92724d8..af3854d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -39,7 +39,7 @@ object FlinkOpTimesRightMatrix {
val singletonDataSetB = ctx.env.fromElements(inCoreB)
- val res = A.blockify.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+ val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
var inCoreB: Matrix = null
override def open(params: Configuration): Unit = {
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index 27eac4e..d00a335 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -43,8 +43,8 @@ trait FlinkDrm[K] {
def context: FlinkDistributedContext
def isBlockified: Boolean
- def blockify: BlockifiedFlinkDrm[K]
- def deblockify: RowsFlinkDrm[K]
+ def asBlockified: BlockifiedFlinkDrm[K]
+ def asRowWise: RowsFlinkDrm[K]
def classTag: ClassTag[K]
}
@@ -56,7 +56,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
def isBlockified = false
- def blockify(): BlockifiedFlinkDrm[K] = {
+ def asBlockified(): BlockifiedFlinkDrm[K] = {
val ncolLocal = ncol
val classTag = implicitly[ClassTag[K]]
@@ -81,7 +81,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
new BlockifiedFlinkDrm(parts, ncol)
}
- def deblockify = this
+ def asRowWise = this
def classTag = implicitly[ClassTag[K]]
@@ -94,9 +94,9 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol:
def isBlockified = true
- def blockify = this
+ def asBlockified = this
- def deblockify = {
+ def asRowWise = {
val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = typle match {
case (keys, block) => keys.view.zipWithIndex.foreach {
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
index 786ab5f..a766146 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
@@ -41,7 +41,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val opAx = new OpAx(A, x)
val res = FlinkOpAx.blockifiedBroadcastAx(opAx, A)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds)
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds)
val output = drm.collect
val b = output(::, 0)
@@ -54,7 +54,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val opAt = new OpAt(A)
val res = FlinkOpAt.sparseTrick(opAt, A)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow)
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow)
val output = drm.collect
assert((output - inCoreA.t).norm < 1e-6)
@@ -71,7 +71,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val opAtB = new OpAtB(At, B)
val res = FlinkOpAtB.notZippable(opAtB, At, B)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol)
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol)
val output = drm.collect
val expected = inCoreAt.t %*% inCoreB
@@ -86,7 +86,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpAewScalar(A, scalar, "*")
val res = FlinkOpAewScalar.opScalarNoSideEffect(op, A, scalar)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
val output = drm.collect
val expected = inCoreA * scalar
@@ -100,7 +100,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpAewB(A, A, "*")
val res = FlinkOpAewB.rowWiseJoinNoSideEffect(op, A, A)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
val output = drm.collect
assert((output - (inCoreA * inCoreA)).norm < 1e-6)
@@ -115,7 +115,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpCbind(A, B)
val res = FlinkOpCBind.cbind(op, A, B)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow,
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
_ncol=(inCoreA.ncol + inCoreB.ncol))
val output = drm.collect
@@ -130,7 +130,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpCbindScalar(A, 1, true)
val res = FlinkOpCBind.cbindScalar(op, A, 1)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow,
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
_ncol=(inCoreA.ncol + 1))
val output = drm.collect
@@ -145,7 +145,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpCbindScalar(A, 1, false)
val res = FlinkOpCBind.cbindScalar(op, A, 1)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow,
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
_ncol=(inCoreA.ncol + 1))
val output = drm.collect
@@ -161,7 +161,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpRowRange(A, range)
val res = FlinkOpRowRange.slice(op, A)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow,
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow,
_ncol=inCoreA.ncol)
val output = drm.collect
@@ -177,7 +177,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpTimesRightMatrix(A, inCoreB)
val res = FlinkOpTimesRightMatrix.drmTimesInCore(op, A, inCoreB)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow,
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow,
_ncol=op.ncol)
val output = drm.collect
@@ -204,7 +204,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite {
val op = new OpAtA(Aany)
val res = FlinkOpAtA.fat(op, Aany)
- val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow, _ncol=op.ncol)
+ val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, _ncol=op.ncol)
val output = drm.collect
println(output)
http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index fdbd950..d89a8de 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -26,7 +26,6 @@ import RLikeOps._
import org.apache.mahout.math.drm.logical._
import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
import org.apache.mahout.math._
-import scala.Predef
import scala.reflect.ClassTag
import scala.reflect.classTag
import org.apache.spark.storage.StorageLevel
@@ -35,8 +34,6 @@ import org.apache.hadoop.io._
import collection._
import JavaConversions._
import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.spark.rdd.RDD
import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
[2/2] mahout git commit: Refactored FlinkEngine.drmDfsRead(),
closes apache/mahout #165
Posted by sm...@apache.org.
Refactored FlinkEngine.drmDfsRead(), closes apache/mahout #165
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/1d9b6322
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/1d9b6322
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/1d9b6322
Branch: refs/heads/flink-binding
Commit: 1d9b6322e4db2cfd0b681e27591fe81206a915f5
Parents: 41dcb42
Author: smarthi <sm...@apache.org>
Authored: Sat Oct 24 23:43:53 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Oct 24 23:43:53 2015 -0400
----------------------------------------------------------------------
.../main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/1d9b6322/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 269a928..9820b86 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -66,6 +66,7 @@ object FlinkEngine extends DistributedEngine {
implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
val metadata = hdfsUtils.readDrmHeader(path)
+
val unwrapKey = metadata.unwrapKeyFunction
val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable],