Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/20 07:36:55 UTC

[12/32] mahout git commit: MAHOUT-1734: Flink: DRM IO

MAHOUT-1734: Flink: DRM IO


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0e7b0b46
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0e7b0b46
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0e7b0b46

Branch: refs/heads/flink-binding
Commit: 0e7b0b4623ff802323027a5e6cd4568e60b2a793
Parents: 1806ca8
Author: Alexey Grigorev <al...@gmail.com>
Authored: Tue Jun 2 16:04:47 2015 +0200
Committer: Alexey Grigorev <al...@gmail.com>
Committed: Fri Sep 25 17:41:49 2015 +0200

----------------------------------------------------------------------
 .../mahout/flinkbindings/DataSetOps.scala       |  60 ++++++++
 .../mahout/flinkbindings/FlinkEngine.scala      | 147 ++++++++++---------
 .../mahout/flinkbindings/blas/package.scala     |   2 +-
 .../drm/CheckpointedFlinkDrm.scala              |  74 +++++++---
 .../mahout/flinkbindings/io/DrmMetadata.scala   |  53 +++++++
 .../flinkbindings/io/HDFSPathSearch.scala       |  82 +++++++++++
 .../mahout/flinkbindings/io/HDFSUtil.scala      |  33 +++++
 .../flinkbindings/io/Hadoop1HDFSUtil.scala      |  85 +++++++++++
 .../apache/mahout/flinkbindings/package.scala   |  50 ++++++-
 .../flinkbindings/examples/ReadCsvExample.scala |  21 +++
 pom.xml                                         |   3 +-
 11 files changed, 511 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
new file mode 100644
index 0000000..c7a92c2
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
@@ -0,0 +1,60 @@
+package org.apache.mahout.flinkbindings
+
+import java.lang.Iterable
+import java.util.Collections
+import java.util.Comparator
+import scala.collection.JavaConverters._
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.configuration.Configuration
+import scala.reflect.ClassTag
+
+
+class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
+
+  /**
+   * Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink
+   * 
+   * TODO: remove when FLINK-2152 is committed and released 
+   */
+  def zipWithIndex(): DataSet[(Long, K)] = {
+
+    // first, count the number of elements in each partition to calculate the offsets
+    val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Long)] {
+      override def mapPartition(values: Iterable[K], out: Collector[(Int, Long)]): Unit = {
+        val cnt: Long = values.asScala.count(_ => true)
+        val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
+        out.collect(subtaskIdx -> cnt)
+      }
+    })
+
+    // then use the offsets to index items of each partition
+    val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Long, K)] {
+        var offset: Long = 0
+
+        override def open(parameters: Configuration): Unit = {
+          val offsetsJava: java.util.List[(Int, Long)] = 
+                  getRuntimeContext.getBroadcastVariable("counts")
+          val offsets = offsetsJava.asScala
+
+          val sortedOffsets = 
+            offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt }
+
+          val subtaskId = getRuntimeContext.getIndexOfThisSubtask
+          offset = sortedOffsets.take(subtaskId).sum
+        }
+
+        override def mapPartition(values: Iterable[K], out: Collector[(Long, K)]): Unit = {
+          val it = values.asScala
+          it.zipWithIndex.foreach { case (value, idx) =>
+            out.collect((idx + offset, value))
+          }
+        }
+    }).withBroadcastSet(counts, "counts")
+
+    zipped
+  }
+
+}
\ No newline at end of file
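
For orientation, a minimal usage sketch of the new zipWithIndex helper (illustrative only, not part of this commit; the sample elements and object name are hypothetical):

import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.mahout.flinkbindings.DataSetOps

object ZipWithIndexSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // a small Java-API DataSet to index
    val elements = env.fromElements("a", "b", "c", "d")
    // assigns a global, partition-aware index to every element
    val indexed = new DataSetOps(elements).zipWithIndex()
    indexed.print() // e.g. (0,a), (1,b), (2,c), (3,d)
  }
}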

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index f174871..1b0464e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -1,66 +1,79 @@
 package org.apache.mahout.flinkbindings
 
+import java.util.Collection
 import scala.reflect.ClassTag
-import org.apache.flink.api.scala.DataSet
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
+import scala.collection.JavaConverters._
+import com.google.common.collect._
 import org.apache.mahout.math._
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm.BCast
-import org.apache.mahout.math.drm.CacheHint
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DistributedEngine
-import org.apache.mahout.math.drm.DrmLike
-import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetElementReadSchema
-import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetReadSchema
-import org.apache.mahout.math.indexeddataset.IndexedDataset
-import org.apache.mahout.math.indexeddataset.Schema
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.indexeddataset._
 import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import com.google.common.collect.BiMap
-import com.google.common.collect.HashBiMap
-import scala.collection.JavaConverters._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.mahout.math.drm.DrmTuple
-import java.util.Collection
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.math.indexeddataset.BiDictionary
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm._
 import org.apache.mahout.flinkbindings.blas._
-import org.apache.mahout.math.drm.logical.OpAx
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.drm.logical.OpAt
-import org.apache.mahout.math.drm.logical.OpAtx
-import org.apache.mahout.math.drm.logical.OpAtx
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.mahout.math.drm.logical.OpABt
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.drm.logical.OpAewScalar
-import org.apache.mahout.math.drm.logical.OpAewB
-import org.apache.mahout.math.drm.logical.OpCbind
-import org.apache.mahout.math.drm.logical.OpRbind
-import org.apache.mahout.math.drm.logical.OpMapBlock
-import org.apache.mahout.math.drm.logical.OpRowRange
-import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.mahout.math.indexeddataset.BiDictionary
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.java.io.TypeSerializerInputFormat
+import org.apache.flink.api.common.io.SerializedInputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.SequenceFileInputFormat
+import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.mahout.flinkbindings.io._
+import org.apache.hadoop.io.Writable
+import org.apache.flink.api.java.tuple.Tuple2
 
 object FlinkEngine extends DistributedEngine {
 
-  /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+  // By default, use Hadoop 1 utils
+  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+
+  /**
+   * Load a DRM from HDFS (in Mahout DRM format).
+   * 
+   * @param path The DFS path to load from
+   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+   */
+  override def drmDfsRead(path: String, parMin: Int = 0)
+                         (implicit dc: DistributedContext): CheckpointedDrm[_] = {
+    val metadata = hdfsUtils.readDrmHeader(path)
+    val unwrapKey = metadata.unwrapKeyFunction
+
+    val job = new JobConf
+    val hadoopInput = new SequenceFileInputFormat[Writable, VectorWritable]
+    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path))
+
+    val writables = dc.env.createHadoopInput(hadoopInput, classOf[Writable], classOf[VectorWritable], job)
+
+    val res = writables.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
+      def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {
+        unwrapKey(tuple.f0) -> tuple.f1
+      }
+    })
+
+    datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+  }
+
+  override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
+                                    (implicit sc: DistributedContext): IndexedDataset = ???
+
+  override def indexedDatasetDFSReadElements(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
+                                            (implicit sc: DistributedContext): IndexedDataset = ???
+
+
+  /**
+   * Translates the logical plan into a Flink execution plan.
+   */
   override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
     // Flink-specific Physical Plan translation.
     val drm = flinkTranslate(plan)
-
-    val newcp = new CheckpointedFlinkDrm(
-      ds = drm.deblockify.ds,
-      _nrow = plan.nrow,
-      _ncol = plan.ncol
-    )
-
+    val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol)
     newcp.cache()
   }
 
@@ -117,11 +130,10 @@ object FlinkEngine extends DistributedEngine {
     case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol)
     case _ => throw new NotImplementedError(s"operator $oper is not implemented yet")
   }
-  
-
-  def translate[K: ClassTag](oper: DrmLike[K]): DataSet[K] = ???
 
-  /** Engine-specific colSums implementation based on a checkpoint. */
+  /**
+   * Returns a vector containing the column-wise sums of the DRM.
+   */
   override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
     val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] {
       def map(tuple: (K, Vector)): Vector = tuple._2
@@ -129,18 +141,23 @@ object FlinkEngine extends DistributedEngine {
       def reduce(v1: Vector, v2: Vector) = v1 + v2
     })
 
-    val list = CheckpointedFlinkDrm.flinkCollect(sum, "FlinkEngine colSums()")
+    val list = sum.collect.asScala.toList
     list.head
   }
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
   override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = ???
 
-  /** Engine-specific colMeans implementation based on a checkpoint. */
+  /**
+   * Returns a vector containing the column-wise means of the DRM.
+   */
   override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
     drm.colSums() / drm.nrow
   }
 
+  /**
+   * Calculates the squared Frobenius norm (the sum of squared elements) of the matrix.
+   */
   override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
     val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
       def map(tuple: (K, Vector)): Double = tuple match {
@@ -150,7 +167,7 @@ object FlinkEngine extends DistributedEngine {
       def reduce(v1: Double, v2: Double) = v1 + v2
     })
 
-    val list = CheckpointedFlinkDrm.flinkCollect(sumOfSquares, "FlinkEngine norm()")
+    val list = sumOfSquares.collect.asScala.toList
     list.head
   }
 
@@ -160,29 +177,21 @@ object FlinkEngine extends DistributedEngine {
   /** Broadcast support */
   override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = ???
 
-  /**
-   * Load DRM from hdfs (as in Mahout DRM format).
-   * <P/>
-   * @param path The DFS path to load from
-   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
-   */
-  override def drmDfsRead(path: String, parMin: Int = 0)
-                         (implicit sc: DistributedContext): CheckpointedDrm[_] = ???
 
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
   override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
-                                           (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+                                           (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
     val parallelDrm = parallelize(m, numPartitions)
     new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
   }
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
-      (implicit sc: DistributedContext): DrmDataSet[Int] = {
+      (implicit dc: DistributedContext): DrmDataSet[Int] = {
     val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
     val rowsJava: Collection[DrmTuple[Int]]  = rows.asJava
 
     val dataSetType = TypeExtractor.getForObject(rows.head)
-    sc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
+    dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
   }
 
   /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
@@ -196,10 +205,4 @@ object FlinkEngine extends DistributedEngine {
   /** Creates empty DRM with non-trivial height */
   override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
                                       (implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
-
-  override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
-                                    (implicit sc: DistributedContext): IndexedDataset = ???
-
-  override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary])
-                                            (implicit sc: DistributedContext): IndexedDataset = ???
 }
\ No newline at end of file
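
A sketch of how the new drmDfsRead path can be exercised (illustrative only, not part of this commit; the HDFS path is hypothetical, and it assumes the math-scala package-level drmDfsRead helper that delegates to the engine):

import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math.drm._

object DrmDfsReadSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    implicit val ctx = new FlinkDistributedContext(env)

    // reads the DRM header first (via hdfsUtils.readDrmHeader) to recover the key type,
    // then maps the SequenceFile records into (key, Vector) tuples
    val drm = drmDfsRead("hdfs:///tmp/mahout/drm-A")
    println(s"nrow=${drm.nrow}, ncol=${drm.ncol}")
  }
}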

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
index af5ccc8..fb154e4 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
@@ -11,5 +11,5 @@ package object blas {
   def tuple_1[K: ClassTag] = new KeySelector[(Int, K), Integer] {
     def getKey(tuple: Tuple2[Int, K]): Integer = tuple._1
   }
-  
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index e7d9dcd..0df75ca 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -20,13 +20,24 @@ import scala.collection.JavaConverters._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.DataSet
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.LongWritable
+import org.apache.mahout.math.VectorWritable
+import org.apache.mahout.math.Vector
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.FileOutputFormat
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
 
 class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
-  private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
-  private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
-  // private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
-  override protected[mahout] val partitioningTag: Long = Random.nextLong(),
-  private var _canHaveMissingRows: Boolean = false) extends CheckpointedDrm[K] {
+      private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
+      private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
+      override protected[mahout] val partitioningTag: Long = Random.nextLong(),
+      private var _canHaveMissingRows: Boolean = false
+  ) extends CheckpointedDrm[K] {
 
   lazy val nrow: Long = if (_nrow >= 0) _nrow else computeNRow
   lazy val ncol: Int = if (_ncol >= 0) _ncol else computeNCol
@@ -38,7 +49,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
       def reduce(a1: Long, a2: Long) = a1 + a2
     })
 
-    val list = CheckpointedFlinkDrm.flinkCollect(count, "CheckpointedFlinkDrm computeNRow()")
+    val list = count.collect().asScala.toList
     list.head
   }
 
@@ -49,7 +60,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
       def reduce(a1: Int, a2: Int) = Math.max(a1, a2)
     })
 
-    val list = CheckpointedFlinkDrm.flinkCollect(max, "CheckpointedFlinkDrm computeNCol()")
+    val list = max.collect().asScala.toList
     list.head
   }
 
@@ -69,7 +80,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
 
   def collect: Matrix = {
-    val data = CheckpointedFlinkDrm.flinkCollect(ds, "Checkpointed Flink Drm collect()")
+    val data = ds.collect().asScala.toList
     val isDense = data.forall(_._2.isDense)
 
     val m = if (isDense) {
@@ -98,7 +109,42 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
     m
   }
 
-  def dfsWrite(path: String) = ???
+  def dfsWrite(path: String): Unit = {
+    val env = ds.getExecutionEnvironment
+
+    val keyTag = implicitly[ClassTag[K]]
+    val convertKey = keyToWritableFunc(keyTag)
+
+    val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] {
+      def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
+        case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
+      }
+    })
+
+    val job = new JobConf
+    val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable]
+    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
+
+    val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+    writableDataset.output(hadoopOutput)
+
+    env.execute(s"dfsWrite($path)")
+  }
+
+  private def keyToWritableFunc[K: ClassTag](keyTag: ClassTag[K]): (K) => Writable = {
+    if (keyTag.runtimeClass == classOf[Int]) { 
+      (x: K) => new IntWritable(x.asInstanceOf[Int])
+    } else if (keyTag.runtimeClass == classOf[String]) {
+      (x: K) => new Text(x.asInstanceOf[String]) 
+    } else if (keyTag.runtimeClass == classOf[Long]) {
+      (x: K) => new LongWritable(x.asInstanceOf[Long]) 
+    } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { 
+      (x: K) => x.asInstanceOf[Writable] 
+    } else { 
+      throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
+    }
+  }
+
   def newRowCardinality(n: Int): CheckpointedDrm[K] = ???
 
   override val context: DistributedContext = ds.getExecutionEnvironment
@@ -108,14 +154,4 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
 object CheckpointedFlinkDrm {
   val UNKNOWN = -1
 
-  // needed for backwards compatibility with flink 0.8.1
-  def flinkCollect[K](dataset: DataSet[K], jobName: String = "flinkCollect()"): List[K] = {
-    val dataJavaList = new ArrayList[K]
-    val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList)
-    dataset.output(outputFormat)
-    val data = dataJavaList.asScala
-    dataset.getExecutionEnvironment.execute(jobName)
-    data.toList
-  }
-
 }
\ No newline at end of file
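
A round-trip sketch for the new dfsWrite implementation (illustrative only, not part of this commit; the output path is hypothetical):

import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings._

object DfsWriteSketch {
  def main(args: Array[String]): Unit = {
    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val inCore = dense((1, 2, 3), (4, 5, 6))            // 2 x 3 in-core matrix
    val drm = drmParallelize(inCore, numPartitions = 2) // Int-keyed distributed matrix
    // Int keys become IntWritable, rows become VectorWritable, written as a SequenceFile
    drm.dfsWrite("hdfs:///tmp/mahout/drm-A")
  }
}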

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
new file mode 100644
index 0000000..6efe99b
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
@@ -0,0 +1,53 @@
+package org.apache.mahout.flinkbindings.io
+
+import scala.reflect.ClassTag
+import org.apache.hadoop.io._
+import java.util.Arrays
+
+/**
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+class DrmMetadata(
+
+  /** Writable  key type as a sub-type of Writable */
+  val keyTypeWritable: Class[_],
+
+  /** Value writable type, as a sub-type of Writable */
+  val valueTypeWritable: Class[_]) {
+
+  import DrmMetadata._
+
+  /**
+   * @param keyClassTag: actual DRM key class tag once converted out of the Writable
+   * @param unwrapKeyFunction: conversion from the key Writable to the DRM key value
+   */
+  val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match {
+    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
+    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
+    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
+    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
+    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
+    case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
+    case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+    case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
+  }
+
+}
+
+object DrmMetadata {
+
+  private[io] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get()
+
+  private[io] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get()
+
+  private[io] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get()
+
+  private[io] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get()
+
+  private[io] def w2string(w: Writable) = w.asInstanceOf[Text].toString()
+
+  private[io] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get()
+
+  private[io] def w2bytes(w: Writable) = Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(),
+    w.asInstanceOf[BytesWritable].getLength())
+}
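
A small sketch of how DrmMetadata maps a key Writable class to a ClassTag and an unwrap function (illustrative only, not part of this commit):

import org.apache.hadoop.io.IntWritable
import org.apache.mahout.math.VectorWritable
import org.apache.mahout.flinkbindings.io.DrmMetadata

object DrmMetadataSketch {
  def main(args: Array[String]): Unit = {
    val meta = new DrmMetadata(classOf[IntWritable], classOf[VectorWritable])
    println(meta.keyClassTag)                            // Int
    println(meta.unwrapKeyFunction(new IntWritable(42))) // 42
  }
}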

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
new file mode 100644
index 0000000..fc97234
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.io
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+/**
+ * Returns a [[java.lang.String]], which is a comma-delimited list of URIs discovered based on parameters
+ * in the constructor.
+ * The String is formatted to be input into [[org.apache.spark.SparkContext#textFile()]]
+ * @param pathURI where to start looking for input files; may be a comma-delimited list of URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ * 
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
+
+  val conf = new Configuration()
+  val fs = FileSystem.get(conf)
+
+  /**
+   * Returns a string of comma-delimited URIs matching the filePattern.
+   * When pattern matching, dirs are never returned, only traversed.
+   */
+  def uris: String = {
+    if (!filePattern.isEmpty) { // have a file pattern, so find the matching files
+      val pathURIs = pathURI.split(",")
+      var files = ""
+      for (uri <- pathURIs) {
+        files = findFiles(uri, filePattern, files)
+      }
+      if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
+      files
+    } else {
+      pathURI
+    }
+  }
+
+  /**
+   * Find matching files in the dir, recursively calling itself when another directory is found.
+   * Only files are matched; directories are traversed but never returned as a match.
+   */
+  private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
+    val seed = fs.getFileStatus(new Path(dir))
+    var f: String = files
+
+    if (seed.isDir) {
+      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
+      for (fileStatus <- fileStatuses) {
+        if (fileStatus.getPath().getName().matches(filePattern)
+          && !fileStatus.isDir) {
+          // found a file
+          if (fileStatus.getLen() != 0) {
+            // file is not empty
+            f = f + fileStatus.getPath.toUri.toString + ","
+          }
+        } else if (fileStatus.isDir && recursive) {
+          f = findFiles(fileStatus.getPath.toString, filePattern, f)
+        }
+      }
+    } else { f = dir } // was a filename, not a dir
+    f
+  }
+
+}
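
A usage sketch for HDFSPathSearch (illustrative only, not part of this commit; the directory and pattern are hypothetical, and the path resolves against the default filesystem):

import org.apache.mahout.flinkbindings.io.HDFSPathSearch

object PathSearchSketch {
  def main(args: Array[String]): Unit = {
    // recursively collect non-empty files whose names match the pattern under the given directory
    val search = HDFSPathSearch("/tmp/mahout/drm-A", filePattern = "part-.*", recursive = true)
    println(search.uris) // comma-delimited list of matching file URIs
  }
}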

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
new file mode 100644
index 0000000..7629385
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.io
+
+/**
+ * High-level, Hadoop-version-specific HDFS manipulations needed in the context of our operations.
+ *
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+trait HDFSUtil {
+
+  /**
+   *  Read DRM header information off (H)DFS.
+   */
+  def readDrmHeader(path: String): DrmMetadata
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
new file mode 100644
index 0000000..120edb4
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.io
+
+import org.apache.hadoop.io.{ Writable, SequenceFile }
+import org.apache.hadoop.fs.{ FileSystem, Path }
+import org.apache.hadoop.conf.Configuration
+import collection._
+import JavaConversions._
+
+/**
+ * Deprecated Hadoop 1 API that we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0.
+ *
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+object Hadoop1HDFSUtil extends HDFSUtil {
+
+  /**
+   * Read the header of a sequence file and determine the key and value Writable types.
+   * @param path (H)DFS path of the DRM directory containing the part files
+   * @return DrmMetadata describing the key and value types
+   */
+  def readDrmHeader(path: String): DrmMetadata = {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    val partFilePath: Path = fs.listStatus(dfsPath)
+
+      // Filter out hidden files (starting with . or _) and directories
+      .filter { s =>
+        !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_") && !s.isDir
+      }
+
+      // Take path
+      .map(_.getPath)
+
+      // Take only one, if any
+      .headOption
+
+      // Require there's at least one partition file found.
+      .getOrElse {
+        throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+      }
+
+    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
+    } finally {
+      reader.close()
+    }
+
+  }
+
+  /**
+   * Delete a path from the filesystem (recursively, if it is a directory).
+   * @param path (H)DFS path to delete
+   */
+  def delete(path: String) {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    if (fs.exists(dfsPath)) {
+      fs.delete(dfsPath, true)
+    }
+  }
+
+}
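
A sketch of reading DRM header metadata with Hadoop1HDFSUtil (illustrative only, not part of this commit; the path is hypothetical):

import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil

object DrmHeaderSketch {
  def main(args: Array[String]): Unit = {
    // inspects the first part file under the DRM directory and reports its key/value Writable types
    val meta = Hadoop1HDFSUtil.readDrmHeader("/tmp/mahout/drm-A")
    println(meta.keyTypeWritable.getName + " / " + meta.valueTypeWritable.getName)
  }
}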

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 0b26781..6f04551 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -1,17 +1,23 @@
 package org.apache.mahout
 
+import scala.reflect.ClassTag
+import org.slf4j.LoggerFactory
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.DenseVector
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.MatrixWritable
+import org.apache.mahout.math.VectorWritable
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.flinkbindings.FlinkDistributedContext
+import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.drm._
-import org.slf4j.LoggerFactory
-import scala.reflect.ClassTag
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.flink.api.common.functions.FilterFunction
 
 package object flinkbindings {
 
@@ -46,4 +52,36 @@ package object flinkbindings {
     new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
   }
 
+  private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
+  private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
+  private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()
+  private[flinkbindings] implicit def unwrapFromWritable(w: VectorWritable): Vector = w.get()
+
+  def readCsv(file: String, delim: String = ",", comment: String = "#")
+             (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
+    val vectors = dc.env.readTextFile(file)
+        .filter(new FilterFunction[String] {
+          def filter(in: String): Boolean = {
+            !in.startsWith(comment)
+          }
+        })
+        .map(new MapFunction[String, Vector] {
+          def map(in: String): Vector = {
+            val array = in.split(delim).map(_.toDouble)
+            new DenseVector(array)
+          }
+        })
+    datasetToDrm(vectors)
+  }
+
+  def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = {
+    val zipped = new DataSetOps(ds).zipWithIndex
+    datasetWrap(zipped)
+  }
+
+  def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = {
+    new CheckpointedFlinkDrm[K](dataset)
+  }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
new file mode 100644
index 0000000..27b17d0
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
@@ -0,0 +1,21 @@
+package org.apache.mahout.flinkbindings.examples
+
+import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.mahout.flinkbindings._
+
+object ReadCsvExample {
+
+  def main(args: Array[String]): Unit = {
+    val filePath = "file:///c:/tmp/data/slashdot0902/Slashdot0902.txt"
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    implicit val ctx = new FlinkDistributedContext(env)
+
+    val drm = readCsv(filePath, delim = "\t", comment = "#")
+    val C = drm.t %*% drm
+    println(C.collect)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 67d2eee..4e7cecd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,8 @@
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
     <spark.version>1.3.1</spark.version>
-    <flink.version>0.9.0-milestone-1</flink.version>
+    <!-- TODO: Remove snapshot dependency when Flink 0.9.1 is released -->
+    <flink.version>0.9-SNAPSHOT</flink.version>
     <h2o.version>0.1.25</h2o.version>
   </properties>
   <issueManagement>