You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/20 07:36:55 UTC
[12/32] mahout git commit: MAHOUT-1734: Flink: DRM IO
MAHOUT-1734: Flink: DRM IO
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0e7b0b46
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0e7b0b46
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0e7b0b46
Branch: refs/heads/flink-binding
Commit: 0e7b0b4623ff802323027a5e6cd4568e60b2a793
Parents: 1806ca8
Author: Alexey Grigorev <al...@gmail.com>
Authored: Tue Jun 2 16:04:47 2015 +0200
Committer: Alexey Grigorev <al...@gmail.com>
Committed: Fri Sep 25 17:41:49 2015 +0200
----------------------------------------------------------------------
.../mahout/flinkbindings/DataSetOps.scala | 60 ++++++++
.../mahout/flinkbindings/FlinkEngine.scala | 147 ++++++++++---------
.../mahout/flinkbindings/blas/package.scala | 2 +-
.../drm/CheckpointedFlinkDrm.scala | 74 +++++++---
.../mahout/flinkbindings/io/DrmMetadata.scala | 53 +++++++
.../flinkbindings/io/HDFSPathSearch.scala | 82 +++++++++++
.../mahout/flinkbindings/io/HDFSUtil.scala | 33 +++++
.../flinkbindings/io/Hadoop1HDFSUtil.scala | 85 +++++++++++
.../apache/mahout/flinkbindings/package.scala | 50 ++++++-
.../flinkbindings/examples/ReadCsvExample.scala | 21 +++
pom.xml | 3 +-
11 files changed, 511 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
new file mode 100644
index 0000000..c7a92c2
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings
+
+import java.lang.Iterable
+import java.util.Collections
+import java.util.Comparator
+import scala.collection.JavaConverters._
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.configuration.Configuration
+import scala.reflect.ClassTag
+
+
+/**
+ * Extra operations on a Flink Java [[DataSet]] that Flink does not (yet) provide itself.
+ *
+ * @param ds the data set to operate on
+ */
+class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
+
+  /**
+   * Pairs every element of the data set with a unique, consecutive 0-based `Long` index.
+   *
+   * Two passes are made over the data: the first counts the elements of every partition, the
+   * second receives those counts as a broadcast variable so that each partition starts numbering
+   * right after the elements of all lower-numbered partitions.
+   *
+   * Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink
+   *
+   * TODO: remove when FLINK-2152 is committed and released
+   *
+   * @return a data set of `(index, element)` pairs
+   */
+  def zipWithIndex(): DataSet[(Long, K)] = {
+    // name under which the per-partition counts are broadcast to the indexing pass
+    val countsName = "counts"
+
+    // first for each partition count the number of elements - to calculate the offsets.
+    // Counted into a Long: Iterable.count returns an Int, which overflows on huge partitions.
+    val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Long)] {
+      override def mapPartition(values: Iterable[K], out: Collector[(Int, Long)]): Unit = {
+        val cnt = values.asScala.foldLeft(0L)((acc, _) => acc + 1)
+        val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
+        out.collect(subtaskIdx -> cnt)
+      }
+    })
+
+    // then use the offsets to index items of each partition
+    val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Long, K)] {
+      var offset: Long = 0
+
+      override def open(parameters: Configuration): Unit = {
+        val offsetsJava: java.util.List[(Int, Long)] =
+          getRuntimeContext.getBroadcastVariable(countsName)
+        // this partition starts right after all elements of the lower-numbered subtasks
+        val subtaskId = getRuntimeContext.getIndexOfThisSubtask
+        offset = offsetsJava.asScala.collect { case (id, cnt) if id < subtaskId => cnt }.sum
+      }
+
+      override def mapPartition(values: Iterable[K], out: Collector[(Long, K)]): Unit = {
+        // explicit Long counter: Seq.zipWithIndex yields an Int index that overflows past Int.MaxValue
+        var idx = offset
+        values.asScala.foreach { value =>
+          out.collect((idx, value))
+          idx += 1
+        }
+      }
+    }).withBroadcastSet(counts, countsName)
+
+    zipped
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index f174871..1b0464e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -1,66 +1,79 @@
package org.apache.mahout.flinkbindings
+import java.util.Collection
import scala.reflect.ClassTag
-import org.apache.flink.api.scala.DataSet
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
+import scala.collection.JavaConverters._
+import com.google.common.collect._
import org.apache.mahout.math._
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm.BCast
-import org.apache.mahout.math.drm.CacheHint
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DistributedEngine
-import org.apache.mahout.math.drm.DrmLike
-import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetElementReadSchema
-import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetReadSchema
-import org.apache.mahout.math.indexeddataset.IndexedDataset
-import org.apache.mahout.math.indexeddataset.Schema
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.indexeddataset._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
-import com.google.common.collect.BiMap
-import com.google.common.collect.HashBiMap
-import scala.collection.JavaConverters._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.mahout.math.drm.DrmTuple
-import java.util.Collection
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.math.indexeddataset.BiDictionary
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm._
import org.apache.mahout.flinkbindings.blas._
-import org.apache.mahout.math.drm.logical.OpAx
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.drm.logical.OpAt
-import org.apache.mahout.math.drm.logical.OpAtx
-import org.apache.mahout.math.drm.logical.OpAtx
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.mahout.math.drm.logical.OpABt
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.drm.logical.OpAewScalar
-import org.apache.mahout.math.drm.logical.OpAewB
-import org.apache.mahout.math.drm.logical.OpCbind
-import org.apache.mahout.math.drm.logical.OpRbind
-import org.apache.mahout.math.drm.logical.OpMapBlock
-import org.apache.mahout.math.drm.logical.OpRowRange
-import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.mahout.math.indexeddataset.BiDictionary
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.java.io.TypeSerializerInputFormat
+import org.apache.flink.api.common.io.SerializedInputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.SequenceFileInputFormat
+import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.mahout.flinkbindings.io._
+import org.apache.hadoop.io.Writable
+import org.apache.flink.api.java.tuple.Tuple2
object FlinkEngine extends DistributedEngine {
- /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+ // (H)DFS helper that drmDfsRead uses to read DRM headers. By default, use Hadoop 1 utils
+ var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+
+  /**
+   * Loads a DRM stored on (H)DFS as a sequence file of `Writable` keys and `VectorWritable` rows.
+   * The key type comes from the header returned by `hdfsUtils.readDrmHeader(path)`.
+   *
+   * @param path   the DFS path to load from
+   * @param parMin minimum parallelism after load (equivalent to #par(min=...)); not used yet
+   */
+  override def drmDfsRead(path: String, parMin: Int = 0)
+                         (implicit dc: DistributedContext): CheckpointedDrm[_] = {
+    val header = hdfsUtils.readDrmHeader(path)
+    val keyFromWritable = header.unwrapKeyFunction
+    val keyTag = header.keyClassTag.asInstanceOf[ClassTag[Any]]
+
+    val jobConf = new JobConf
+    FileInputFormat.addInputPath(jobConf, new org.apache.hadoop.fs.Path(path))
+    val sequenceFiles = dc.env.createHadoopInput(new SequenceFileInputFormat[Writable, VectorWritable],
+      classOf[Writable], classOf[VectorWritable], jobConf)
+
+    // turn each (Writable key, VectorWritable) record into a (plain key, Vector) row
+    val rows = sequenceFiles.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
+      def map(kv: Tuple2[Writable, VectorWritable]): (Any, Vector) = (keyFromWritable(kv.f0), kv.f1.get())
+    })
+
+    datasetWrap(rows)(keyTag)
+  }
+
+  /** Not implemented for Flink yet: calling it throws scala.NotImplementedError (via ???). */
+  override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
+                                    (implicit sc: DistributedContext): IndexedDataset = ???
+  /** Not implemented for Flink yet: calling it throws scala.NotImplementedError (via ???). */
+  override def indexedDatasetDFSReadElements(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
+                                            (implicit sc: DistributedContext): IndexedDataset = ???
+
+ /**
+ * Second optimizer pass: translates the logical plan into a Flink plan and returns its row-wise
+ * data set as a CheckpointedFlinkDrm after calling cache(); the cache hint `ch` is not consulted. **/
override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
// Flink-specific Physical Plan translation.
val drm = flinkTranslate(plan)
-
- val newcp = new CheckpointedFlinkDrm(
- ds = drm.deblockify.ds,
- _nrow = plan.nrow,
- _ncol = plan.ncol
- )
-
+ val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol)
newcp.cache()
}
@@ -117,11 +130,10 @@ object FlinkEngine extends DistributedEngine {
case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol)
case _ => throw new NotImplementedError(s"operator $oper is not implemented yet")
}
-
-
- def translate[K: ClassTag](oper: DrmLike[K]): DataSet[K] = ???
- /** Engine-specific colSums implementation based on a checkpoint. */
+ /**
+ * returns a vector that contains a column-wise sum from DRM
+ */
override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] {
def map(tuple: (K, Vector)): Vector = tuple._2
@@ -129,18 +141,23 @@ object FlinkEngine extends DistributedEngine {
def reduce(v1: Vector, v2: Vector) = v1 + v2
})
- val list = CheckpointedFlinkDrm.flinkCollect(sum, "FlinkEngine colSums()")
+ val list = sum.collect.asScala.toList
list.head
}
/** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = ???
- /** Engine-specific colMeans implementation based on a checkpoint. */
+ /**
+ * Column-wise mean of the DRM, computed as drm.colSums() divided by drm.nrow.
+ */
override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
drm.colSums() / drm.nrow
}
+ /**
+ * Calculates the element-wise squared norm of a matrix
+ */
override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
def map(tuple: (K, Vector)): Double = tuple match {
@@ -150,7 +167,7 @@ object FlinkEngine extends DistributedEngine {
def reduce(v1: Double, v2: Double) = v1 + v2
})
- val list = CheckpointedFlinkDrm.flinkCollect(sumOfSquares, "FlinkEngine norm()")
+ val list = sumOfSquares.collect.asScala.toList
list.head
}
@@ -160,29 +177,21 @@ object FlinkEngine extends DistributedEngine {
/** Broadcast support */
override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = ???
- /**
- * Load DRM from hdfs (as in Mahout DRM format).
- * <P/>
- * @param path The DFS path to load from
- * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
- */
- override def drmDfsRead(path: String, parMin: Int = 0)
- (implicit sc: DistributedContext): CheckpointedDrm[_] = ???
/** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
- (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+ (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
val parallelDrm = parallelize(m, numPartitions)
new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
}
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
- (implicit sc: DistributedContext): DrmDataSet[Int] = {
+ (implicit dc: DistributedContext): DrmDataSet[Int] = {
val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
val rowsJava: Collection[DrmTuple[Int]] = rows.asJava
val dataSetType = TypeExtractor.getForObject(rows.head)
- sc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
+ dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
}
/** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
@@ -196,10 +205,4 @@ object FlinkEngine extends DistributedEngine {
/** Creates empty DRM with non-trivial height */
override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
(implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
-
- override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
- (implicit sc: DistributedContext): IndexedDataset = ???
-
- override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary])
- (implicit sc: DistributedContext): IndexedDataset = ???
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
index af5ccc8..fb154e4 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
@@ -11,5 +11,5 @@ package object blas {
def tuple_1[K: ClassTag] = new KeySelector[(Int, K), Integer] {
def getKey(tuple: Tuple2[Int, K]): Integer = tuple._1
}
-
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index e7d9dcd..0df75ca 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -20,13 +20,24 @@ import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.DataSet
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.LongWritable
+import org.apache.mahout.math.VectorWritable
+import org.apache.mahout.math.Vector
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.FileOutputFormat
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
- private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
- private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
- // private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
- override protected[mahout] val partitioningTag: Long = Random.nextLong(),
- private var _canHaveMissingRows: Boolean = false) extends CheckpointedDrm[K] {
+ private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
+ private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
+ override protected[mahout] val partitioningTag: Long = Random.nextLong(),
+ private var _canHaveMissingRows: Boolean = false
+ ) extends CheckpointedDrm[K] {
lazy val nrow: Long = if (_nrow >= 0) _nrow else computeNRow
lazy val ncol: Int = if (_ncol >= 0) _ncol else computeNCol
@@ -38,7 +49,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
def reduce(a1: Long, a2: Long) = a1 + a2
})
- val list = CheckpointedFlinkDrm.flinkCollect(count, "CheckpointedFlinkDrm computeNRow()")
+ val list = count.collect().asScala.toList
list.head
}
@@ -49,7 +60,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
def reduce(a1: Int, a2: Int) = Math.max(a1, a2)
})
- val list = CheckpointedFlinkDrm.flinkCollect(max, "CheckpointedFlinkDrm computeNCol()")
+ val list = max.collect().asScala.toList
list.head
}
@@ -69,7 +80,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
def collect: Matrix = {
- val data = CheckpointedFlinkDrm.flinkCollect(ds, "Checkpointed Flink Drm collect()")
+ val data = ds.collect().asScala.toList
val isDense = data.forall(_._2.isDense)
val m = if (isDense) {
@@ -98,7 +109,42 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
m
}
- def dfsWrite(path: String) = ???
+  /**
+   * Writes this DRM to `path` as a sequence file of (key Writable, VectorWritable) pairs and runs the job.
+   */
+  def dfsWrite(path: String): Unit = {
+    val env = ds.getExecutionEnvironment
+    val keyConversion = keyToWritableFunc(implicitly[ClassTag[K]])
+    val convertKey = keyConversion._2
+
+    val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] {
+      def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] =
+        new Tuple2(convertKey(tuple._1), new VectorWritable(tuple._2))
+    })
+
+    val job = new JobConf
+    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
+    // SequenceFileOutputFormat builds its writer from the job's output key/value classes; left unset
+    // they default to LongWritable/Text and every append fails with a "wrong key/value class" error.
+    job.setOutputKeyClass(keyConversion._1)
+    job.setOutputValueClass(classOf[VectorWritable])
+    val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable]
+    writableDataset.output(new HadoopOutputFormat(sequenceFormat, job))
+    env.execute(s"dfsWrite($path)")
+  }
+
+  /**
+   * Returns the Writable class that keys of type `T` are stored as, together with the key converter.
+   */
+  private def keyToWritableFunc[T](keyTag: ClassTag[T]): (Class[_ <: Writable], T => Writable) = {
+    val cls = keyTag.runtimeClass
+    if (cls == classOf[Int]) (classOf[IntWritable], (x: T) => new IntWritable(x.asInstanceOf[Int]))
+    else if (cls == classOf[String]) (classOf[Text], (x: T) => new Text(x.asInstanceOf[String]))
+    else if (cls == classOf[Long]) (classOf[LongWritable], (x: T) => new LongWritable(x.asInstanceOf[Long]))
+    else if (classOf[Writable].isAssignableFrom(cls)) (cls.asSubclass(classOf[Writable]), (x: T) => x.asInstanceOf[Writable])
+    else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
+  }
+
def newRowCardinality(n: Int): CheckpointedDrm[K] = ???
override val context: DistributedContext = ds.getExecutionEnvironment
@@ -108,14 +154,4 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
object CheckpointedFlinkDrm {
val UNKNOWN = -1
- // needed for backwards compatibility with flink 0.8.1
- def flinkCollect[K](dataset: DataSet[K], jobName: String = "flinkCollect()"): List[K] = {
- val dataJavaList = new ArrayList[K]
- val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList)
- dataset.output(outputFormat)
- val data = dataJavaList.asScala
- dataset.getExecutionEnvironment.execute(jobName)
- data.toList
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
new file mode 100644
index 0000000..6efe99b
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.io
+
+import scala.reflect.ClassTag
+import org.apache.hadoop.io._
+import java.util.Arrays
+
+/**
+ * Writable key and value types of a persisted DRM, plus the conversion of its Writable keys back
+ * into plain values.
+ *
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ *
+ * @param keyTypeWritable   Writable key type as a sub-type of Writable
+ * @param valueTypeWritable value Writable type, as a sub-type of Writable
+ * @throws IllegalArgumentException if `keyTypeWritable` is not one of the supported key Writables
+ */
+class DrmMetadata(val keyTypeWritable: Class[_], val valueTypeWritable: Class[_]) {
+
+  import DrmMetadata._
+
+  /**
+   * `keyClassTag`: class tag of the actual DRM key type once converted out of its Writable.
+   * `unwrapKeyFunction`: conversion from the key Writable to a value of that type.
+   */
+  val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match {
+    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
+    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
+    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
+    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
+    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
+    case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
+    case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+    case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
+  }
+
+}
+
+object DrmMetadata {
+
+  private[io] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get()
+
+  private[io] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get()
+
+  private[io] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get()
+
+  private[io] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get()
+
+  private[io] def w2string(w: Writable) = w.asInstanceOf[Text].toString()
+
+  private[io] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get()
+
+  /** Copies only the valid bytes: getBytes() returns the backing buffer, which may be longer. */
+  private[io] def w2bytes(w: Writable) = {
+    val bytes = w.asInstanceOf[BytesWritable]
+    Arrays.copyOf(bytes.getBytes(), bytes.getLength())
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
new file mode 100644
index 0000000..fc97234
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.io
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+/**
+ * Returns a [[java.lang.String]], which is a comma delimited list of URIs discovered based on
+ * parameters in the constructor.
+ * The String is a comma-separated path list of the kind accepted e.g. by Hadoop's
+ * `FileInputFormat.addInputPaths`.
+ * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ *
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
+
+  val conf = new Configuration()
+  // default file system; the lookups below resolve the file system from each path instead
+  val fs = FileSystem.get(conf)
+
+  /**
+   * Returns a string of comma delimited URIs matching the filePattern.
+   * When pattern matching, dirs are never returned, only traversed. Without a pattern, `pathURI`
+   * is returned unchanged.
+   */
+  def uris: String = {
+    if (!filePattern.isEmpty) { // have file pattern so search every listed URI
+      pathURI.split(",").flatMap(uri => findFiles(uri, filePattern)).mkString(",")
+    } else {
+      pathURI
+    }
+  }
+
+  /**
+   * Find matching, non-empty files in `dir`, recursing into sub-directories when `recursive`.
+   * Only files are matched; directories are traversed but never returned. A `dir` that is itself
+   * a file is returned as-is, without checking it against `filePattern`.
+   */
+  private def findFiles(dir: String, filePattern: String = ".*"): Seq[String] = {
+    val dirPath = new Path(dir)
+    // take the file system from the path so fully-qualified URIs of any scheme resolve correctly
+    val dirFs = dirPath.getFileSystem(conf)
+
+    if (dirFs.getFileStatus(dirPath).isDir) {
+      dirFs.listStatus(dirPath).toSeq.flatMap { fileStatus: FileStatus =>
+        if (!fileStatus.isDir && fileStatus.getPath.getName.matches(filePattern)) {
+          // found a file; keep it only if it is not empty
+          if (fileStatus.getLen != 0) Seq(fileStatus.getPath.toUri.toString) else Seq.empty[String]
+        } else if (fileStatus.isDir && recursive) {
+          findFiles(fileStatus.getPath.toString, filePattern)
+        } else {
+          Seq.empty[String]
+        }
+      }
+    } else {
+      // was a filename, not a dir: contribute it alongside whatever the other URIs found
+      Seq(dir)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
new file mode 100644
index 0000000..7629385
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.io
+
+/**
+ * Hadoop-version-specific (H)DFS operations needed by the Flink bindings' DRM I/O.
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+trait HDFSUtil {
+  /**
+   * Reads the header information of the DRM stored at `path` off (H)DFS.
+   *
+   * @param path DFS path of the DRM
+   * @return the DRM's key and value Writable types, as a [[DrmMetadata]]
+   */
+  def readDrmHeader(path: String): DrmMetadata
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
new file mode 100644
index 0000000..120edb4
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.io
+
+import org.apache.hadoop.io.{ Writable, SequenceFile }
+import org.apache.hadoop.fs.{ FileSystem, Path }
+import org.apache.hadoop.conf.Configuration
+import collection._
+import JavaConversions._
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0
+ *
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+object Hadoop1HDFSUtil extends HDFSUtil {
+
+  /**
+   * Read the header of a sequence file and determine the Key and Value type.
+   *
+   * Opens the first regular file listed under `path` whose name starts with neither "." nor "_"
+   * (e.g. not a `.crc` checksum or a `_SUCCESS` marker) and reads its sequence-file header.
+   *
+   * @param path DFS path of the DRM
+   * @return the key and value Writable classes found in that file's header
+   * @throws IllegalArgumentException if no such partition file is found
+   */
+  def readDrmHeader(path: String): DrmMetadata = {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    val partFilePath: Path = fs.listStatus(dfsPath)
+      // Filter out directories and names starting with "." or "_". startsWith takes a literal
+      // prefix, not a regex, so the dot must not be escaped ("\\." only matched a leading "\.").
+      .filter { s =>
+        val name = s.getPath.getName
+        !name.startsWith(".") && !name.startsWith("_") && !s.isDir
+      }
+      // Take the path of the first remaining file, if any
+      .map(_.getPath)
+      .headOption
+      // Require there's at least one partition file found.
+      .getOrElse {
+        throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+      }
+
+    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
+    } finally {
+      reader.close()
+    }
+  }
+
+  /**
+   * Recursively delete a path from the filesystem; does nothing if the path does not exist.
+   *
+   * @param path DFS path to delete
+   */
+  def delete(path: String): Unit = {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    if (fs.exists(dfsPath)) fs.delete(dfsPath, true)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 0b26781..6f04551 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -1,17 +1,23 @@
package org.apache.mahout
+import scala.reflect.ClassTag
+import org.slf4j.LoggerFactory
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.DenseVector
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.MatrixWritable
+import org.apache.mahout.math.VectorWritable
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
import org.apache.mahout.flinkbindings.FlinkDistributedContext
+import org.apache.mahout.flinkbindings.drm.FlinkDrm
import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.drm._
-import org.slf4j.LoggerFactory
-import scala.reflect.ClassTag
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.flink.api.common.functions.FilterFunction
package object flinkbindings {
@@ -46,4 +52,36 @@ package object flinkbindings {
new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
}
+  // Implicit conversions between Mahout in-core types and their Writable wrappers.
+  private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
+  private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
+  private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()
+  private[flinkbindings] implicit def unwrapFromWritable(w: VectorWritable): Vector = w.get()
+
+  /**
+   * Reads a delimited text file into a DRM with one dense row per line, skipping blank lines and
+   * lines starting with `comment`. `delim` is a regular expression, as in `String.split`; rows are
+   * keyed by the 0-based Long index assigned by `DataSetOps.zipWithIndex`.
+   */
+  def readCsv(file: String, delim: String = ",", comment: String = "#")
+             (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
+    val vectors = dc.env.readTextFile(file)
+      .filter(new FilterFunction[String] {
+        // blank lines (e.g. a trailing empty line) cannot be parsed into doubles, so drop them too
+        def filter(in: String): Boolean = !in.trim.isEmpty && !in.startsWith(comment)
+      })
+      .map(new MapFunction[String, Vector] {
+        def map(in: String): Vector = new DenseVector(in.split(delim).map(_.toDouble))
+      })
+    datasetToDrm(vectors)
+  }
+
+  /** Turns a data set of rows into a DRM keyed by the 0-based Long index from `zipWithIndex`. */
+  def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] =
+    datasetWrap(new DataSetOps(ds).zipWithIndex)
+
+  /** Wraps keyed rows as a checkpointed DRM; nrow and ncol are left unknown, to be computed lazily. */
+  def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] =
+    new CheckpointedFlinkDrm[K](dataset)
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
new file mode 100644
index 0000000..27b17d0
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
@@ -0,0 +1,21 @@
+package org.apache.mahout.flinkbindings.examples
+
+import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.mahout.flinkbindings._
+
+/** Reads a tab-separated file (path from the first argument, if given) into a DRM and prints `A' A`. */
+object ReadCsvExample {
+  def main(args: Array[String]): Unit = {
+    // fall back to a default sample file location when no path argument is supplied
+    val filePath = args.headOption.getOrElse("file:///c:/tmp/data/slashdot0902/Slashdot0902.txt")
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    implicit val ctx = new FlinkDistributedContext(env)
+
+    val drm = readCsv(filePath, delim = "\t", comment = "#")
+    val C = drm.t %*% drm
+    println(C.collect)
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 67d2eee..4e7cecd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,8 @@
<scala.compat.version>2.10</scala.compat.version>
<scala.version>2.10.4</scala.version>
<spark.version>1.3.1</spark.version>
- <flink.version>0.9.0-milestone-1</flink.version>
+ <!-- TODO: Remove snapshot dependency when Flink 0.9.1 is released -->
+ <flink.version>0.9-SNAPSHOT</flink.version>
<h2o.version>0.1.25</h2o.version>
</properties>
<issueManagement>