You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/15 19:00:18 UTC

[2/2] mahout git commit: Major fixes for Flink backend merged

Major fixes for Flink backend merged


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/072289a4
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/072289a4
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/072289a4

Branch: refs/heads/flink-binding
Commit: 072289a46c9bd4b7297a17b621f7da30b94df1a7
Parents: 92a2f6c
Author: smarthi <sm...@apache.org>
Authored: Tue Mar 15 13:57:35 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Tue Mar 15 13:57:35 2016 -0400

----------------------------------------------------------------------
 flink/pom.xml                                   |  20 +-
 .../mahout/flinkbindings/FlinkByteBCast.scala   |   3 +
 .../flinkbindings/FlinkDistributedContext.scala |   1 +
 .../mahout/flinkbindings/FlinkEngine.scala      | 325 ++++++++++++-------
 .../mahout/flinkbindings/blas/FlinkOpAewB.scala |  58 ++--
 .../flinkbindings/blas/FlinkOpAewScalar.scala   |  33 +-
 .../mahout/flinkbindings/blas/FlinkOpAt.scala   |  58 ++--
 .../mahout/flinkbindings/blas/FlinkOpAtA.scala  |  48 ++-
 .../mahout/flinkbindings/blas/FlinkOpAtB.scala  |  25 +-
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   |   4 +-
 .../flinkbindings/blas/FlinkOpCBind.scala       | 136 ++++----
 .../flinkbindings/blas/FlinkOpMapBlock.scala    |  34 +-
 .../flinkbindings/blas/FlinkOpRBind.scala       |   5 +-
 .../flinkbindings/blas/FlinkOpRowRange.scala    |  20 +-
 .../blas/FlinkOpTimesRightMatrix.scala          |  54 ++-
 .../mahout/flinkbindings/blas/package.scala     |  60 ----
 .../drm/CheckpointedFlinkDrm.scala              |  92 +++++-
 .../drm/CheckpointedFlinkDrmOps.scala           |   1 -
 .../mahout/flinkbindings/drm/FlinkDrm.scala     |  58 ++--
 .../mahout/flinkbindings/io/DrmMetadata.scala   |  16 +-
 .../flinkbindings/io/HDFSPathSearch.scala       |   6 +-
 .../flinkbindings/io/Hadoop1HDFSUtil.scala      |  86 -----
 .../flinkbindings/io/Hadoop2HDFSUtil.scala      |  94 ++++++
 .../apache/mahout/flinkbindings/package.scala   |  22 +-
 .../flinkbindings/DistributedFlinkSuite.scala   |   2 +
 .../mahout/flinkbindings/DrmLikeOpsSuite.scala  |   1 -
 .../flinkbindings/FailingTestsSuite.scala       | 272 ++++++++++++++++
 .../flinkbindings/FlinkByteBCastSuite.scala     |   9 +-
 .../mahout/flinkbindings/RLikeOpsSuite.scala    |   1 -
 .../mahout/flinkbindings/UseCasesSuite.scala    |   1 -
 .../mahout/flinkbindings/blas/LATestSuite.scala |   3 +-
 .../DistributedDecompositionsSuite.scala        |   1 -
 .../standard/DrmLikeOpsSuite.scala              |   1 -
 .../flinkbindings/standard/DrmLikeSuite.scala   |   1 -
 .../standard/NaiveBayesTestSuite.scala          |  11 +
 .../standard/RLikeDrmOpsSuite.scala             |   8 -
 math-scala/pom.xml                              |   2 +-
 .../math/drm/logical/OpTimesLeftMatrix.scala    |   2 +-
 .../org/apache/mahout/math/drm/package.scala    |   4 +-
 .../apache/mahout/math/scalabindings/MMul.scala |  45 ++-
 .../DistributedDecompositionsSuiteBase.scala    |   4 +-
 .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala  |   3 +
 .../mahout/drivers/MahoutSparkDriver.scala      |   4 +-
 .../drivers/MahoutSparkOptionParser.scala       |   6 +-
 .../mahout/drivers/RowSimilarityDriver.scala    |   5 +-
 .../drivers/TextDelimitedReaderWriter.scala     |  10 +-
 .../apache/mahout/drivers/TrainNBDriver.scala   |  21 +-
 .../sparkbindings/SparkDistributedContext.scala |   2 +-
 .../apache/mahout/sparkbindings/blas/ABt.scala  |   2 +-
 .../test/DistributedSparkSuite.scala            |   4 +-
 50 files changed, 1023 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 37f1dbf..2ccb558 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -103,14 +103,15 @@
   </build>
 
   <dependencies>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
+      <artifactId>flink-runtime_${scala.compat.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_2.10</artifactId>
+      <artifactId>flink-scala_${scala.compat.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
@@ -120,15 +121,28 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_2.10</artifactId>
+      <artifactId>flink-core</artifactId>
       <version>${flink.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_${scala.compat.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
 
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-math-scala_${scala.compat.version}</artifactId>
     </dependency>
 
+    <!-- enforce current version of kryo as of 0.10.1-->
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.24.0</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-hdfs</artifactId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
index 8544db0..5cdfb79 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
@@ -43,14 +43,17 @@ class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Seri
     if (streamType == FlinkByteBCast.StreamTypeVector) {
       val writeable = new VectorWritable()
       writeable.readFields(stream)
+    //  printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
       writeable.get.asInstanceOf[T]
     } else if (streamType == FlinkByteBCast.StreamTypeMatrix) {
       val writeable = new MatrixWritable()
       writeable.readFields(stream)
+     // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
       writeable.get.asInstanceOf[T]
     } else {
       throw new IllegalArgumentException(s"unexpected type tag $streamType")
     }
+
   }
 
   override def value: T = _value

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index c818030..cfc9209 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -26,6 +26,7 @@ class FlinkDistributedContext(val env: ExecutionEnvironment) extends Distributed
 
   val engine: DistributedEngine = FlinkEngine
 
+
   override def close() {
     // TODO
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index d03aef7..f848c3f 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -18,32 +18,29 @@
  */
 package org.apache.mahout.flinkbindings
 
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.hadoop.io.Writable
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils.DataSetUtils
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
 import org.apache.mahout.flinkbindings.blas._
 import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.flinkbindings.io.HDFSUtil
-import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil
+import org.apache.mahout.flinkbindings.io.{HDFSUtil, Hadoop2HDFSUtil}
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.math.indexeddataset.BiDictionary
-import org.apache.mahout.math.indexeddataset.IndexedDataset
-import org.apache.mahout.math.indexeddataset.Schema
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, Schema}
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 
-import org.apache.flink.api.scala._
-
+import scala.collection.JavaConversions._
+import scala.reflect._
 
 object FlinkEngine extends DistributedEngine {
 
-  // By default, use Hadoop 1 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+  // By default, use Hadoop 2 utils
+  var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   /**
    * Load DRM from hdfs (as in Mahout DRM format).
@@ -51,7 +48,7 @@ object FlinkEngine extends DistributedEngine {
    * @param path The DFS path to load from
    * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
    */
-  override def drmDfsRead(path: String, parMin: Int = 0)
+  override def drmDfsRead(path: String, parMin: Int = 1)
                          (implicit dc: DistributedContext): CheckpointedDrm[_] = {
 
     // Require that context is actually Flink context.
@@ -60,19 +57,47 @@ object FlinkEngine extends DistributedEngine {
     // Extract the Flink Environment variable
     implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
 
-    val metadata = hdfsUtils.readDrmHeader(path)
+    // set the parallelism of the env to parMin
+    env.setParallelism(parMin)
 
-    val unwrapKey  = metadata.unwrapKeyFunction
+    // get the header of a SequenceFile in the path
+    val metadata = hdfsUtils.readDrmHeader(path + "//")
 
-    val ds = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path)
+    val keyClass: Class[_] = metadata.keyTypeWritable
 
-    val res = ds.map(new MapFunction[(Writable, VectorWritable), (Any, Vector)] {
-      def map(tuple: (Writable, VectorWritable)): (Any, Vector) = {
-        (unwrapKey(tuple._1), tuple._2)
-      }
-    })
+    // from the header determine which function to use to unwrap the key
+    val unwrapKey = metadata.unwrapKeyFunction
+
+    // Map to the correct DrmLike based on the metadata information
+    if (metadata.keyClassTag == ClassTag.Int) {
+      val ds = env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path)
+
+      val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Any, Vector)] {
+        def map(tuple: (IntWritable, VectorWritable)): (Any, Vector) = {
+          (unwrapKey(tuple._1), tuple._2.get())
+        }
+      })
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    } else if (metadata.keyClassTag == ClassTag.Long) {
+      val ds = env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path)
+
+      val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Any, Vector)] {
+        def map(tuple: (LongWritable, VectorWritable)): (Any, Vector) = {
+          (unwrapKey(tuple._1), tuple._2.get())
+        }
+      })
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    } else if (metadata.keyClassTag == ClassTag(classOf[String])) {
+      val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], path)
+
+      val res = ds.map(new MapFunction[(Text, VectorWritable), (Any, Vector)] {
+        def map(tuple: (Text, VectorWritable)): (Any, Vector) = {
+          (unwrapKey(tuple._1), tuple._2.get())
+        }
+      })
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    } else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyClass.getName}")
 
-    datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
   }
 
   override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
@@ -82,89 +107,114 @@ object FlinkEngine extends DistributedEngine {
                                             (implicit sc: DistributedContext): IndexedDataset = ???
 
 
+  /**
+    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+    *
+    * A particular physical engine implementation may choose to either use or not use these rewrites
+    * as a useful basic rewriting rule.<P>
+    */
+  override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
+
   /** 
    * Translates logical plan into Flink execution plan. 
    **/
   override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
     // Flink-specific Physical Plan translation.
+
+    implicit val typeInformation = generateTypeInformation[K]
     val drm = flinkTranslate(plan)
     val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
+   // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
     newcp.cache()
   }
 
-  private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = oper match {
-    case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAtx(a, x) =>
-      // express Atx as (A.t) %*% x
-      // TODO: create specific implementation of Atx, see MAHOUT-1749
-      val opAt = OpAt(a)
-      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
-      val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
-      val opAx = OpAx(atCast, x)
-      FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA))
-    case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, flinkTranslate(a)(op.classTagA), 
-        flinkTranslate(b)(op.classTagA))
-    case op @ OpABt(a, b) =>
-      // express ABt via AtB: let C=At and D=Bt, and calculate CtD
-      // TODO: create specific implementation of ABt, see MAHOUT-1750
-      val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
-      val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
-
-      val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
-      val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
-
-      FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
-                .asInstanceOf[FlinkDrm[K]]
-    case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpTimesRightMatrix(a, b) => 
-      FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b)
-    case op @ OpAewUnaryFunc(a, _, _) =>
-      FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAewUnaryFuncFusion(a, _) => 
-      FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
-    // deprecated
-    case op @ OpAewScalar(a, scalar, _) => 
-      FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar)
-    case op @ OpAewB(a, b, _) =>
-      FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpCbind(a, b) => 
-      FlinkOpCBind.cbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpRbind(a, b) => 
-      FlinkOpRBind.rbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpCbindScalar(a, x, _) => 
-      FlinkOpCBind.cbindScalar(op, flinkTranslate(a)(op.classTagA), x)
-    case op @ OpRowRange(a, _) => 
-      FlinkOpRowRange.slice(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpABAnyKey(a, b) if extractRealClassTag(a) != extractRealClassTag(b) =>
-      throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them")
-    case op: OpMapBlock[K, _] => 
-      FlinkOpMapBlock.apply(flinkTranslate(op.A)(op.classTagA), op.ncol, op.bmf)
-    case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol)
-    case _ => throw new NotImplementedError(s"operator $oper is not implemented yet")
+  private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
+    implicit val kTag = oper.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+    oper match {
+      case OpAtAnyKey(_) ⇒
+        throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
+      case op@OpAx(a, x) ⇒
+        //implicit val typeInformation = generateTypeInformation[K]
+        FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
+      case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+      case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
+        // express Atx as (A.t) %*% x
+        // TODO: create specific implementation of Atx, see MAHOUT-1749
+        val opAt = OpAt(a)
+        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a))
+        val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+        val opAx = OpAx(atCast, x)
+        FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]]
+      case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a),
+        flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
+      case op@OpABt(a, b) ⇒
+        // express ABt via AtB: let C=At and D=Bt, and calculate CtD
+        // TODO: create specific implementation of ABt, see MAHOUT-1750
+        val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
+        val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+        val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+        val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
+        val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol)
+        FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
+      case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+      case op@OpTimesRightMatrix(a, b) ⇒
+        FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a), b)
+      case op@OpAewUnaryFunc(a, _, _) ⇒
+        FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
+      case op@OpAewUnaryFuncFusion(a, _) ⇒
+        FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
+      // deprecated
+      case op@OpAewScalar(a, scalar, _) ⇒
+        FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a), scalar)
+      case op@OpAewB(a, b, _) ⇒
+        FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a), flinkTranslate(b))
+      case op@OpCbind(a, b) ⇒
+        FlinkOpCBind.cbind(op, flinkTranslate(a), flinkTranslate(b))
+      case op@OpRbind(a, b) ⇒
+        FlinkOpRBind.rbind(op, flinkTranslate(a), flinkTranslate(b))
+      case op@OpCbindScalar(a, x, _) ⇒
+        FlinkOpCBind.cbindScalar(op, flinkTranslate(a), x)
+      case op@OpRowRange(a, _) ⇒
+        FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+      case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒
+        throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them")
+      case op: OpMapBlock[K, _] ⇒
+        FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op).asInstanceOf[FlinkDrm[K]]
+      case cp: CheckpointedFlinkDrm[K] ⇒
+        //implicit val ktag=cp.keyClassTag
+        new RowsFlinkDrm[K](cp.ds, cp.ncol)
+      case _ ⇒
+        throw new NotImplementedError(s"operator $oper is not implemented yet")
+    }
   }
 
   /** 
    * returns a vector that contains a column-wise sum from DRM 
    */
-  override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
-    val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] {
-      def map(tuple: (K, Vector)): Vector = tuple._2
-    }).reduce(new ReduceFunction[Vector] {
-      def reduce(v1: Vector, v2: Vector) = v1 + v2
-    })
+  override def colSums[K](drm: CheckpointedDrm[K]): Vector = {
+    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+
+    val sum = drm.ds.map {
+      tuple => tuple._2
+    }.reduce(_ + _)
 
     val list = sum.collect
     list.head
   }
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
-  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
-    val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
-      def map(tuple: (Array[K], Matrix)): Vector = {
-        val (_, block) = tuple
+  override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
+    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+
+    val result = drm.asBlockified.ds.map {
+      tuple =>
+        val block = tuple._2
         val acc = block(0, ::).like()
 
         block.foreach { v =>
@@ -172,10 +222,7 @@ object FlinkEngine extends DistributedEngine {
         }
 
         acc
-      }
-    }).reduce(new ReduceFunction[Vector] {
-      def reduce(v1: Vector, v2: Vector) = v1 + v2
-    })
+    }.reduce(_ + _)
 
     val list = result.collect
     list.head
@@ -184,21 +231,30 @@ object FlinkEngine extends DistributedEngine {
   /** 
    * returns a vector that contains a column-wise mean from DRM 
    */
-  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  override def colMeans[K](drm: CheckpointedDrm[K]): Vector = {
     drm.colSums() / drm.nrow
   }
 
   /**
    * Calculates the element-wise squared norm of a matrix
    */
-  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
-    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
-      def map(tuple: (K, Vector)): Double = tuple match {
+  override def norm[K](drm: CheckpointedDrm[K]): Double = {
+    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+    val sumOfSquares = drm.ds.map {
+      tuple => tuple match {
         case (idx, vec) => vec dot vec
       }
-    }).reduce(new ReduceFunction[Double] {
-      def reduce(v1: Double, v2: Double) = v1 + v2
-    })
+    }.reduce(_ + _)
+
+//    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
+//      def map(tuple: (K, Vector)): Double = tuple match {
+//        case (idx, vec) => vec dot vec
+//      }
+//    }).reduce(new ReduceFunction[Double] {
+//      def reduce(v1: Double, v2: Double) = v1 + v2
+//    })
 
     val list = sumOfSquares.collect
     list.head
@@ -214,7 +270,7 @@ object FlinkEngine extends DistributedEngine {
     FlinkByteBCast.wrap(m)
 
 
-  /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+  /** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. */
   override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
                                            (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
 
@@ -223,14 +279,19 @@ object FlinkEngine extends DistributedEngine {
     new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
   }
 
+
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
       (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))//.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
     val dataSetType = TypeExtractor.getForObject(rows.head)
-    dc.env.fromCollection(rows).setParallelism(parallelismDegree)
+    //TODO: Make Sure that this is the correct partitioning scheme
+    dc.env.fromCollection(rows)
+            .partitionByRange(0)
+            .setParallelism(parallelismDegree)
+            .rebalance()
   }
 
-  /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
+  /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */
   override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
                                           (implicit dc: DistributedContext): CheckpointedDrm[String] = {
     ???
@@ -247,7 +308,7 @@ object FlinkEngine extends DistributedEngine {
       for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     }
     val result = dc.env.fromCollection(nonParallelResult)
-    new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol)
+    new CheckpointedFlinkDrm[Int](ds=result, _nrow=nrow, _ncol=ncol)
   }
 
   /** Creates empty DRM with non-trivial height */
@@ -259,29 +320,53 @@ object FlinkEngine extends DistributedEngine {
    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
    * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
    */
-  def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): 
+  def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false):
           (DrmLike[Int], Option[DrmLike[K]]) = ???
 
   /**
-   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+   * (Optional) Sampling operation.
    */
-  def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = ???
-
-  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
-
-//  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
-//
-//    val ncol = drmX match {
-//      case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol
-//      case _ ⇒ -1
-//    }
-//
-//    val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples)
-//
-//  }
+  def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = {
+    implicit val kTag: ClassTag[K] =  drmX.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+    val sample = DataSetUtils(drmX.dataset).sample(replacement, fraction)
+    new CheckpointedFlinkDrm[K](sample)
+  }
+
+  def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
+    implicit val kTag: ClassTag[K] =  drmX.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+    val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples)
+    new CheckpointedFlinkDrm[K](sample)
+  }
 
   /** Optional engine-specific all reduce tensor operation. */
-  def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = 
+  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix =
     throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink")
- 
+
+//  private def generateTypeInformation[K]: TypeInformation[K] = {
+//    val tag = implicitly[K].asInstanceOf[ClassTag[K]]
+//    generateTypeInformationFromTag(tag)
+//  }
+  private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
+    val tag = implicitly[ClassTag[K]]
+
+    generateTypeInformationFromTag(tag)
+  }
+
+  private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
+    if (tag.runtimeClass.equals(classOf[Int])) {
+      createTypeInformation[Int].asInstanceOf[TypeInformation[K]]
+    } else if (tag.runtimeClass.equals(classOf[Long])) {
+      createTypeInformation[Long].asInstanceOf[TypeInformation[K]]
+    } else if (tag.runtimeClass.equals(classOf[String])) {
+      createTypeInformation[String].asInstanceOf[TypeInformation[K]]
+//    } else if (tag.runtimeClass.equals(classOf[Any])) {
+//       createTypeInformation[Any].asInstanceOf[TypeInformation[K]]
+    } else {
+      throw new IllegalArgumentException(s"index type $tag is not supported")
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index f879e86..c61074b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -1,56 +1,42 @@
 package org.apache.mahout.flinkbindings.blas
 
-import java.lang.Iterable
 
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
 import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
 import org.apache.mahout.math.Vector
 import org.apache.mahout.math.drm.logical.OpAewB
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
-import com.google.common.collect.Lists
-
 /**
  * Implementation is inspired by Spark-binding's OpAewB
  * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala) 
  */
 object FlinkOpAewB {
 
-  def rowWiseJoinNoSideEffect[K: ClassTag](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+  def rowWiseJoinNoSideEffect[K: TypeInformation](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
     val function = AewBOpsCloning.strToFunction(op.op)
 
-    val classTag = extractRealClassTag(op.A)
-    val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
-
-    val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-
-    val res: DataSet[(Any, Vector)] = 
-      rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
-        .`with`(new CoGroupFunction[(_, Vector), (_, Vector), (_, Vector)] {
-      def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)], 
-                  out: Collector[(_, Vector)]): Unit = {
-        val it1 = Lists.newArrayList(it1java).asScala
-        val it2 = Lists.newArrayList(it2java).asScala
-
-        if (it1.nonEmpty && it2.nonEmpty) {
-          val (idx, a) = it1.head
-          val (_, b) = it2.head
-          out.collect((idx, function(a, b)))
-        } else if (it1.isEmpty && it2.nonEmpty) {
-          out.collect(it2.head)
-        } else if (it1.nonEmpty && it2.isEmpty) {
-          out.collect(it1.head)
-        }
+    val rowsA = A.asRowWise.ds
+    val rowsB = B.asRowWise.ds
+    implicit val kTag = op.keyClassTag
+
+    val res: DataSet[(K, Vector)] =
+      rowsA
+        .coGroup(rowsB)
+        .where(0)
+        .equalTo(0) {
+        (left, right, out: Collector[(K, Vector)]) =>
+          (left.toIterable.headOption, right.toIterable.headOption) match {
+            case (Some((idx, a)), Some((_, b))) => out.collect((idx, function(a, b)))
+            case (None, Some(b)) => out.collect(b)
+            case (Some(a), None) => out.collect(a)
+            case (None, None) => throw new RuntimeException("At least one side of the co group " +
+              "must be non-empty.")
+          }
       }
-    })
+
 
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index 67d710b..56e7deb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -19,6 +19,7 @@
 package org.apache.mahout.flinkbindings.blas
 
 import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
 import org.apache.mahout.math.Matrix
 import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc}
@@ -39,43 +40,45 @@ object FlinkOpAewScalar {
   private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean
 
   @Deprecated
-  def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
+  def opScalarNoSideEffect[K: TypeInformation](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
     val function = EWOpsCloning.strToFunction(op.op)
+    implicit val kTag = op.keyClassTag
 
-    val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-      def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
-        case (keys, mat) => (keys, function(mat, scalar))
-      }
-    })
+
+    val res = A.asBlockified.ds.map{
+      tuple => (tuple._1, function(tuple._2, scalar))
+    }
 
     new BlockifiedFlinkDrm(res, op.ncol)
   }
 
-  def opUnaryFunction[K: ClassTag](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
+  def opUnaryFunction[K: TypeInformation](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
     val f = op.f
     val inplace = isInplace
 
+
+    implicit val kTag = op.keyClassTag
+
     val res = if (op.evalZeros) {
-      A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-        def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+      A.asBlockified.ds.map{
+        tuple =>
           val (keys, block) = tuple
           val newBlock = if (inplace) block else block.cloned
           newBlock := ((_, _, x) => f(x))
           (keys, newBlock)
-        }
-      })
+      }
     } else {
-      A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-        def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+      A.asBlockified.ds.map{
+        tuple =>
           val (keys, block) = tuple
           val newBlock = if (inplace) block else block.cloned
           for (row <- newBlock; el <- row.nonZeroes) el := f(el.get)
           (keys, newBlock)
-        }
-      })
+      }
     }
 
     new BlockifiedFlinkDrm(res, op.ncol)
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index e515b34..6e320af 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -18,25 +18,13 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import java.lang.Iterable
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConverters.asScalaBufferConverter
-
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.shaded.com.google.common.collect.Lists
-import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.SequentialAccessSparseVector
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm.DrmTuple
+import org.apache.flink.api.scala._
+import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
+import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}
 import org.apache.mahout.math.drm.logical.OpAt
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
-import org.apache.flink.api.scala._
+import scala.Array.canBuildFrom
 
 /**
  * Implementation is taken from Spark's At
@@ -52,34 +40,34 @@ object FlinkOpAt {
   def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = {
     val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T
 
-    val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
-      def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match {
-        case (keys, block) =>
-          (0 until block.ncol).map(columnIdx => {
+    val sparseParts = A.asBlockified.ds.flatMap {
+      blockifiedTuple =>
+        val keys = blockifiedTuple._1
+        val block = blockifiedTuple._2
+
+        (0 until block.ncol).map {
+          columnIndex =>
             val columnVector: Vector = new SequentialAccessSparseVector(ncol)
 
-            keys.zipWithIndex.foreach { case (key, idx) =>
-              columnVector(key) = block(idx, columnIdx)
+            keys.zipWithIndex.foreach {
+              case (key, idx) => columnVector(key) = block(idx, columnIndex)
             }
 
-            out.collect((columnIdx, columnVector))
-          })
-      }
-    })
+            (columnIndex, columnVector)
+        }
+    }
 
-    val regrouped = sparseParts.groupBy(selector[Vector, Int])
+    val regrouped = sparseParts.groupBy(0)
 
-    val sparseTotal = regrouped.reduceGroup(new GroupReduceFunction[(Int, Vector), DrmTuple[Int]] {
-      def reduce(values: Iterable[(Int, Vector)], out: Collector[DrmTuple[Int]]): Unit = {
-        val it = Lists.newArrayList(values).asScala
-        val (idx, _) = it.head
-        val vector = (it map { case (idx, vec) => vec }).sum
-        out.collect((idx, vector))
-      }
-    })
+    val sparseTotal = regrouped.reduce{
+      (left, right) =>
+        (left._1, left._2 + right._2)
+    }
 
     // TODO: densify or not?
     new RowsFlinkDrm(sparseTotal, ncol)
   }
 
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index 629857a..bdb0e5e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -2,22 +2,20 @@ package org.apache.mahout.flinkbindings.blas
 
 import java.lang.Iterable
 
-import scala.collection.JavaConverters._
-
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.shaded.com.google.common.collect.Lists
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm._
 import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.BlockifiedDrmTuple
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
 import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 
+import scala.collection.JavaConverters._
 
 /**
  * Inspired by Spark's implementation from 
@@ -29,61 +27,59 @@ object FlinkOpAtA {
   final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol"
   final val PROPERTY_ATA_MAXINMEMNCOL_DEFAULT = "200"
 
-  def at_a(op: OpAtA[_], A: FlinkDrm[_]): FlinkDrm[Int] = {
+  def at_a[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
     val maxInMemStr = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, PROPERTY_ATA_MAXINMEMNCOL_DEFAULT)
     val maxInMemNCol = maxInMemStr.toInt
     maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
 
+    implicit val kTag = A.classTag
+
     if (op.ncol <= maxInMemNCol) {
       implicit val ctx = A.context
       val inCoreAtA = slim(op, A)
       val result = drmParallelize(inCoreAtA, numPartitions = 1)
       result
     } else {
-      fat(op.asInstanceOf[OpAtA[Any]], A.asInstanceOf[FlinkDrm[Any]])
+      fat(op.asInstanceOf[OpAtA[K]], A.asInstanceOf[FlinkDrm[K]])
     }
   }
 
-  def slim(op: OpAtA[_], A: FlinkDrm[_]): Matrix = {
-    val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]]
+  def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = {
+    val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[K], Matrix)]]
 
-    val res = ds.map(new MapFunction[(Array[Any], Matrix), Matrix] {
+    val res = ds.map {
       // TODO: optimize it: use upper-triangle matrices like in Spark
-      def map(block: (Array[Any], Matrix)): Matrix =  block match {
-        case (idx, m) => m.t %*% m
-      }
-    }).reduce(new ReduceFunction[Matrix] {
-      def reduce(m1: Matrix, m2: Matrix) = m1 + m2
-    }).collect()
+      block => block._2.t %*% block._2
+    }.reduce(_ + _).collect()
 
     res.head
   }
 
-  def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = {
+  def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
     val nrow = op.A.nrow
     val ncol = op.A.ncol
     val ds = A.asBlockified.ds
 
-    val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[Any], Matrix), Int] {
-      def map(a: (Array[Any], Matrix)): Int = 1
+    val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[K], Matrix), Int] {
+      def map(a: (Array[K], Matrix)): Int = 1
     }).reduce(new ReduceFunction[Int] {
       def reduce(a: Int, b: Int): Int = a + b
     })
 
-    val subresults: DataSet[(Int, Matrix)] = 
-          ds.flatMap(new RichFlatMapFunction[(Array[Any], Matrix), (Int, Matrix)] {
+    val subresults: DataSet[(Int, Matrix)] =
+          ds.flatMap(new RichFlatMapFunction[(Array[K], Matrix), (Int, Matrix)] {
 
       var ranges: Array[Range] = null
 
       override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext()
+        val runtime = this.getRuntimeContext
         val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions")
         val parts = dsX.get(0)
         val numParts = estimatePartitions(nrow, ncol, parts)
         ranges = computeEvenSplits(ncol, numParts)
       }
 
-      def flatMap(tuple: (Array[Any], Matrix), out: Collector[(Int, Matrix)]): Unit = {
+      def flatMap(tuple: (Array[K], Matrix), out: Collector[(Int, Matrix)]): Unit = {
         val block = tuple._2
 
         ranges.zipWithIndex.foreach { case (range, idx) => 
@@ -93,13 +89,13 @@ object FlinkOpAtA {
 
     }).withBroadcastSet(numberOfPartitions, "numberOfPartitions")
 
-    val res = subresults.groupBy(selector[Matrix, Int])
+    val res = subresults.groupBy(0)
                         .reduceGroup(new RichGroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
 
       var ranges: Array[Range] = null
 
       override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext()
+        val runtime = this.getRuntimeContext
         val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions")
         val parts = dsX.get(0)
         val numParts = estimatePartitions(nrow, ncol, parts)

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index b514868..6a081ba 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -46,25 +46,22 @@ import org.apache.flink.api.scala._
  */
 object FlinkOpAtB {
 
-  def notZippable[K: ClassTag](op: OpAtB[K], At: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[Int] = {
-    val classTag = extractRealClassTag(op.A)
-    val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
+  def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): FlinkDrm[Int] = {
 
-    val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val joined = rowsAt.join(rowsB).where(joiner).equalTo(joiner)
+    val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
+    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
+    val joined = rowsAt.join(rowsB).where(0).equalTo(0)
 
     val ncol = op.ncol
     val nrow = op.nrow.toInt
     val blockHeight = 10
     val blockCount = safeToNonNegInt((nrow - 1) / blockHeight + 1)
 
-    val preProduct: DataSet[(Int, Matrix)] = 
-             joined.flatMap(new FlatMapFunction[Tuple2[(_, Vector), (_, Vector)], (Int, Matrix)] {
-      def flatMap(in: Tuple2[(_, Vector), (_, Vector)],
-                  out: Collector[(Int, Matrix)]): Unit = {
+    val preProduct: DataSet[(Int, Matrix)] =
+             joined.flatMap(new FlatMapFunction[((A, Vector), (A, Vector)), (Int, Matrix)] {
+      def flatMap(in: ((A, Vector), (A, Vector)), out: Collector[(Int, Matrix)]): Unit = {
         val avec = in._1._2
-        val bvec = in._1._2
+        val bvec = in._2._2
 
         0.until(blockCount) map { blockKey =>
           val blockStart = blockKey * blockHeight
@@ -72,13 +69,13 @@ object FlinkOpAtB {
 
           val outer = avec(blockStart until blockEnd) cross bvec
           out.collect(blockKey -> outer)
+          out
         }
       }
     })
 
     val res: BlockifiedDrmDataSet[Int] = 
-      preProduct.groupBy(selector[Matrix, Int])
-                .reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
+      preProduct.groupBy(0).reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
       def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = {
         val it = Lists.newArrayList(values).asScala
         val (idx, _) = it.head
@@ -90,7 +87,7 @@ object FlinkOpAtB {
       }
     })
 
-    new BlockifiedFlinkDrm(res, ncol)
+    new BlockifiedFlinkDrm[Int](res, ncol)
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 4302457..79f5fe8 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -21,6 +21,7 @@ package org.apache.mahout.flinkbindings.blas
 import java.util.List
 
 import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
 import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
 import org.apache.mahout.math.{Matrix, Vector}
@@ -38,8 +39,9 @@ import org.apache.flink.api.scala._
  */
 object FlinkOpAx {
 
-  def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
+  def blockifiedBroadcastAx[K: TypeInformation](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
     implicit val ctx = A.context
+    implicit val kTag = op.keyClassTag
 
     val singletonDataSetX = ctx.env.fromElements(op.x)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index 6cf5e5c..65b2a25 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -19,12 +19,14 @@
 package org.apache.mahout.flinkbindings.blas
 
 import java.lang.Iterable
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala._
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm._
@@ -43,94 +45,86 @@ import org.apache.mahout.math.scalabindings._
  */
 object FlinkOpCBind {
 
-  def cbind[K: ClassTag](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+  def cbind[K: TypeInformation](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = op.B.ncol
 
-    val classTag = extractRealClassTag(op.A)
-    val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
-
-    val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-
-    val res: DataSet[(Any, Vector)] = 
-      rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
-        .`with`(new CoGroupFunction[(_, Vector), (_, Vector), (_, Vector)] {
-      def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)], 
-                  out: Collector[(_, Vector)]): Unit = {
-        val it1 = Lists.newArrayList(it1java).asScala
-        val it2 = Lists.newArrayList(it2java).asScala
-
-        if (it1.nonEmpty && it2.nonEmpty) {
-          val (idx, a) = it1.head
-          val (_, b) = it2.head
-
-          val result: Vector = if (a.isDense && b.isDense) {
-            new DenseVector(n) 
-          } else {
-            new SequentialAccessSparseVector(n)
-          }
-
-          result(0 until n1) := a
-          result(n1 until n) := b
-
-          out.collect((idx, result))
-        } else if (it1.isEmpty && it2.nonEmpty) {
-          val (idx, b) = it2.head
-          val result: Vector = if (b.isDense) { 
-            new DenseVector(n)
-          } else {
-            new SequentialAccessSparseVector(n)
-          }
-          result(n1 until n) := b
-          out.collect((idx, result))
-        } else if (it1.nonEmpty && it2.isEmpty) {
-          val (idx, a) = it1.head
-          val result: Vector = if (a.isDense) {
-            new DenseVector(n)
-          } else {
-            new SequentialAccessSparseVector(n)
+    implicit val classTag = op.A.keyClassTag
+
+    val rowsA = A.asRowWise.ds
+    val rowsB = B.asRowWise.ds
+
+    val res: DataSet[(K, Vector)] =
+      rowsA.coGroup(rowsB).where(0).equalTo(0) {
+        (left, right) =>
+          (left.toIterable.headOption, right.toIterable.headOption) match {
+            case (Some((idx, a)), Some((_, b))) =>
+              val result = if (a.isDense && b.isDense) {
+                new DenseVector(n)
+              } else {
+                new SequentialAccessSparseVector(n)
+              }
+
+              result(0 until n1) := a
+              result(n1 until n) := b
+
+              (idx, result)
+            case (Some((idx, a)), None) =>
+              val result: Vector = if (a.isDense) {
+                new DenseVector(n)
+              } else {
+                new SequentialAccessSparseVector(n)
+              }
+              result(n1 until n) := a
+
+              (idx, result)
+            case (None, Some((idx, b))) =>
+              val result: Vector = if (b.isDense) {
+                new DenseVector(n)
+              } else {
+                new SequentialAccessSparseVector(n)
+              }
+              result(n1 until n) := b
+
+              (idx, result)
+            case (None, None) =>
+              throw new RuntimeException("CoGroup should have at least one non-empty input.")
           }
-          result(n1 until n) := a
-          out.collect((idx, result))
-        }
       }
-    })
 
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
   }
 
-  def cbindScalar[K: ClassTag](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
+  def cbindScalar[K: TypeInformation](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
     val left = op.leftBind
     val ds = A.asBlockified.ds
 
-    val out = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-      def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
-        case (keys, mat) => (keys, cbind(mat, x, left))
-      }
+    implicit val kTag= op.keyClassTag
 
-      def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = {
-        val ncol = mat.ncol
-        val newMat = mat.like(mat.nrow, ncol + 1)
+    def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = {
+      val ncol = mat.ncol
+      val newMat = mat.like(mat.nrow, ncol + 1)
 
-        if (left) {
-          newMat.zip(mat).foreach { case (newVec, origVec) =>
-            newVec(0) = x
-            newVec(1 to ncol) := origVec
-          }
-        } else {
-          newMat.zip(mat).foreach { case (newVec, origVec) =>
-            newVec(ncol) = x
-            newVec(0 to (ncol - 1)) := origVec
-          }
+      if (left) {
+        newMat.zip(mat).foreach { case (newVec, origVec) =>
+          newVec(0) = x
+          newVec(1 to ncol) := origVec
+        }
+      } else {
+        newMat.zip(mat).foreach { case (newVec, origVec) =>
+          newVec(ncol) = x
+          newVec(0 to (ncol - 1)) := origVec
         }
-
-        newMat
       }
-    })
+
+      newMat
+    }
+
+    val out = A.asBlockified.ds.map {
+      tuple => (tuple._1, cbind(tuple._2, x, left))
+    }
 
     new BlockifiedFlinkDrm(out, op.ncol)
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
index 9530d43..c3918a5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
@@ -18,13 +18,10 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.BlockMapFunc
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.drm.logical.OpMapBlock
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
 /**
@@ -33,16 +30,19 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
  */
 object FlinkOpMapBlock {
 
-  def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = {
-    val res = src.asBlockified.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] {
-      def map(block: (Array[S], Matrix)): (Array[R], Matrix) =  {
-        val out = function(block)
-        assert(out._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
-        assert(out._2.ncol == ncol, s"block map must return $ncol number of columns.")
-        out
-      }
-    })
+  def apply[S, R: TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = {
+    implicit val rtag = operator.keyClassTag
+    val bmf = operator.bmf
+    val ncol = operator.ncol
+    val res = src.asBlockified.ds.map {
+      block =>
+        val result = bmf(block)
+        assert(result._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
+        assert(result._2.ncol == ncol, s"block map must return $ncol number of columns.")
+       // printf("Block partition: \n%s\n", block._2)
+        result
+    }
 
-    new BlockifiedFlinkDrm(res, ncol)
+    new BlockifiedFlinkDrm[R](res, ncol)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index 83beaa1..4fa2eaa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -18,6 +18,8 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
 import scala.reflect.ClassTag
 
 import org.apache.flink.api.scala.DataSet
@@ -28,8 +30,9 @@ import org.apache.mahout.math.drm.logical.OpRbind
 
 object FlinkOpRBind {
 
-  def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+  def rbind[K: TypeInformation](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
     // note that indexes of B are already re-arranged prior to executing this code
+    implicit val kTag = op.keyClassTag
     val res = A.asRowWise.ds.union(B.asRowWise.ds)
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol)
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
index 6e11892..39f4ceb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
@@ -18,11 +18,9 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.Vector
 import org.apache.mahout.math.drm.logical.OpRowRange
 
 /**
@@ -35,17 +33,13 @@ object FlinkOpRowRange {
     val rowRange = op.rowRange
     val firstIdx = rowRange.head
 
-    val filtered = A.asRowWise.ds.filter(new FilterFunction[(Int, Vector)] {
-      def filter(tuple: (Int, Vector)): Boolean = tuple match {
-        case (idx, vec) => rowRange.contains(idx)
-      }
-    })
+    val filtered = A.asRowWise.ds.filter {
+      tuple => rowRange.contains(tuple._1)
+    }
 
-    val res = filtered.map(new MapFunction[(Int, Vector), (Int, Vector)] {
-      def map(tuple: (Int, Vector)): (Int, Vector) = tuple match {
-        case (idx, vec) => (idx - firstIdx, vec)
-      }
-    })
+    val res = filtered.map {
+      tuple => (tuple._1 - firstIdx, tuple._2)
+    }
 
     new RowsFlinkDrm(res, op.ncol)
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index 989fad1..4e5b1a7 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -18,17 +18,16 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
 import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
 import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import org.apache.flink.api.scala._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{DenseMatrix, Matrix}
 
 /**
  * Implementation is taken from Spark's OpTimesRightMatrix:
@@ -36,20 +35,51 @@ import org.apache.flink.api.scala._
  */
 object FlinkOpTimesRightMatrix {
 
-  def drmTimesInCore[K: ClassTag](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {
+  def drmTimesInCore[K: TypeInformation](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {
     implicit val ctx = A.context
+    implicit val kTag = op.keyClassTag
+
+
 
-    val singletonDataSetB = ctx.env.fromElements(inCoreB)
+    /* HACK: broadcasting the matrix using Flink's .withBroadcastSet(singletonDataSetB) on a matrix causes a backend Kryo
+     * Issue resulkting in a stackOverflow error.
+     * 
+     * Quick fix is to instead break the matrix down into a list of rows and then rebuild it on the back end
+     * 
+     * TODO: this is obviously very inefficient... need to use the correct broadcast on the matrix itself.
+     */
+    
+    //  val singletonDataSetB = ctx.env.fromElements(inCoreB)
+
+    
+    //  val inCoreBcastB = FlinkEngine.drmBroadcast(inCoreB)
+    //  val singletonDataSetB = ctx.env.fromElements(inCoreB)
+
+    val rows = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
+    val dataSetType = TypeExtractor.getForObject(rows.head)
+    val singletonDataSetB = ctx.env.fromCollection(rows)
 
     val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
       var inCoreB: Matrix = null
 
       override def open(params: Configuration): Unit = {
         val runtime = this.getRuntimeContext
-        val dsB: java.util.List[Matrix] = runtime.getBroadcastVariable("matrix")
-        inCoreB = dsB.get(0)
-      }
+        //val dsB: java.util.List[Matrix]
+        val dsB: java.util.List[(Int, org.apache.mahout.math.Vector)] = runtime.getBroadcastVariable("matrix")
+        val m = dsB.size()
+        val n = dsB.get(0)._2.size
+        val isDense = dsB.get(0)._2.isDense
 
+        inCoreB = isDense match {
+          case true => new DenseMatrix(m, n)
+          case false => new DenseMatrix(m, n)
+        }
+        for (i <- 0 until m) {
+          inCoreB(i, ::) := dsB.get(i)._2
+        }
+
+      }
+     
       override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
         case (keys, block_A) => (keys, block_A %*% inCoreB)
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
deleted file mode 100644
index 27f552c..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import scala.reflect.ClassTag
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-package object blas {
-
-  // TODO: remove it once figure out how to make Flink accept interfaces (Vector here)
-  def selector[V, K: ClassTag]: KeySelector[(K, V), K] = {
-    val tag = implicitly[ClassTag[K]]
-    if (tag.runtimeClass.equals(classOf[Int])) {
-      tuple_1_int.asInstanceOf[KeySelector[(K, V), K]]
-    } else if (tag.runtimeClass.equals(classOf[Long])) {
-      tuple_1_long.asInstanceOf[KeySelector[(K, V), K]]
-    } else if (tag.runtimeClass.equals(classOf[String])) {
-      tuple_1_string.asInstanceOf[KeySelector[(K, V), K]]
-    } else {
-      throw new IllegalArgumentException(s"index type $tag is not supported")
-    }
-  }
-
-  private def tuple_1_int[K: ClassTag] = new KeySelector[(Int, _), Int] 
-                  with ResultTypeQueryable[Int] {
-    def getKey(tuple: Tuple2[Int, _]): Int = tuple._1
-    def getProducedType: TypeInformation[Int] = createTypeInformation[Int]
-  }
-
-  private def tuple_1_long[K: ClassTag] = new KeySelector[(Long, _), Long] 
-                  with ResultTypeQueryable[Long] {
-    def getKey(tuple: Tuple2[Long, _]): Long = tuple._1
-    def getProducedType: TypeInformation[Long] = createTypeInformation[Long]
-  }
-
-  private def tuple_1_string[K: ClassTag] = new KeySelector[(String, _), String] 
-                  with ResultTypeQueryable[String] {
-    def getKey(tuple: Tuple2[String, _]): String = tuple._1
-    def getProducedType: TypeInformation[String] = createTypeInformation[String]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 96d57d2..84b327a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -19,13 +19,13 @@
 package org.apache.mahout.flinkbindings.drm
 
 import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
-import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
 import org.apache.mahout.flinkbindings.{DrmDataSet, _}
-import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, Vector, VectorWritable}
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm.CacheHint._
 import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _}
 import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
@@ -37,6 +37,7 @@ import scala.util.Random
 class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
       private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
       private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
+      override val cacheHint: CacheHint = CacheHint.NONE,
       override protected[mahout] val partitioningTag: Long = Random.nextLong(),
       private var _canHaveMissingRows: Boolean = false
   ) extends CheckpointedDrm[K] {
@@ -79,7 +80,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
 
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
 
-  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
+
+     this
+  }
 
   def collect: Matrix = {
     val data = ds.collect()
@@ -123,21 +127,76 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   def dfsWrite(path: String): Unit = {
     val env = ds.getExecutionEnvironment
 
-    val keyTag = implicitly[ClassTag[K]]
-    val convertKey = keyToWritableFunc(keyTag)
+    // ds.map is not picking up the correct runtime value of tuple._1
+    // WritableType info is throwing an exception
+    // when asserting that the key is not an actual Writable
+    // rather a subclass
 
-    val writableDataset = ds.map(new MapFunction[(K, Vector), (Writable, VectorWritable)] {
-      def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
-        case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
-      }
-    })
+//    val keyTag = implicitly[ClassTag[K]]
+//    def convertKey = keyToWritableFunc(keyTag)
+//    val writableDataset = ds.map {
+//      tuple => (convertKey(tuple._1), new VectorWritable(tuple._2))
+//    }
+
+
+      // test output with IntWritable Key.  VectorWritable is not a problem,
+//    val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
+//      def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
+//         (new IntWritable(1), new VectorWritable(tuple._2))
+//    })
+
+
+    val keyTag = implicitly[ClassTag[K]]
 
     val job = new JobConf
-    val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable]
     FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
 
-    val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
-    writableDataset.output(hadoopOutput)
+    // explicitly define all Writable Subclasses for ds.map() keys
+    // as well as the SequenceFileOutputFormat paramaters
+    if (keyTag.runtimeClass == classOf[Int]) {
+      // explicitly map into Int keys
+      implicit val typeInformation = createTypeInformation[(IntWritable,VectorWritable)]
+      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
+        def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
+          (new IntWritable(tuple._1.asInstanceOf[Int]), new VectorWritable(tuple._2))
+      })
+
+      // setup sink for IntWritable
+      job.setOutputKeyClass(classOf[IntWritable])
+      job.setOutputValueClass(classOf[VectorWritable])
+      val sequenceFormat = new SequenceFileOutputFormat[IntWritable, VectorWritable]
+      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+      writableDataset.output(hadoopOutput)
+
+     } else if (keyTag.runtimeClass == classOf[String]) {
+      // explicitly map into Text keys
+      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (Text, VectorWritable)] {
+        def map(tuple: DrmTuple[K]): (Text, VectorWritable) =
+          (new Text(tuple._1.asInstanceOf[String]), new VectorWritable(tuple._2))
+      })
+
+      // setup sink for Text
+      job.setOutputKeyClass(classOf[Text])
+      job.setOutputValueClass(classOf[VectorWritable])
+      val sequenceFormat = new SequenceFileOutputFormat[Text, VectorWritable]
+      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+      writableDataset.output(hadoopOutput)
+
+    } else if (keyTag.runtimeClass == classOf[Long]) {
+      // explicitly map into Long keys
+      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (LongWritable, VectorWritable)] {
+        def map(tuple: DrmTuple[K]): (LongWritable, VectorWritable) =
+          (new LongWritable(tuple._1.asInstanceOf[Long]), new VectorWritable(tuple._2))
+      })
+
+      // setup sink for LongWritable
+      job.setOutputKeyClass(classOf[LongWritable])
+      job.setOutputValueClass(classOf[VectorWritable])
+      val sequenceFormat = new SequenceFileOutputFormat[LongWritable, VectorWritable]
+      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+      writableDataset.output(hadoopOutput)
+
+    } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
 
     env.execute(s"dfsWrite($path)")
   }
@@ -148,9 +207,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
     } else if (keyTag.runtimeClass == classOf[String]) {
       (x: K) => new Text(x.asInstanceOf[String]) 
     } else if (keyTag.runtimeClass == classOf[Long]) {
-      (x: K) => new LongWritable(x.asInstanceOf[Long]) 
-    } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { 
-      (x: K) => x.asInstanceOf[Writable] 
+      (x: K) => new LongWritable(x.asInstanceOf[Long])
+    // WritableTypeInfo will reject the base Writable class
+//          } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) {
+//      (x: K) => x.asInstanceOf[Writable]
     } else {
       throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
     }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
index a037d44..e65c43d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
@@ -31,5 +31,4 @@ class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) {
   /** Flink matrix customization exposure */
   def dataset = flinkDrm.ds
 
-
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index c9c1b2c..aea62fa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -18,18 +18,13 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.{FlatMapFunction, MapPartitionFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext}
-import org.apache.mahout.math.drm.DrmTuple
 import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix}
 
-import scala.collection.JavaConverters.iterableAsScalaIterableConverter
 import scala.reflect.ClassTag
 
 trait FlinkDrm[K] {
@@ -43,7 +38,7 @@ trait FlinkDrm[K] {
   def classTag: ClassTag[K]
 }
 
-class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+class RowsFlinkDrm[K: TypeInformation: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
 
   def executionEnvironment = ds.getExecutionEnvironment
   def context: FlinkDistributedContext = ds.getExecutionEnvironment
@@ -54,27 +49,27 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
     val ncolLocal = ncol
     val classTag = implicitly[ClassTag[K]]
 
-    val parts = ds.mapPartition(new MapPartitionFunction[DrmTuple[K], (Array[K], Matrix)] {
-      def mapPartition(values: Iterable[DrmTuple[K]], out: Collector[(Array[K], Matrix)]): Unit = {
-        val it = values.asScala.seq
+    val parts = ds.mapPartition {
+      values =>
+        val (keys, vectors) = values.toIterable.unzip
 
-        val (keys, vectors) = it.unzip
         if (vectors.nonEmpty) {
-          val isDense = vectors.head.isDense
-
-          if (isDense) {
+          val vector = vectors.head
+          val matrix: Matrix = if (vector.isDense) {
             val matrix = new DenseMatrix(vectors.size, ncolLocal)
             vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
-            out.collect((keys.toArray(classTag), matrix))
+            matrix
           } else {
-            val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
-            out.collect((keys.toArray(classTag), matrix))
+            new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
           }
+
+          Seq((keys.toArray(classTag), matrix))
+        } else {
+          Seq()
         }
-      }
-    })
+    }
 
-    new BlockifiedFlinkDrm(parts, ncol)
+    new BlockifiedFlinkDrm[K](parts, ncol)
   }
 
   def asRowWise = this
@@ -83,26 +78,31 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
 
 }
 
-class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+class BlockifiedFlinkDrm[K: TypeInformation: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+
 
   def executionEnvironment = ds.getExecutionEnvironment
   def context: FlinkDistributedContext = ds.getExecutionEnvironment
 
+
   def isBlockified = true
 
   def asBlockified = this
 
   def asRowWise = {
-    val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
-      def flatMap(tuple: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = tuple match {
-        case (keys, block) => keys.view.zipWithIndex.foreach {
-          case (key, idx) =>
-            out.collect((key, block(idx, ::)))
+    val out = ds.flatMap {
+      tuple =>
+        val keys = tuple._1
+        val block = tuple._2
+
+        keys.view.zipWithIndex.map {
+          case (key, idx) => (key, block(idx, ::))
         }
-      }
-    })
-    new RowsFlinkDrm(out, ncol)
+    }
+
+    new RowsFlinkDrm[K](out, ncol)
   }
 
   def classTag = implicitly[ClassTag[K]]
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
index 24f298d..83ede9a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.io._
 import java.util.Arrays
 
 /**
- * Copied from /spark/src/main/scala/org/apache/mahout/common
+ * Flink DRM Metadata
  */
 class DrmMetadata(
 
@@ -40,13 +40,13 @@ class DrmMetadata(
    * @param keyW2ValFunc: Conversion from Writable to value type of the DRM key
    */
   val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match {
-    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
-    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
-    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
-    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
-    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
-    case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
-    case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+    case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _
+    case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _
+    case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _
+    case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _
+    case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _
+    case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _
+    case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _
     case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
index e77143e..b9d9f1b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
@@ -62,17 +62,17 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
     val seed = fs.getFileStatus(new Path(dir))
     var f: String = files
 
-    if (seed.isDir) {
+    if (seed.isDirectory) {
       val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
       for (fileStatus <- fileStatuses) {
         if (fileStatus.getPath().getName().matches(filePattern)
-          && !fileStatus.isDir) {
+          && !fileStatus.isDirectory) {
           // found a file
           if (fileStatus.getLen() != 0) {
             // file is not empty
             f = f + fileStatus.getPath.toUri.toString + ","
           }
-        } else if (fileStatus.isDir && recursive) {
+        } else if (fileStatus.isDirectory && recursive) {
           f = findFiles(fileStatus.getPath.toString, filePattern, f)
         }
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
deleted file mode 100644
index 6581721..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.io
-
-import org.apache.hadoop.io.{ Writable, SequenceFile }
-import org.apache.hadoop.fs.{ FileSystem, Path }
-import org.apache.hadoop.conf.Configuration
-import collection._
-import JavaConversions._
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- *
- * Copied from /spark/src/main/scala/org/apache/mahout/common
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-  /**
-   * Read the header of a sequence file and determine the Key and Value type
-   * @param path
-   * @return
-   */
-  def readDrmHeader(path: String): DrmMetadata = {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    val partFilePath: Path = fs.listStatus(dfsPath)
-
-      // Filter out anything starting with .
-      .filter { s =>
-        !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir
-      }
-
-      // Take path
-      .map(_.getPath)
-
-      // Take only one, if any
-      .headOption
-
-      // Require there's at least one partition file found.
-      .getOrElse {
-        throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
-      }
-
-    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
-    } finally {
-      reader.close()
-    }
-
-  }
-
-  /**
-   * Delete a path from the filesystem
-   * @param path
-   */
-  def delete(path: String) {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    if (fs.exists(dfsPath)) {
-      fs.delete(dfsPath, true)
-    }
-  }
-
-}