Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/20 07:36:57 UTC

[14/32] mahout git commit: MAHOUT-1570: Flink: added headers, comments and acknowledgements

MAHOUT-1570: Flink: added headers, comments and acknowledgements


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/58f79489
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/58f79489
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/58f79489

Branch: refs/heads/flink-binding
Commit: 58f79489129c73b9a4bd7cd5a503cfaf723ccaef
Parents: 0e7b0b4
Author: Alexey Grigorev <al...@gmail.com>
Authored: Tue Jun 16 15:18:42 2015 +0200
Committer: Alexey Grigorev <al...@gmail.com>
Committed: Fri Sep 25 17:41:50 2015 +0200

----------------------------------------------------------------------
 .../mahout/flinkbindings/DataSetOps.scala       | 136 +++---
 .../flinkbindings/FlinkDistributedContext.scala |  29 +-
 .../mahout/flinkbindings/FlinkEngine.scala      | 440 ++++++++++---------
 .../mahout/flinkbindings/blas/FlinkOpAewB.scala |   4 +
 .../flinkbindings/blas/FlinkOpAewScalar.scala   |  22 +
 .../mahout/flinkbindings/blas/FlinkOpAt.scala   |  22 +-
 .../mahout/flinkbindings/blas/FlinkOpAtB.scala  | 180 ++++----
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   |  23 +
 .../flinkbindings/blas/FlinkOpCBind.scala       |  23 +
 .../flinkbindings/blas/FlinkOpMapBlock.scala    |  22 +
 .../flinkbindings/blas/FlinkOpRBind.scala       |  19 +
 .../flinkbindings/blas/FlinkOpRowRange.scala    |  22 +
 .../blas/FlinkOpTimesRightMatrix.scala          |  22 +
 .../mahout/flinkbindings/blas/package.scala     |  19 +-
 .../drm/CheckpointedFlinkDrm.scala              | 330 +++++++-------
 .../mahout/flinkbindings/drm/FlinkDrm.scala     |  18 +
 .../mahout/flinkbindings/io/DrmMetadata.scala   |  18 +
 .../flinkbindings/io/HDFSPathSearch.scala       |  39 +-
 .../mahout/flinkbindings/io/HDFSUtil.scala      |  29 +-
 .../flinkbindings/io/Hadoop1HDFSUtil.scala      |  29 +-
 .../apache/mahout/flinkbindings/package.scala   | 191 ++++----
 .../flinkbindings/DistributedFlinkSuit.scala    |  18 +
 .../mahout/flinkbindings/RLikeOpsSuite.scala    |  18 +
 .../mahout/flinkbindings/UseCasesSuite.scala    |  18 +
 .../mahout/flinkbindings/blas/LATestSuit.scala  |  18 +
 .../flinkbindings/examples/ReadCsvExample.scala |  60 ++-
 26 files changed, 1097 insertions(+), 672 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
index c7a92c2..840b4e6 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
@@ -1,60 +1,78 @@
-package org.apache.mahout.flinkbindings
-
-import java.lang.Iterable
-import java.util.Collections
-import java.util.Comparator
-import scala.collection.JavaConverters._
-import org.apache.flink.util.Collector
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.api.common.functions.RichMapPartitionFunction
-import org.apache.flink.configuration.Configuration
-import scala.reflect.ClassTag
-
-
-class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
-
-  /**
-   * Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink
-   * 
-   * TODO: remove when FLINK-2152 is committed and released 
-   */
-  def zipWithIndex(): DataSet[(Long, K)] = {
-
-    // first for each partition count the number of elements - to calculate the offsets
-    val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Long)] {
-      override def mapPartition(values: Iterable[K], out: Collector[(Int, Long)]): Unit = {
-        val cnt: Long = values.asScala.count(_ => true)
-        val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
-        out.collect(subtaskIdx -> cnt)
-      }
-    })
-
-    // then use the offsets to index items of each partition
-    val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Long, K)] {
-        var offset: Long = 0
-
-        override def open(parameters: Configuration): Unit = {
-          val offsetsJava: java.util.List[(Int, Long)] = 
-                  getRuntimeContext.getBroadcastVariable("counts")
-          val offsets = offsetsJava.asScala
-
-          val sortedOffsets = 
-            offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt }
-
-          val subtaskId = getRuntimeContext.getIndexOfThisSubtask
-          offset = sortedOffsets.take(subtaskId).sum
-        }
-
-        override def mapPartition(values: Iterable[K], out: Collector[(Long, K)]): Unit = {
-          val it = values.asScala
-          it.zipWithIndex.foreach { case (value, idx) =>
-            out.collect((idx + offset, value))
-          }
-        }
-    }).withBroadcastSet(counts, "counts");
-
-    zipped
-  }
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings
+
+import java.lang.Iterable
+import java.util.Collections
+import java.util.Comparator
+import scala.collection.JavaConverters._
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.configuration.Configuration
+import scala.reflect.ClassTag
+
+
+class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
+
+  /**
+   * Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink
+   * 
+   * TODO: remove when FLINK-2152 is committed and released 
+   */
+  def zipWithIndex(): DataSet[(Int, K)] = {
+
+    // first for each partition count the number of elements - to calculate the offsets
+    val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Int)] {
+      override def mapPartition(values: Iterable[K], out: Collector[(Int, Int)]): Unit = {
+        val cnt: Int = values.asScala.count(_ => true)
+        val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
+        out.collect((subtaskIdx, cnt))
+      }
+    })
+
+    // then use the offsets to index items of each partition
+    val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Int, K)] {
+        var offset: Int = 0
+
+        override def open(parameters: Configuration): Unit = {
+          val offsetsJava: java.util.List[(Int, Int)] = 
+                  getRuntimeContext.getBroadcastVariable("counts")
+          val offsets = offsetsJava.asScala
+
+          val sortedOffsets = 
+            offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt }
+
+          val subtaskId = getRuntimeContext.getIndexOfThisSubtask
+          offset = sortedOffsets.take(subtaskId).sum.toInt
+        }
+
+        override def mapPartition(values: Iterable[K], out: Collector[(Int, K)]): Unit = {
+          val it = values.asScala
+          it.zipWithIndex.foreach { case (value, idx) =>
+            out.collect((idx + offset, value))
+          }
+        }
+    }).withBroadcastSet(counts, "counts");
+
+    zipped
+  }
+
 }
\ No newline at end of file
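
A minimal plain-Scala sketch (not part of the diff above; the counts are made-up values) of the offset logic zipWithIndex relies on: each subtask's starting index is the prefix sum of the element counts of all lower-numbered subtasks, which is exactly what the broadcast "counts" variable supplies to open().

object ZipWithIndexOffsets extends App {
  // hypothetical per-subtask element counts, in arbitrary arrival order: (subtaskIdx, count)
  val counts = Seq((1, 5), (0, 3), (2, 2))

  // starting index of a subtask = sum of the counts of all lower-numbered subtasks
  def offsetFor(subtaskId: Int): Int =
    counts.sortBy { case (id, _) => id }
          .map { case (_, cnt) => cnt }
          .take(subtaskId)
          .sum

  // prints: subtask 0 starts at index 0, subtask 1 at 3, subtask 2 at 8
  (0 to 2).foreach(i => println(s"subtask $i starts at index ${offsetFor(i)}"))
}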

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index e9130dd..ebe473f 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.mahout.flinkbindings
 
 import org.apache.flink.api.java.ExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 1b0464e..074676c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -1,208 +1,234 @@
-package org.apache.mahout.flinkbindings
-
-import java.util.Collection
-import scala.reflect.ClassTag
-import scala.collection.JavaConverters._
-import com.google.common.collect._
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.indexeddataset._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.drm.DrmTuple
-import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.math.indexeddataset.BiDictionary
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.flinkbindings.blas._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.java.io.TypeSerializerInputFormat
-import org.apache.flink.api.common.io.SerializedInputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.mahout.flinkbindings.io._
-import org.apache.hadoop.io.Writable
-import org.apache.flink.api.java.tuple.Tuple2
-
-object FlinkEngine extends DistributedEngine {
-
-  // By default, use Hadoop 1 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
-
-  /**
-   * Load DRM from hdfs (as in Mahout DRM format).
-   * 
-   * @param path The DFS path to load from
-   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
-   */
-  override def drmDfsRead(path: String, parMin: Int = 0)
-                         (implicit dc: DistributedContext): CheckpointedDrm[_] = {
-    val metadata = hdfsUtils.readDrmHeader(path)
-    val unwrapKey = metadata.unwrapKeyFunction
-
-    val job = new JobConf
-    val hadoopInput = new SequenceFileInputFormat[Writable, VectorWritable]
-    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path))
-
-    val writables = dc.env.createHadoopInput(hadoopInput, classOf[Writable], classOf[VectorWritable], job)
-
-    val res = writables.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
-      def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {
-        unwrapKey(tuple.f0) -> tuple.f1
-      }
-    })
-
-    datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
-  }
-
-  override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
-                                    (implicit sc: DistributedContext): IndexedDataset = ???
-
-  override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary])
-                                            (implicit sc: DistributedContext): IndexedDataset = ???
-
-
-  /** 
-   * Translates logical plan into Flink execution plan. 
-   **/
-  override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
-    // Flink-specific Physical Plan translation.
-    val drm = flinkTranslate(plan)
-    val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol)
-    newcp.cache()
-  }
-
-  private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = oper match {
-    case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAtx(a, x) => {
-      // express Atx as (A.t) %*% x
-      // TODO: create specific implementation of Atx
-      val opAt = OpAt(a)
-      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
-      val atCast = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
-      val opAx = OpAx(atCast, x)
-      FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA))
-    }
-    case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, flinkTranslate(a)(op.classTagA), 
-        flinkTranslate(b)(op.classTagA))
-    case op @ OpABt(a, b) => {
-      // express ABt via AtB: let C=At and D=Bt, and calculate CtD
-      // TODO: create specific implementation of ABt
-      val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
-      val c = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
-
-      val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
-      val d = new CheckpointedFlinkDrm(bt.deblockify.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
-
-      FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
-                .asInstanceOf[FlinkDrm[K]]
-    }
-    case op @ OpAtA(a) => {
-      // express AtA via AtB
-      // TODO: create specific implementation of AtA
-      val aInt = a.asInstanceOf[DrmLike[Int]] // TODO: casts!
-      val opAtB = OpAtB(aInt, aInt)
-      val aTranslated = flinkTranslate(aInt)
-      FlinkOpAtB.notZippable(opAtB, aTranslated, aTranslated)
-    }
-    case op @ OpTimesRightMatrix(a, b) => 
-      FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b)
-    case op @ OpAewScalar(a, scalar, _) => 
-      FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar)
-    case op @ OpAewB(a, b, _) =>
-      FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpCbind(a, b) => 
-      FlinkOpCBind.cbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpRbind(a, b) => 
-      FlinkOpRBind.rbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpRowRange(a, _) => 
-      FlinkOpRowRange.slice(op, flinkTranslate(a)(op.classTagA))
-    case op: OpMapBlock[K, _] => 
-      FlinkOpMapBlock.apply(flinkTranslate(op.A)(op.classTagA), op.ncol, op.bmf)
-    case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol)
-    case _ => throw new NotImplementedError(s"operator $oper is not implemented yet")
-  }
-
-  /** 
-   * returns a vector that contains a column-wise sum from DRM 
-   */
-  override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
-    val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] {
-      def map(tuple: (K, Vector)): Vector = tuple._2
-    }).reduce(new ReduceFunction[Vector] {
-      def reduce(v1: Vector, v2: Vector) = v1 + v2
-    })
-
-    val list = sum.collect.asScala.toList
-    list.head
-  }
-
-  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
-  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = ???
-
-  /** 
-   * returns a vector that contains a column-wise mean from DRM 
-   */
-  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
-    drm.colSums() / drm.nrow
-  }
-
-  /**
-   * Calculates the element-wise squared norm of a matrix
-   */
-  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
-    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
-      def map(tuple: (K, Vector)): Double = tuple match {
-        case (idx, vec) => vec dot vec
-      }
-    }).reduce(new ReduceFunction[Double] {
-      def reduce(v1: Double, v2: Double) = v1 + v2
-    })
-
-    val list = sumOfSquares.collect.asScala.toList
-    list.head
-  }
-
-  /** Broadcast support */
-  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = ???
-
-  /** Broadcast support */
-  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = ???
-
-
-  /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
-  override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
-                                           (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
-    val parallelDrm = parallelize(m, numPartitions)
-    new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
-  }
-
-  private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
-      (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
-    val rowsJava: Collection[DrmTuple[Int]]  = rows.asJava
-
-    val dataSetType = TypeExtractor.getForObject(rows.head)
-    dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
-  }
-
-  /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
-  override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
-                                          (implicit sc: DistributedContext): CheckpointedDrm[String] = ???
-
-  /** This creates an empty DRM with specified number of partitions and cardinality. */
-  override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
-                                  (implicit sc: DistributedContext): CheckpointedDrm[Int] = ???
-
-  /** Creates empty DRM with non-trivial height */
-  override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
-                                      (implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings
+
+import java.util.Collection
+import scala.reflect.ClassTag
+import scala.collection.JavaConverters._
+import com.google.common.collect._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.indexeddataset._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.drm.DrmTuple
+import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.math.indexeddataset.BiDictionary
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm._
+import org.apache.mahout.flinkbindings.blas._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.java.io.TypeSerializerInputFormat
+import org.apache.flink.api.common.io.SerializedInputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.SequenceFileInputFormat
+import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.mahout.flinkbindings.io._
+import org.apache.hadoop.io.Writable
+import org.apache.flink.api.java.tuple.Tuple2
+
+object FlinkEngine extends DistributedEngine {
+
+  // By default, use Hadoop 1 utils
+  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+
+  /**
+   * Load DRM from hdfs (as in Mahout DRM format).
+   * 
+   * @param path The DFS path to load from
+   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+   */
+  override def drmDfsRead(path: String, parMin: Int = 0)
+                         (implicit dc: DistributedContext): CheckpointedDrm[_] = {
+    val metadata = hdfsUtils.readDrmHeader(path)
+    val unwrapKey = metadata.unwrapKeyFunction
+
+    val job = new JobConf
+    val hadoopInput = new SequenceFileInputFormat[Writable, VectorWritable]
+    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path))
+
+    val writables = dc.env.createHadoopInput(hadoopInput, classOf[Writable], classOf[VectorWritable], job)
+
+    val res = writables.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
+      def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {
+        (unwrapKey(tuple.f0), tuple.f1)
+      }
+    })
+
+    datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+  }
+
+  override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
+                                    (implicit sc: DistributedContext): IndexedDataset = ???
+
+  override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary])
+                                            (implicit sc: DistributedContext): IndexedDataset = ???
+
+
+  /** 
+   * Translates logical plan into Flink execution plan. 
+   **/
+  override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
+    // Flink-specific Physical Plan translation.
+    val drm = flinkTranslate(plan)
+
+    // to Help Flink's type inference had to use just one specific type - Int 
+    // see org.apache.mahout.flinkbindings.blas classes with TODO: casting inside
+    val cls = implicitly[ClassTag[K]]
+    if (!cls.runtimeClass.equals(classOf[Int])) {
+      throw new IllegalArgumentException(s"At the moment only Int indexes are supported. Got $cls")
+    }
+
+    val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol)
+    newcp.cache()
+  }
+
+  private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = oper match {
+    case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)(op.classTagA))
+    case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, flinkTranslate(a)(op.classTagA))
+    case op @ OpAtx(a, x) => {
+      // express Atx as (A.t) %*% x
+      // TODO: create specific implementation of Atx
+      val opAt = OpAt(a)
+      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
+      val atCast = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
+      val opAx = OpAx(atCast, x)
+      FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA))
+    }
+    case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, flinkTranslate(a)(op.classTagA), 
+        flinkTranslate(b)(op.classTagA))
+    case op @ OpABt(a, b) => {
+      // express ABt via AtB: let C=At and D=Bt, and calculate CtD
+      // TODO: create specific implementation of ABt
+      val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
+      val c = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
+
+      val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+      val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
+      val d = new CheckpointedFlinkDrm(bt.deblockify.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
+
+      FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
+                .asInstanceOf[FlinkDrm[K]]
+    }
+    case op @ OpAtA(a) => {
+      // express AtA via AtB
+      // TODO: create specific implementation of AtA
+      val aInt = a.asInstanceOf[DrmLike[Int]] // TODO: casts!
+      val opAtB = OpAtB(aInt, aInt)
+      val aTranslated = flinkTranslate(aInt)
+      FlinkOpAtB.notZippable(opAtB, aTranslated, aTranslated)
+    }
+    case op @ OpTimesRightMatrix(a, b) => 
+      FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b)
+    case op @ OpAewScalar(a, scalar, _) => 
+      FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar)
+    case op @ OpAewB(a, b, _) =>
+      FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
+    case op @ OpCbind(a, b) => 
+      FlinkOpCBind.cbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
+    case op @ OpRbind(a, b) => 
+      FlinkOpRBind.rbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
+    case op @ OpRowRange(a, _) => 
+      FlinkOpRowRange.slice(op, flinkTranslate(a)(op.classTagA))
+    case op: OpMapBlock[K, _] => 
+      FlinkOpMapBlock.apply(flinkTranslate(op.A)(op.classTagA), op.ncol, op.bmf)
+    case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol)
+    case _ => throw new NotImplementedError(s"operator $oper is not implemented yet")
+  }
+
+  /** 
+   * returns a vector that contains a column-wise sum from DRM 
+   */
+  override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] {
+      def map(tuple: (K, Vector)): Vector = tuple._2
+    }).reduce(new ReduceFunction[Vector] {
+      def reduce(v1: Vector, v2: Vector) = v1 + v2
+    })
+
+    val list = sum.collect.asScala.toList
+    list.head
+  }
+
+  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
+  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = ???
+
+  /** 
+   * returns a vector that contains a column-wise mean from DRM 
+   */
+  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    drm.colSums() / drm.nrow
+  }
+
+  /**
+   * Calculates the element-wise squared norm of a matrix
+   */
+  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
+    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
+      def map(tuple: (K, Vector)): Double = tuple match {
+        case (idx, vec) => vec dot vec
+      }
+    }).reduce(new ReduceFunction[Double] {
+      def reduce(v1: Double, v2: Double) = v1 + v2
+    })
+
+    val list = sumOfSquares.collect.asScala.toList
+    list.head
+  }
+
+  /** Broadcast support */
+  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = ???
+
+  /** Broadcast support */
+  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = ???
+
+
+  /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+  override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+                                           (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+    val parallelDrm = parallelize(m, numPartitions)
+    new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
+  }
+
+  private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
+      (implicit dc: DistributedContext): DrmDataSet[Int] = {
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
+    val rowsJava: Collection[DrmTuple[Int]]  = rows.asJava
+
+    val dataSetType = TypeExtractor.getForObject(rows.head)
+    dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
+  }
+
+  /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
+  override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+                                          (implicit sc: DistributedContext): CheckpointedDrm[String] = ???
+
+  /** This creates an empty DRM with specified number of partitions and cardinality. */
+  override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+                                  (implicit sc: DistributedContext): CheckpointedDrm[Int] = ???
+
+  /** Creates empty DRM with non-trivial height */
+  override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+                                      (implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
 }
\ No newline at end of file
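
A minimal usage sketch (not part of the commit) of how the engine above is typically driven through Mahout's DRM DSL. It assumes an implicit Flink DistributedContext is already in scope (its construction lives elsewhere in the flink bindings), and all names below are illustrative only.

import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

def flinkEngineSmokeTest()(implicit ctx: DistributedContext): Unit = {
  // small in-core matrix parallelized with Int row keys -- the only key type
  // toPhysical accepts in this commit
  val inCoreA = dense((1, 2, 3), (3, 4, 5))
  val drmA = drmParallelize(inCoreA, numPartitions = 2)

  // A' %*% A builds an OpAtA logical plan; collecting it drives toPhysical/flinkTranslate
  val inCoreAtA = (drmA.t %*% drmA).collect

  // column sums delegate to FlinkEngine.colSums
  val sums = drmA.colSums()
}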

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index 3c4d51d..ed25d08 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -15,6 +15,10 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.Nil
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
 
+/**
+ * Implementation is inspired by Spark-binding's OpAewB
+ * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala) 
+ */
 object FlinkOpAewB {
 
   def rowWiseJoinNoSideEffect[K: ClassTag](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index 195613e..a1e1ab1 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import scala.reflect.ClassTag
@@ -9,6 +27,10 @@ import RLikeOps._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
 
+/**
+ * Implementation is inspired by Spark-binding's OpAewScalar
+ * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala) 
+ */
 object FlinkOpAewScalar {
 
   def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index 08aea73..6ba3fd5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import org.apache.mahout.math.drm.logical.OpAt
@@ -20,9 +38,9 @@ import org.apache.flink.api.java.functions.KeySelector
 import java.util.ArrayList
 import org.apache.flink.shaded.com.google.common.collect.Lists
 
-
 /**
- * Taken from
+ * Implementation is taken from Spark's At
+ * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
  */
 object FlinkOpAt {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index fa6ba24..b5eb17c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -1,80 +1,102 @@
-package org.apache.mahout.flinkbindings.blas
-
-import scala.reflect.ClassTag
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.Matrix
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.util.Collector
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import java.lang.Iterable
-import scala.collection.JavaConverters._
-import com.google.common.collect.Lists
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.BlockifiedDrmDataSet
-import org.apache.flink.api.scala._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.mahout.flinkbindings.DrmDataSet
-
-
-object FlinkOpAtB {
-
-  def notZippable[K: ClassTag](op: OpAtB[K], At: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[Int] = {
-    // TODO: to help Flink's type inference
-    // only Int is supported now 
-    val rowsAt = At.deblockify.ds.asInstanceOf[DrmDataSet[Int]]
-    val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Int]]
-    val joined = rowsAt.join(rowsB).where(tuple_1[Vector]).equalTo(tuple_1[Vector])
-
-    val ncol = op.ncol
-    val nrow = op.nrow
-    val blockHeight = 10
-    val blockCount = safeToNonNegInt((ncol - 1) / blockHeight + 1)
-
-    val preProduct = joined.flatMap(new FlatMapFunction[Tuple2[(Int, Vector), (Int, Vector)], 
-                                                        (Int, Matrix)] {
-      def flatMap(in: Tuple2[(Int, Vector), (Int, Vector)],
-                  out: Collector[(Int, Matrix)]): Unit = {
-        val avec = in.f0._2
-        val bvec = in.f1._2
-
-        0.until(blockCount) map { blockKey =>
-          val blockStart = blockKey * blockHeight
-          val blockEnd = Math.min(ncol, blockStart + blockHeight)
-
-          // Create block by cross product of proper slice of aRow and qRow
-          val outer = avec(blockStart until blockEnd) cross bvec
-          out.collect((blockKey, outer))
-        }
-      }
-    })
-
-    val res: BlockifiedDrmDataSet[Int] = preProduct.groupBy(tuple_1[Matrix]).reduceGroup(
-            new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
-      def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = {
-        val it = Lists.newArrayList(values).asScala
-        val (idx, _) = it.head
-
-        val block = it.map(t => t._2).reduce((m1, m2) => m1 + m2)
-
-        val keys = idx.until(block.nrow).toArray[Int]
-        out.collect((keys, block))
-      }
-    })
-
-    new BlockifiedFlinkDrm(res, ncol)
-  }
-
-}
-
-class DrmTupleToFlinkTupleMapper[K: ClassTag] extends MapFunction[(K, Vector), Tuple2[Int, Vector]] {
-  def map(tuple: (K, Vector)): Tuple2[Int, Vector] = tuple match {
-    case (key, vec) => new Tuple2[Int, Vector](key.asInstanceOf[Int], vec)
-  }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings.blas
+
+import scala.reflect.ClassTag
+import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.math.drm.logical.OpAtB
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.Matrix
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.util.Collector
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import java.lang.Iterable
+import scala.collection.JavaConverters._
+import com.google.common.collect.Lists
+import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
+import org.apache.mahout.flinkbindings.BlockifiedDrmDataSet
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.mahout.flinkbindings.DrmDataSet
+
+
+/**
+ * Implementation is taken from Spark's AtB
+ * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+ */
+object FlinkOpAtB {
+
+  def notZippable[K: ClassTag](op: OpAtB[K], At: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[Int] = {
+    // TODO: to help Flink's type inference
+    // only Int is supported now 
+    val rowsAt = At.deblockify.ds.asInstanceOf[DrmDataSet[Int]]
+    val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Int]]
+    val joined = rowsAt.join(rowsB).where(tuple_1[Vector]).equalTo(tuple_1[Vector])
+
+    val ncol = op.ncol
+    val nrow = op.nrow
+    val blockHeight = 10
+    val blockCount = safeToNonNegInt((ncol - 1) / blockHeight + 1)
+
+    val preProduct = joined.flatMap(new FlatMapFunction[Tuple2[(Int, Vector), (Int, Vector)], 
+                                                        (Int, Matrix)] {
+      def flatMap(in: Tuple2[(Int, Vector), (Int, Vector)],
+                  out: Collector[(Int, Matrix)]): Unit = {
+        val avec = in.f0._2
+        val bvec = in.f1._2
+
+        0.until(blockCount) map { blockKey =>
+          val blockStart = blockKey * blockHeight
+          val blockEnd = Math.min(ncol, blockStart + blockHeight)
+
+          // Create block by cross product of proper slice of aRow and qRow
+          val outer = avec(blockStart until blockEnd) cross bvec
+          out.collect((blockKey, outer))
+        }
+      }
+    })
+
+    val res: BlockifiedDrmDataSet[Int] = preProduct.groupBy(tuple_1[Matrix]).reduceGroup(
+            new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
+      def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = {
+        val it = Lists.newArrayList(values).asScala
+        val (idx, _) = it.head
+
+        val block = it.map(t => t._2).reduce((m1, m2) => m1 + m2)
+
+        val keys = idx.until(block.nrow).toArray[Int]
+        out.collect((keys, block))
+      }
+    })
+
+    new BlockifiedFlinkDrm(res, ncol)
+  }
+
+}
+
+class DrmTupleToFlinkTupleMapper[K: ClassTag] extends MapFunction[(K, Vector), Tuple2[Int, Vector]] {
+  def map(tuple: (K, Vector)): Tuple2[Int, Vector] = tuple match {
+    case (key, vec) => new Tuple2[Int, Vector](key.asInstanceOf[Int], vec)
+  }
 }
\ No newline at end of file
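
A small plain-Scala sketch (not part of the diff; the dimensions are made up) of the block-range arithmetic the flatMap above applies to each joined row pair: with blockHeight = 10, a 23-wide slice is cut into the ranges [0, 10), [10, 20) and [20, 23).

object AtBBlockRanges extends App {
  val ncol = 23          // hypothetical value of op.ncol
  val blockHeight = 10   // mirrors the constant hard-coded above
  val blockCount = (ncol - 1) / blockHeight + 1   // = 3

  (0 until blockCount).foreach { blockKey =>
    val blockStart = blockKey * blockHeight
    val blockEnd = math.min(ncol, blockStart + blockHeight)
    println(s"block $blockKey covers slice [$blockStart, $blockEnd)")
  }
}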

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index d401abf..72de022 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import scala.reflect.ClassTag
@@ -15,6 +33,11 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.configuration.Configuration
 import java.util.List
 
+
+/**
+ * Implementation is taken from Spark's Ax
+ * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+ */
 object FlinkOpAx {
 
   def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index ade9ba4..7f6e3fa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import java.lang.Iterable
@@ -16,6 +34,11 @@ import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
 
+
+/**
+ * Implementation is taken from Spark's cbind
+ * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+ */
 object FlinkOpCBind {
 
   def cbind[K: ClassTag](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
index 4f12c0a..5d73f59 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import scala.reflect.ClassTag
@@ -11,6 +29,10 @@ import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
 
+/**
+ * Implementation is taken from Spark's MapBlock
+ * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+ */
 object FlinkOpMapBlock {
 
   def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index 837b7a9..0a4d08a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import scala.reflect.ClassTag
@@ -12,6 +30,7 @@ import org.apache.mahout.math.Vector
 object FlinkOpRBind {
 
   def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+    // note that indexes of B are already re-arranged prior to executing this code
     val res = A.deblockify.ds.union(B.deblockify.ds)
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol)
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
index 50c83b6..edae80e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import org.apache.mahout.math.drm.logical.OpRowRange
@@ -7,6 +25,10 @@ import org.apache.mahout.math.Vector
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
 
+/**
+ * Implementation is taken from Spark's OpRowRange
+ * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+ */
 object FlinkOpRowRange {
 
   def slice(op: OpRowRange, A: FlinkDrm[Int]): FlinkDrm[Int] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index e26ee7d..dd96066 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.blas
 
 import scala.reflect.ClassTag
@@ -12,6 +30,10 @@ import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
 
+/**
+ * Implementation is taken from Spark's OpTimesRightMatrix:
+ * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+ */
 object FlinkOpTimesRightMatrix {
 
   def drmTimesInCore[K: ClassTag](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
index fb154e4..6868a83 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
@@ -1,10 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings
 
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.mahout.math.Vector
 import scala.reflect.ClassTag
 
-
 package object blas {
 
   // TODO: remove it once figure out how to make Flink accept interfaces (Vector here)

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 0df75ca..1a42f84 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -1,157 +1,175 @@
-package org.apache.mahout.flinkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.mahout.math.drm.CacheHint
-import scala.util.Random
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.DenseMatrix
-import org.apache.mahout.math.SparseMatrix
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat
-import java.util.ArrayList
-import scala.collection.JavaConverters._
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.DataSet
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.LongWritable
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.Vector
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
-
-class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
-      private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
-      private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
-      override protected[mahout] val partitioningTag: Long = Random.nextLong(),
-      private var _canHaveMissingRows: Boolean = false
-  ) extends CheckpointedDrm[K] {
-
-  lazy val nrow: Long = if (_nrow >= 0) _nrow else computeNRow
-  lazy val ncol: Int = if (_ncol >= 0) _ncol else computeNCol
-
-  protected def computeNRow: Long = { 
-    val count = ds.map(new MapFunction[DrmTuple[K], Long] {
-      def map(value: DrmTuple[K]): Long = 1L
-    }).reduce(new ReduceFunction[Long] {
-      def reduce(a1: Long, a2: Long) = a1 + a2
-    })
-
-    val list = count.collect().asScala.toList
-    list.head
-  }
-
-  protected def computeNCol: Int = {
-    val max = ds.map(new MapFunction[DrmTuple[K], Int] {
-      def map(value: DrmTuple[K]): Int = value._2.length
-    }).reduce(new ReduceFunction[Int] {
-      def reduce(a1: Int, a2: Int) = Math.max(a1, a2)
-    })
-
-    val list = max.collect().asScala.toList
-    list.head
-  }
-
-  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
-
-  def cache() = {
-    // TODO
-    this
-  }
-
-  def uncache = ???
-
-  // Members declared in org.apache.mahout.math.drm.DrmLike   
-
-  protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
-
-  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
-
-  def collect: Matrix = {
-    val data = ds.collect().asScala.toList
-    val isDense = data.forall(_._2.isDense)
-
-    val m = if (isDense) {
-      val cols = data.head._2.size()
-      val rows = data.length
-      new DenseMatrix(rows, cols)
-    } else {
-      val cols = ncol
-      val rows = safeToNonNegInt(nrow)
-      new SparseMatrix(rows, cols)
-    }
-
-    val intRowIndices = keyClassTag == implicitly[ClassTag[Int]]
-
-    if (intRowIndices)
-      data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
-    else {
-      // assign all rows sequentially
-      val d = data.zipWithIndex
-      d.foreach(t => m(t._2, ::) := t._1._2)
-
-      val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap.asJava
-      m.setRowLabelBindings(rowBindings)
-    }
-
-    m
-  }
-
-  def dfsWrite(path: String): Unit = {
-    val env = ds.getExecutionEnvironment
-
-    val keyTag = implicitly[ClassTag[K]]
-    val convertKey = keyToWritableFunc(keyTag)
-
-    val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] {
-      def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
-        case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
-      }
-    })
-
-    val job = new JobConf
-    val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable]
-    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
-
-    val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
-    writableDataset.output(hadoopOutput)
-
-    env.execute(s"dfsWrite($path)")
-  }
-
-  private def keyToWritableFunc[K: ClassTag](keyTag: ClassTag[K]): (K) => Writable = {
-    if (keyTag.runtimeClass == classOf[Int]) { 
-      (x: K) => new IntWritable(x.asInstanceOf[Int])
-    } else if (keyTag.runtimeClass == classOf[String]) {
-      (x: K) => new Text(x.asInstanceOf[String]) 
-    } else if (keyTag.runtimeClass == classOf[Long]) {
-      (x: K) => new LongWritable(x.asInstanceOf[Long]) 
-    } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { 
-      (x: K) => x.asInstanceOf[Writable] 
-    } else { 
-      throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
-    }
-  }
-
-  def newRowCardinality(n: Int): CheckpointedDrm[K] = ???
-
-  override val context: DistributedContext = ds.getExecutionEnvironment
-
-}
-
-object CheckpointedFlinkDrm {
-  val UNKNOWN = -1
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.flinkbindings.FlinkDistributedContext
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.mahout.math.drm.CacheHint
+import scala.util.Random
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.DenseMatrix
+import org.apache.mahout.math.SparseMatrix
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat
+import java.util.ArrayList
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.DataSet
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.LongWritable
+import org.apache.mahout.math.VectorWritable
+import org.apache.mahout.math.Vector
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.FileOutputFormat
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
+
+class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
+      private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
+      private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
+      override protected[mahout] val partitioningTag: Long = Random.nextLong(),
+      private var _canHaveMissingRows: Boolean = false
+  ) extends CheckpointedDrm[K] {
+
+  lazy val nrow: Long = if (_nrow >= 0) _nrow else computeNRow
+  lazy val ncol: Int = if (_ncol >= 0) _ncol else computeNCol
+
+  protected def computeNRow: Long = { 
+    val count = ds.map(new MapFunction[DrmTuple[K], Long] {
+      def map(value: DrmTuple[K]): Long = 1L
+    }).reduce(new ReduceFunction[Long] {
+      def reduce(a1: Long, a2: Long) = a1 + a2
+    })
+
+    val list = count.collect().asScala.toList
+    list.head
+  }
+
+  protected def computeNCol: Int = {
+    val max = ds.map(new MapFunction[DrmTuple[K], Int] {
+      def map(value: DrmTuple[K]): Int = value._2.length
+    }).reduce(new ReduceFunction[Int] {
+      def reduce(a1: Int, a2: Int) = Math.max(a1, a2)
+    })
+
+    val list = max.collect().asScala.toList
+    list.head
+  }
+
+  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
+
+  def cache() = {
+    // TODO
+    this
+  }
+
+  def uncache = ???
+
+  // Members declared in org.apache.mahout.math.drm.DrmLike   
+
+  protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
+
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
+
+  def collect: Matrix = {
+    val data = ds.collect().asScala.toList
+    val isDense = data.forall(_._2.isDense)
+
+    val m = if (isDense) {
+      val cols = data.head._2.size()
+      val rows = data.length
+      new DenseMatrix(rows, cols)
+    } else {
+      val cols = ncol
+      val rows = safeToNonNegInt(nrow)
+      new SparseMatrix(rows, cols)
+    }
+
+    val intRowIndices = keyClassTag == implicitly[ClassTag[Int]]
+
+    if (intRowIndices)
+      data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
+    else {
+      // assign all rows sequentially
+      val d = data.zipWithIndex
+      d.foreach(t => m(t._2, ::) := t._1._2)
+
+      val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap.asJava
+      m.setRowLabelBindings(rowBindings)
+    }
+
+    m
+  }
+
+  def dfsWrite(path: String): Unit = {
+    val env = ds.getExecutionEnvironment
+
+    val keyTag = implicitly[ClassTag[K]]
+    val convertKey = keyToWritableFunc(keyTag)
+
+    val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] {
+      def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
+        case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
+      }
+    })
+
+    val job = new JobConf
+    val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable]
+    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
+
+    val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+    writableDataset.output(hadoopOutput)
+
+    env.execute(s"dfsWrite($path)")
+  }
+
+  private def keyToWritableFunc[K: ClassTag](keyTag: ClassTag[K]): (K) => Writable = {
+    if (keyTag.runtimeClass == classOf[Int]) { 
+      (x: K) => new IntWritable(x.asInstanceOf[Int])
+    } else if (keyTag.runtimeClass == classOf[String]) {
+      (x: K) => new Text(x.asInstanceOf[String]) 
+    } else if (keyTag.runtimeClass == classOf[Long]) {
+      (x: K) => new LongWritable(x.asInstanceOf[Long]) 
+    } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { 
+      (x: K) => x.asInstanceOf[Writable] 
+    } else { 
+      throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
+    }
+  }
+
+  def newRowCardinality(n: Int): CheckpointedDrm[K] = ???
+
+  override val context: DistributedContext = ds.getExecutionEnvironment
+
+}
+
+object CheckpointedFlinkDrm {
+  val UNKNOWN = -1
+
 }
\ No newline at end of file
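One behaviour of the collect path above worth noting: with Int keys each row is written at
its key position, while any other key type packs the rows sequentially and records the
original keys as row label bindings on the in-core matrix. A hedged usage sketch (the DRM
value and its keys are placeholders):

    // Sketch only: collecting a String-keyed DRM.
    // val drm: CheckpointedDrm[String] = ...   // rows keyed "a", "b", "c"
    // val m: Matrix = drm.collect               // rows packed 0..2 in collection order
    // m.getRowLabelBindings                     // {"a" -> 0, "b" -> 1, "c" -> 2}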

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index 3dc5684..c959455 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.drm
 
 import org.apache.flink.api.common.functions.FlatMapFunction

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
index 6efe99b..24f298d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.io
 
 import scala.reflect.ClassTag

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
index fc97234..e77143e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.mahout.flinkbindings.io
 
 import org.apache.hadoop.conf.Configuration
@@ -40,15 +41,15 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
    * When pattern matching dirs are never returned, only traversed.
    */
   def uris: String = {
-    if (!filePattern.isEmpty){ // have file pattern so
-    val pathURIs = pathURI.split(",")
+    if (!filePattern.isEmpty) { // a file pattern was given, so expand it
+      val pathURIs = pathURI.split(",")
       var files = ""
-      for ( uri <- pathURIs ){
+      for (uri <- pathURIs) {
         files = findFiles(uri, filePattern, files)
       }
       if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
       files
-    }else{
+    } else {
       pathURI
     }
   }
@@ -75,7 +76,7 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
           f = findFiles(fileStatus.getPath.toString, filePattern, f)
         }
       }
-    } else { f = dir }// was a filename not dir
+    } else { f = dir } // was a file name, not a dir
     f
   }
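
For reference, the search helper above expands a comma-separated list of URIs into the
individual files that match the supplied pattern. A hedged usage sketch (the paths and the
pattern are hypothetical, and the third argument is assumed to be the `recursive` flag
declared by the case class):

    // Sketch only: collect the files matching "part-" under two directories.
    // val files: String = HDFSPathSearch("hdfs:///data/a,hdfs:///data/b", "part-", true).uris
    // `files` is a comma-delimited list of file URIs, suitable for readers that accept multiple inputs.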
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
index 7629385..d0d853d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.mahout.flinkbindings.io
 
 /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
index 120edb4..6581721 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.mahout.flinkbindings.io
 
 import org.apache.hadoop.io.{ Writable, SequenceFile }

http://git-wip-us.apache.org/repos/asf/mahout/blob/58f79489/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 6f04551..56c737a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -1,87 +1,106 @@
-package org.apache.mahout
-
-import scala.reflect.ClassTag
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.DenseVector
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.MatrixWritable
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.flink.api.common.functions.FilterFunction
-
-package object flinkbindings {
-
-  private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbingings")
-
-  /** Row-wise organized DRM dataset type */
-  type DrmDataSet[K] = DataSet[DrmTuple[K]]
-
-  /**
-   * Blockifed DRM dataset (keys of original DRM are grouped into array corresponding to rows of Matrix
-   * object value
-   */
-  type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]]
-
-  
-  implicit def wrapMahoutContext(context: DistributedContext): FlinkDistributedContext = {
-    assert(context.isInstanceOf[FlinkDistributedContext], "it must be FlinkDistributedContext")
-    context.asInstanceOf[FlinkDistributedContext]
-  }
-
-  implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext =
-    new FlinkDistributedContext(env)
-  implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env
-
-  private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrm[K] = {
-    assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix")
-    drm.asInstanceOf[CheckpointedFlinkDrm[K]]
-  }
-
-  implicit def checkpointeDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
-    val flinkDrm = castCheckpointedDrm(cp)
-    new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
-  }
-
-  private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
-  private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
-  private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()
-  private[flinkbindings] implicit def unwrapFromWritable(w: VectorWritable): Vector = w.get()
-
-  def readCsv(file: String, delim: String = ",", comment: String = "#")
-             (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
-    val vectors = dc.env.readTextFile(file)
-        .filter(new FilterFunction[String] {
-          def filter(in: String): Boolean = {
-            !in.startsWith(comment)
-          }
-        })
-        .map(new MapFunction[String, Vector] {
-          def map(in: String): Vector = {
-            val array = in.split(delim).map(_.toDouble)
-            new DenseVector(array)
-          }
-        })
-    datasetToDrm(vectors)
-  }
-
-  def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = {
-    val zipped = new DataSetOps(ds).zipWithIndex
-    datasetWrap(zipped)
-  }
-
-  def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = {
-    new CheckpointedFlinkDrm[K](dataset)
-  }
-
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout
+
+import scala.reflect.ClassTag
+import org.slf4j.LoggerFactory
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.DenseVector
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.MatrixWritable
+import org.apache.mahout.math.VectorWritable
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.flinkbindings.FlinkDistributedContext
+import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
+import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
+import org.apache.flink.api.common.functions.FilterFunction
+
+package object flinkbindings {
+
+  private[flinkbindings] val log = LoggerFactory.getLogger("org.apache.mahout.flinkbindings")
+
+  /** Row-wise organized DRM dataset type */
+  type DrmDataSet[K] = DataSet[DrmTuple[K]]
+
+  /**
+   * Blockified DRM dataset (the keys of the original DRM are grouped into an array that
+   * corresponds to the rows of the Matrix object value)
+   */
+  type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]]
+
+  
+  implicit def wrapMahoutContext(context: DistributedContext): FlinkDistributedContext = {
+    assert(context.isInstanceOf[FlinkDistributedContext], "it must be FlinkDistributedContext")
+    context.asInstanceOf[FlinkDistributedContext]
+  }
+
+  implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext =
+    new FlinkDistributedContext(env)
+  implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env
+
+  private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrm[K] = {
+    assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix")
+    drm.asInstanceOf[CheckpointedFlinkDrm[K]]
+  }
+
+  implicit def checkpointeDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
+    val flinkDrm = castCheckpointedDrm(cp)
+    new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
+  }
+
+  private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
+  private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
+  private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()
+  private[flinkbindings] implicit def unwrapFromWritable(w: VectorWritable): Vector = w.get()
+
+
+  def readCsv(file: String, delim: String = ",", comment: String = "#")
+             (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+    val vectors = dc.env.readTextFile(file)
+      .filter(new FilterFunction[String] {
+        def filter(in: String): Boolean = {
+          !in.startsWith(comment)
+        }
+      })
+      .map(new MapFunction[String, Vector] {
+        def map(in: String): Vector = {
+          val array = in.split(delim).map(_.toDouble)
+          new DenseVector(array)
+        }
+      })
+    datasetToDrm(vectors)
+  }
+
+  def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Int] = {
+    val zipped = new DataSetOps(ds).zipWithIndex
+    datasetWrap(zipped)
+  }
+
+  def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = {
+    new CheckpointedFlinkDrm[K](dataset)
+  }
+
+
 }
\ No newline at end of file
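
To tie the helpers above together, a small end-to-end sketch (hedged: the file path is a
placeholder, and the example simply reuses the readCsv/collect API introduced in this
package object together with the FlinkDistributedContext wrapper):

    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.Matrix

    object ReadCsvSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // wrapContext above makes this conversion implicit; spelled out here for clarity
        implicit val ctx: FlinkDistributedContext = new FlinkDistributedContext(env)
        val drm = readCsv("/tmp/matrix.csv")   // placeholder path; lines starting with "#" are skipped
        val inCore: Matrix = drm.collect       // materialize the DRM on the driver
        println(inCore)
      }
    }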