You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2015/11/12 07:15:49 UTC
[1/2] mahout git commit: WIP,
migrating to Flink 0.10 and the Flink Scala API
Repository: mahout
Updated Branches:
refs/heads/flink-binding 54f51de82 -> af015ece7
WIP, migrating to Flink 0.10 and the Flink Scala API
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e2ab67f2
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e2ab67f2
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e2ab67f2
Branch: refs/heads/flink-binding
Commit: e2ab67f2629a6c49c4d8e911274b409c2b57101c
Parents: 54f51de
Author: smarthi <sm...@apache.org>
Authored: Tue Nov 10 19:51:20 2015 -0500
Committer: smarthi <sm...@apache.org>
Committed: Tue Nov 10 19:51:20 2015 -0500
----------------------------------------------------------------------
.../mahout/flinkbindings/DataSetOps.scala | 89 +++++++++-----------
.../mahout/flinkbindings/FlinkByteBCast.scala | 4 +-
.../flinkbindings/FlinkDistributedContext.scala | 2 +-
.../mahout/flinkbindings/FlinkEngine.scala | 32 +++----
.../mahout/flinkbindings/blas/FlinkOpAewB.scala | 8 +-
.../flinkbindings/blas/FlinkOpAewScalar.scala | 16 ++--
.../mahout/flinkbindings/blas/FlinkOpAt.scala | 10 +--
.../mahout/flinkbindings/blas/FlinkOpAtA.scala | 6 +-
.../mahout/flinkbindings/blas/FlinkOpAtB.scala | 10 +--
.../mahout/flinkbindings/blas/FlinkOpAx.scala | 15 ++--
.../flinkbindings/blas/FlinkOpCBind.scala | 11 +--
.../flinkbindings/blas/FlinkOpRBind.scala | 2 +-
.../blas/FlinkOpTimesRightMatrix.scala | 4 +-
.../drm/CheckpointedFlinkDrm.scala | 45 ++++------
.../mahout/flinkbindings/drm/FlinkDrm.scala | 24 ++----
.../apache/mahout/flinkbindings/package.scala | 13 +--
.../flinkbindings/DistributedFlinkSuite.scala | 3 +-
.../flinkbindings/examples/ReadCsvExample.scala | 2 +-
18 files changed, 129 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
index 4f437ae..2387d4b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
@@ -18,61 +18,50 @@
*/
package org.apache.mahout.flinkbindings
-import java.lang.Iterable
-import java.util.Collections
-import java.util.Comparator
-import scala.collection.JavaConverters._
-import org.apache.flink.util.Collector
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.api.common.functions.RichMapPartitionFunction
-import org.apache.flink.configuration.Configuration
-import scala.reflect.ClassTag
-
-
-class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
+//@Deprecated
+//class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
/**
* Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink
*
* TODO: remove when FLINK-2152 is committed and released
*/
- def zipWithIndex(): DataSet[(Int, K)] = {
-
- // first for each partition count the number of elements - to calculate the offsets
- val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Int)] {
- override def mapPartition(values: Iterable[K], out: Collector[(Int, Int)]): Unit = {
- val cnt: Int = values.asScala.count(_ => true)
- val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
- out.collect((subtaskIdx, cnt))
- }
- })
+// def zipWithIndex(): DataSet[(Int, K)] = {
+//
+// first for each partition count the number of elements - to calculate the offsets
+// val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Int)] {
+// override def mapPartition(values: Iterable[K], out: Collector[(Int, Int)]): Unit = {
+// val cnt: Int = values.asScala.count(_ => true)
+// val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
+// out.collect((subtaskIdx, cnt))
+// }
+// })
// then use the offsets to index items of each partition
- val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Int, K)] {
- var offset: Int = 0
-
- override def open(parameters: Configuration): Unit = {
- val offsetsJava: java.util.List[(Int, Int)] =
- getRuntimeContext.getBroadcastVariable("counts")
- val offsets = offsetsJava.asScala
-
- val sortedOffsets =
- offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt }
-
- val subtaskId = getRuntimeContext.getIndexOfThisSubtask
- offset = sortedOffsets.take(subtaskId).sum.toInt
- }
-
- override def mapPartition(values: Iterable[K], out: Collector[(Int, K)]): Unit = {
- val it = values.asScala
- it.zipWithIndex.foreach { case (value, idx) =>
- out.collect((idx + offset, value))
- }
- }
- }).withBroadcastSet(counts, "counts");
-
- zipped
- }
-
-}
\ No newline at end of file
+// val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Int, K)] {
+// var offset: Int = 0
+//
+// override def open(parameters: Configuration): Unit = {
+// val offsetsJava: java.util.List[(Int, Int)] =
+// getRuntimeContext.getBroadcastVariable("counts")
+// val offsets = offsetsJava.asScala
+//
+// val sortedOffsets =
+// offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt }
+//
+// val subtaskId = getRuntimeContext.getIndexOfThisSubtask
+// offset = sortedOffsets.take(subtaskId).sum
+// }
+//
+// override def mapPartition(values: Iterable[K], out: Collector[(Int, K)]): Unit = {
+// val it = values.asScala
+// it.zipWithIndex.foreach { case (value, idx) =>
+// out.collect((idx + offset, value))
+// }
+// }
+// }).withBroadcastSet(counts, "counts")
+//
+// zipped
+// }
+//
+//}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
index 1024452..8544db0 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
@@ -72,7 +72,7 @@ object FlinkByteBCast {
dataOutput.writeInt(StreamTypeVector)
writeable.write(dataOutput)
val array = dataOutput.toByteArray()
- return new FlinkByteBCast[Vector](array)
+ new FlinkByteBCast[Vector](array)
}
def wrap(m: Matrix): FlinkByteBCast[Matrix] = {
@@ -81,7 +81,7 @@ object FlinkByteBCast {
dataOutput.writeInt(StreamTypeMatrix)
writeable.write(dataOutput)
val array = dataOutput.toByteArray()
- return new FlinkByteBCast[Matrix](array)
+ new FlinkByteBCast[Matrix](array)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index ebe473f..c818030 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -18,7 +18,7 @@
*/
package org.apache.mahout.flinkbindings
-import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.drm.DistributedEngine
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 6b12d11..d03aef7 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -18,17 +18,12 @@
*/
package org.apache.mahout.flinkbindings
-import java.util.Collection
-
-import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.mahout.flinkbindings.blas._
import org.apache.mahout.flinkbindings.drm._
import org.apache.mahout.flinkbindings.io.HDFSUtil
@@ -42,6 +37,8 @@ import org.apache.mahout.math.indexeddataset.Schema
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.flink.api.scala._
+
object FlinkEngine extends DistributedEngine {
@@ -65,13 +62,13 @@ object FlinkEngine extends DistributedEngine {
val metadata = hdfsUtils.readDrmHeader(path)
- val unwrapKey = metadata.unwrapKeyFunction
+ val unwrapKey = metadata.unwrapKeyFunction
- val dataset = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path)
+ val ds = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path)
- val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
- def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {
- (unwrapKey(tuple.f0), tuple.f1)
+ val res = ds.map(new MapFunction[(Writable, VectorWritable), (Any, Vector)] {
+ def map(tuple: (Writable, VectorWritable)): (Any, Vector) = {
+ (unwrapKey(tuple._1), tuple._2)
}
})
@@ -159,7 +156,7 @@ object FlinkEngine extends DistributedEngine {
def reduce(v1: Vector, v2: Vector) = v1 + v2
})
- val list = sum.collect.asScala.toList
+ val list = sum.collect
list.head
}
@@ -180,7 +177,7 @@ object FlinkEngine extends DistributedEngine {
def reduce(v1: Vector, v2: Vector) = v1 + v2
})
- val list = result.collect.asScala.toList
+ val list = result.collect
list.head
}
@@ -203,7 +200,7 @@ object FlinkEngine extends DistributedEngine {
def reduce(v1: Double, v2: Double) = v1 + v2
})
- val list = sumOfSquares.collect.asScala.toList
+ val list = sumOfSquares.collect
list.head
}
@@ -229,10 +226,8 @@ object FlinkEngine extends DistributedEngine {
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
(implicit dc: DistributedContext): DrmDataSet[Int] = {
val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
- val rowsJava: Collection[DrmTuple[Int]] = rows.asJava
-
val dataSetType = TypeExtractor.getForObject(rows.head)
- dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
+ dc.env.fromCollection(rows).setParallelism(parallelismDegree)
}
/** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
@@ -251,10 +246,7 @@ object FlinkEngine extends DistributedEngine {
for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
}
-
- val dataSetType = TypeExtractor.getForObject(nonParallelResult.head)
- val result = dc.env.fromCollection(nonParallelResult.asJava, dataSetType)
-
+ val result = dc.env.fromCollection(nonParallelResult)
new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index 38fe312..f879e86 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -6,7 +6,7 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm.FlinkDrm
@@ -40,13 +40,13 @@ object FlinkOpAewB {
val it1 = Lists.newArrayList(it1java).asScala
val it2 = Lists.newArrayList(it2java).asScala
- if (!it1.isEmpty && !it2.isEmpty) {
+ if (it1.nonEmpty && it2.nonEmpty) {
val (idx, a) = it1.head
val (_, b) = it2.head
out.collect((idx, function(a, b)))
- } else if (it1.isEmpty && !it2.isEmpty) {
+ } else if (it1.isEmpty && it2.nonEmpty) {
out.collect(it2.head)
- } else if (!it1.isEmpty && it2.isEmpty) {
+ } else if (it1.nonEmpty && it2.isEmpty) {
out.collect(it1.head)
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index ab434bb..67d710b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -18,18 +18,16 @@
*/
package org.apache.mahout.flinkbindings.blas
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.logical.OpAewScalar
-import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc}
import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.drm.logical.AbstractUnaryOp
-import org.apache.mahout.math.drm.logical.TEwFunc
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.scala._
/**
* Implementation is inspired by Spark-binding's OpAewScalar
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index 274b1ca..e515b34 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.shaded.com.google.common.collect.Lists
import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm.FlinkDrm
import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
import org.apache.mahout.math.Matrix
@@ -37,6 +36,8 @@ import org.apache.mahout.math.drm.DrmTuple
import org.apache.mahout.math.drm.logical.OpAt
import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.flink.api.scala._
+
/**
* Implementation is taken from Spark's At
* https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -53,7 +54,7 @@ object FlinkOpAt {
val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
def flatMap(tuple: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = tuple match {
- case (keys, block) => {
+ case (keys, block) =>
(0 until block.ncol).map(columnIdx => {
val columnVector: Vector = new SequentialAccessSparseVector(ncol)
@@ -61,9 +62,8 @@ object FlinkOpAt {
columnVector(key) = block(idx, columnIdx)
}
- out.collect(new Tuple2(columnIdx, columnVector))
+ out.collect((columnIdx, columnVector))
})
- }
}
})
@@ -73,7 +73,7 @@ object FlinkOpAt {
def reduce(values: Iterable[(Int, Vector)], out: Collector[DrmTuple[Int]]): Unit = {
val it = Lists.newArrayList(values).asScala
val (idx, _) = it.head
- val vector = it map { case (idx, vec) => vec } reduce (_ + _)
+ val vector = (it map { case (idx, vec) => vec }).sum
out.collect((idx, vector))
}
})
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index 0e30eff..629857a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -5,7 +5,7 @@ import java.lang.Iterable
import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions._
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.com.google.common.collect.Lists
import org.apache.flink.util.Collector
@@ -56,7 +56,7 @@ object FlinkOpAtA {
def reduce(m1: Matrix, m2: Matrix) = m1 + m2
}).collect()
- res.asScala.head
+ res.head
}
def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = {
@@ -155,7 +155,7 @@ object FlinkOpAtA {
val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack))
// And then we connect the ranges using gaps between offsets:
- val ranges = offsets.sliding(2).map { offs => (offs(0) until offs(1)) }
+ val ranges = offsets.sliding(2).map { offs => offs(0) until offs(1) }
ranges.toArray
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index 0dd0dd2..b514868 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -25,9 +25,7 @@ import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.scala.DataSet
import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
@@ -40,7 +38,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
import com.google.common.collect.Lists
-
+import org.apache.flink.api.scala._
/**
* Implementation is taken from Spark's AtB
@@ -65,8 +63,8 @@ object FlinkOpAtB {
joined.flatMap(new FlatMapFunction[Tuple2[(_, Vector), (_, Vector)], (Int, Matrix)] {
def flatMap(in: Tuple2[(_, Vector), (_, Vector)],
out: Collector[(Int, Matrix)]): Unit = {
- val avec = in.f0._2
- val bvec = in.f1._2
+ val avec = in._1._2
+ val bvec = in._2._2
0.until(blockCount) map { blockKey =>
val blockStart = blockKey * blockHeight
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 503ab17..4302457 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -20,17 +20,17 @@ package org.apache.mahout.flinkbindings.blas
import java.util.List
-import scala.reflect.ClassTag
-
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.Vector
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.{Matrix, Vector}
import org.apache.mahout.math.drm.logical.OpAx
import org.apache.mahout.math.scalabindings.RLikeOps._
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.scala._
+
/**
* Implementation is taken from Spark's Ax
@@ -40,7 +40,6 @@ object FlinkOpAx {
def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
implicit val ctx = A.context
- // val x = drmBroadcast(op.x)
val singletonDataSetX = ctx.env.fromElements(op.x)
@@ -48,7 +47,7 @@ object FlinkOpAx {
var x: Vector = null
override def open(params: Configuration): Unit = {
- val runtime = this.getRuntimeContext()
+ val runtime = this.getRuntimeContext
val dsX: List[Vector] = runtime.getBroadcastVariable("vector")
x = dsX.get(0)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index 49ca7d5..6cf5e5c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm._
@@ -35,6 +35,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
import com.google.common.collect.Lists
import org.apache.mahout.flinkbindings.DrmDataSet
+import org.apache.mahout.math.scalabindings._
/**
* Implementation is taken from Spark's cbind
@@ -61,11 +62,11 @@ object FlinkOpCBind {
val it1 = Lists.newArrayList(it1java).asScala
val it2 = Lists.newArrayList(it2java).asScala
- if (!it1.isEmpty && !it2.isEmpty) {
+ if (it1.nonEmpty && it2.nonEmpty) {
val (idx, a) = it1.head
val (_, b) = it2.head
- val result: Vector = if (a.isDense && b.isDense) {
+ val result: Vector = if (a.isDense && b.isDense) {
new DenseVector(n)
} else {
new SequentialAccessSparseVector(n)
@@ -75,7 +76,7 @@ object FlinkOpCBind {
result(n1 until n) := b
out.collect((idx, result))
- } else if (it1.isEmpty && !it2.isEmpty) {
+ } else if (it1.isEmpty && it2.nonEmpty) {
val (idx, b) = it2.head
val result: Vector = if (b.isDense) {
new DenseVector(n)
@@ -84,7 +85,7 @@ object FlinkOpCBind {
}
result(n1 until n) := b
out.collect((idx, result))
- } else if (!it1.isEmpty && it2.isEmpty) {
+ } else if (it1.nonEmpty && it2.isEmpty) {
val (idx, a) = it1.head
val result: Vector = if (a.isDense) {
new DenseVector(n)
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index 9ebff51..83beaa1 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -20,7 +20,7 @@ package org.apache.mahout.flinkbindings.blas
import scala.reflect.ClassTag
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
import org.apache.mahout.flinkbindings.drm.FlinkDrm
import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
import org.apache.mahout.math.Vector
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index af3854d..989fad1 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -28,6 +28,8 @@ import org.apache.mahout.math.Matrix
import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.flink.api.scala._
+
/**
* Implementation is taken from Spark's OpTimesRightMatrix:
* https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -43,7 +45,7 @@ object FlinkOpTimesRightMatrix {
var inCoreB: Matrix = null
override def open(params: Configuration): Unit = {
- val runtime = this.getRuntimeContext()
+ val runtime = this.getRuntimeContext
val dsB: java.util.List[Matrix] = runtime.getBroadcastVariable("matrix")
inCoreB = dsB.get(0)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index b6e6211..96d57d2 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,34 +18,21 @@
*/
package org.apache.mahout.flinkbindings.drm
-import scala.collection.JavaConverters._
-import scala.util.Random
-import scala.reflect.{ClassTag, classTag}
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.DrmDataSet
-import org.apache.mahout.math.DenseMatrix
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.SparseMatrix
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.CacheHint
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DrmTuple
-import org.apache.mahout.math.scalabindings._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
+import org.apache.mahout.flinkbindings.{DrmDataSet, _}
+import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, Vector, VectorWritable}
+import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _}
import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+
+import scala.collection.JavaConverters._
+import scala.reflect.{ClassTag, classTag}
+import scala.util.Random
class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
@@ -71,7 +58,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
}
})
- val list = res.collect().asScala.toList
+ val list = res.collect()
list.head
}
@@ -95,7 +82,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
def collect: Matrix = {
- val data = ds.collect().asScala.toList
+ val data = ds.collect()
val isDense = data.forall(_._2.isDense)
val cols = ncol
@@ -139,7 +126,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
val keyTag = implicitly[ClassTag[K]]
val convertKey = keyToWritableFunc(keyTag)
- val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] {
+ val writableDataset = ds.map(new MapFunction[(K, Vector), (Writable, VectorWritable)] {
def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index dbc6b11..c9c1b2c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -20,23 +20,17 @@ package org.apache.mahout.flinkbindings.drm
import java.lang.Iterable
-import scala.collection.JavaConverters.iterableAsScalaIterableConverter
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.MapPartitionFunction
-import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapPartitionFunction}
+import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings.BlockifiedDrmDataSet
-import org.apache.mahout.flinkbindings.DrmDataSet
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.wrapContext
-import org.apache.mahout.math.DenseMatrix
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.SparseRowMatrix
+import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext}
import org.apache.mahout.math.drm.DrmTuple
-import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix}
+
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+import scala.reflect.ClassTag
trait FlinkDrm[K] {
def executionEnvironment: ExecutionEnvironment
@@ -100,7 +94,7 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol:
def asRowWise = {
val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
- def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = typle match {
+ def flatMap(tuple: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = tuple match {
case (keys, block) => keys.view.zipWithIndex.foreach {
case (key, idx) =>
out.collect((key, block(idx, ::)))
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 656b8de..6b8f2ae 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -19,18 +19,21 @@
package org.apache.mahout
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm}
import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _}
import org.slf4j.LoggerFactory
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils._
+
import scala.Array._
import scala.reflect.ClassTag
package object flinkbindings {
- private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbindings")
+ private[flinkbindings] val log = LoggerFactory.getLogger("org.apache.mahout.flinkbindings")
/** Row-wise organized DRM dataset type */
type DrmDataSet[K] = DataSet[DrmTuple[K]]
@@ -78,7 +81,7 @@ package object flinkbindings {
def readCsv(file: String, delim: String = ",", comment: String = "#")
- (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+ (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
val vectors = dc.env.readTextFile(file)
.filter(new FilterFunction[String] {
def filter(in: String): Boolean = {
@@ -94,8 +97,8 @@ package object flinkbindings {
datasetToDrm(vectors)
}
- def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Int] = {
- val zipped = new DataSetOps(ds).zipWithIndex
+ def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = {
+ val zipped = ds.zipWithIndex
datasetWrap(zipped)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
index dd76ff4..6fb71ea 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
@@ -18,8 +18,7 @@
*/
package org.apache.mahout.flinkbindings
-import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.mahout.flinkbindings._
+import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.test.DistributedMahoutSuite
import org.scalatest.Suite
http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
index a9e8436..4e713c7 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
@@ -18,7 +18,7 @@
*/
package org.apache.mahout.flinkbindings.examples
-import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._
[2/2] mahout git commit: NOJira: Add missing license header
Posted by sm...@apache.org.
NOJira: Add missing license header
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/af015ece
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/af015ece
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/af015ece
Branch: refs/heads/flink-binding
Commit: af015ece73400d7a215bc109835800ec1b480f0e
Parents: e2ab67f
Author: smarthi <sm...@apache.org>
Authored: Thu Nov 12 01:10:54 2015 -0500
Committer: smarthi <sm...@apache.org>
Committed: Thu Nov 12 01:15:20 2015 -0500
----------------------------------------------------------------------
.../org/apache/mahout/math/drm/CacheHint.scala | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/af015ece/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
index ac763f9..3755f31 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.mahout.math.drm
object CacheHint extends Enumeration {