Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/15 19:00:17 UTC

[1/2] mahout git commit: Major fixes for Flink backend merged

Repository: mahout
Updated Branches:
  refs/heads/flink-binding 92a2f6c8f -> 072289a46


http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..50d3bc6
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings.io
+
+import org.apache.hadoop.io.{ Writable, SequenceFile }
+import org.apache.hadoop.fs.{ FileSystem, Path }
+import org.apache.hadoop.conf.Configuration
+import collection._
+import JavaConversions._
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0
+ *
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+  /**
+   * Read the header of a sequence file and determine the Key and Value type
+   * @param path
+   * @return
+   */
+  def readDrmHeader(path: String): DrmMetadata = {
+    val dfsPath = new Path(path)
+    val conf = new Configuration()
+    val fs = dfsPath.getFileSystem(conf)
+
+    fs.setConf(conf)
+
+    val partFilePath: Path = fs.listStatus(dfsPath)
+
+      // Filter out anything starting with .
+      .filter { s =>
+        !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir
+      }
+
+      // Take path
+      .map(_.getPath)
+
+      // Take only one, if any
+      .headOption
+
+      // Require there's at least one partition file found.
+      .getOrElse {
+        throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+      }
+
+    // flink is retiring hadoop 1
+     val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+
+    // hadoop 2 reader
+//    val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf,
+//      SequenceFile.Reader.file(partFilePath));
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
+    } finally {
+      reader.close()
+    }
+
+  }
+
+  /**
+   * Delete a path from the filesystem
+   * @param path
+   */
+  def delete(path: String) {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    if (fs.exists(dfsPath)) {
+      fs.delete(dfsPath, true)
+    }
+  }
+
+}
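
For reference, the commented-out "hadoop 2 reader" above corresponds to the option-based SequenceFile.Reader constructor. Below is a minimal standalone sketch (not part of the commit) of reading only the header of a sequence file that way, assuming a Hadoop 2 client on the classpath; the part-file path is hypothetical:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{SequenceFile, Writable}

    object SequenceFileHeaderSketch {
      // Reads only the header of a sequence file and reports its key/value Writable classes,
      // mirroring what readDrmHeader needs from the part file.
      def keyValueClasses(partFile: String): (Class[_ <: Writable], Class[_ <: Writable]) = {
        val conf = new Configuration()
        val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(partFile)))
        try {
          (reader.getKeyClass.asSubclass(classOf[Writable]),
            reader.getValueClass.asSubclass(classOf[Writable]))
        } finally {
          reader.close()
        }
      }

      def main(args: Array[String]): Unit =
        println(keyValueClasses("/tmp/drm/part-r-00000"))   // hypothetical path
    }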

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 6b8f2ae..b083752 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -18,16 +18,15 @@
  */
 package org.apache.mahout
 
-import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
+import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrm, CheckpointedFlinkDrmOps, FlinkDrm, RowsFlinkDrm}
 import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _}
+import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
 import org.slf4j.LoggerFactory
 
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.utils._
-
 import scala.Array._
 import scala.reflect.ClassTag
 
@@ -44,7 +43,6 @@ package object flinkbindings {
    */
   type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]]
 
-  
   implicit def wrapMahoutContext(context: DistributedContext): FlinkDistributedContext = {
     assert(context.isInstanceOf[FlinkDistributedContext], "it must be FlinkDistributedContext")
     context.asInstanceOf[FlinkDistributedContext]
@@ -62,7 +60,7 @@ package object flinkbindings {
     drm.asInstanceOf[CheckpointedFlinkDrm[K]]
   }
 
-  implicit def checkpointedDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
+  implicit def checkpointedDrmToFlinkDrm[K: TypeInformation: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
     val flinkDrm = castCheckpointedDrm(cp)
     new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
   }
@@ -83,10 +81,8 @@ package object flinkbindings {
   def readCsv(file: String, delim: String = ",", comment: String = "#")
              (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
     val vectors = dc.env.readTextFile(file)
-      .filter(new FilterFunction[String] {
-        def filter(in: String): Boolean = {
-          !in.startsWith(comment)
-        }
+      .filter((in: String) => {
+        !in.startsWith(comment)
       })
       .map(new MapFunction[String, Vector] {
         def map(in: String): Vector = {
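
The readCsv change above swaps a Java-style anonymous FilterFunction for a plain Scala function literal; the Scala DataSet API accepts either. A self-contained sketch of the lambda form, assuming only flink-scala on the classpath (the data values are illustrative):

    import org.apache.flink.api.scala._

    object FilterLambdaSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val lines = env.fromElements("# a comment line", "1,2,3", "4,5,6")
        // Same predicate as readCsv: drop lines that start with the comment marker.
        val data = lines.filter(line => !line.startsWith("#"))
        data.print()   // prints the two CSV rows
      }
    }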

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
index 6fb71ea..41c7a6a 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
@@ -31,6 +31,8 @@ trait DistributedFlinkSuite extends DistributedMahoutSuite { this: Suite =>
 
   def initContext() {
     env = ExecutionEnvironment.getExecutionEnvironment
+    // set this higher so that tests like dsqDist(X,Y) have enough available slots to pass on a single machine.
+    env.setParallelism(10)
     mahoutCtx = wrapContext(env)
   }
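
Raising the environment's default parallelism is what gives operators such as dsqDist(X, Y) enough parallel slots on a single machine. A minimal sketch of the same setting outside the test harness (the value 10 simply mirrors the suite):

    import org.apache.flink.api.scala._

    object ParallelismSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(10)        // same value DistributedFlinkSuite uses
        println(env.getParallelism)   // -> 10
      }
    }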
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
index 83d7f43..725f31a 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
@@ -31,7 +31,6 @@ import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 
-@RunWith(classOf[JUnitRunner])
 class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
 
   test("norm") {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
new file mode 100644
index 0000000..b834912
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
+import org.apache.mahout.common.RandomUtils
+
+import scala.collection.immutable.List
+
+//import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import RLikeOps._
+import math._
+
+import org.apache.mahout.math.decompositions._
+import org.scalatest.{FunSuite, Matchers}
+
+
+import scala.reflect.ClassTag
+import org.apache.flink.api.scala._
+
+
+
+class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matchers {
+
+// // passing now
+//  test("Simple DataSet to IntWritable") {
+//    val path = TmpDir + "flinkOutput"
+//
+//    implicit val typeInfo = createTypeInformation[(Int,Int)]
+//    val ds = env.fromElements[(Int,Int)]((1,2),(3,4),(5,6),(7,8))
+//   // val job = new JobConf
+//
+//
+//    val writableDataset : DataSet[(IntWritable,IntWritable)] =
+//      ds.map( tuple =>
+//        (new IntWritable(tuple._1.asInstanceOf[Int]), new IntWritable(tuple._2.asInstanceOf[Int]))
+//    )
+//
+//    val job: Job = new Job()
+//
+//    job.setOutputKeyClass(classOf[IntWritable])
+//    job.setOutputValueClass(classOf[IntWritable])
+//
+//    // setup sink for IntWritable
+//    val sequenceFormat = new SequenceFileOutputFormat[IntWritable, IntWritable]
+//    val hadoopOutput  = new HadoopOutputFormat[IntWritable,IntWritable](sequenceFormat, job)
+//    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
+//
+//    writableDataset.output(hadoopOutput)
+//
+//    env.execute(s"dfsWrite($path)")
+//
+//  }
+
+
+  test("C = A + B, identically partitioned") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+
+     //   printf("A.nrow=%d.\n", A.rdd.count())
+
+    // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
+    val B = A.mapBlock() {
+      case (keys, block) =>
+        val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
+        keys -> bBlock
+    }
+      // Prevent repeated computation non-determinism
+      // the Flink problem is here... checkpoint is not doing what it should,
+      // i.e. create a physical plan w/o side effects
+      .checkpoint()
+
+    val inCoreB = B.collect
+
+    printf("A=\n%s\n", inCoreA)
+    printf("B=\n%s\n", inCoreB)
+
+    val C = A + B
+
+    val inCoreC = C.collect
+
+    printf("C=\n%s\n", inCoreC)
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+  }
+//// Passing now.
+//  test("C = inCoreA %*%: B") {
+//
+//    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+//    val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
+//
+//    val B = drmParallelize(inCoreB, numPartitions = 2)
+//    val C = inCoreA %*%: B
+//
+//    val inCoreC = C.collect
+//    val inCoreCControl = inCoreA %*% inCoreB
+//
+//    println(inCoreC)
+//    (inCoreC - inCoreCControl).norm should be < 1E-10
+//
+//  }
+
+  test("dsqDist(X,Y)") {
+    val m = 100
+    val n = 300
+    val d = 7
+    val mxX = Matrices.symmetricUniformView(m, d, 12345).cloned -= 5
+    val mxY = Matrices.symmetricUniformView(n, d, 1234).cloned += 10
+    val (drmX, drmY) = (drmParallelize(mxX, 3), drmParallelize(mxY, 4))
+
+    val mxDsq = dsqDist(drmX, drmY).collect
+    val mxDsqControl = new DenseMatrix(m, n) := { (r, c, _) ⇒ (mxX(r, ::) - mxY(c, ::)) ^= 2 sum }
+    (mxDsq - mxDsqControl).norm should be < 1e-7
+  }
+
+  test("dsqDist(X)") {
+    val m = 100
+    val d = 7
+    val mxX = Matrices.symmetricUniformView(m, d, 12345).cloned -= 5
+    val drmX = drmParallelize(mxX, 3)
+
+    val mxDsq = dsqDist(drmX).collect
+    val mxDsqControl = sqDist(drmX)
+    (mxDsq - mxDsqControl).norm should be < 1e-7
+  }
+
+//// passing now
+//  test("DRM DFS i/o (local)") {
+//
+//    val uploadPath = TmpDir + "UploadedDRM"
+//
+//    val inCoreA = dense((1, 2, 3), (3, 4, 5))
+//    val drmA = drmParallelize(inCoreA)
+//
+//    drmA.dfsWrite(path = uploadPath)
+//
+//    println(inCoreA)
+//
+//    // Load back from hdfs
+//    val drmB = drmDfsRead(path = uploadPath)
+//
+//    // Make sure keys are correctly identified as ints
+//    drmB.checkpoint(CacheHint.NONE).keyClassTag shouldBe ClassTag.Int
+//
+//    // Collect back into in-core
+//    val inCoreB = drmB.collect
+//
+//    // Print out to see what it is we collected:
+//    println(inCoreB)
+//
+//    (inCoreA - inCoreB).norm should be < 1e-7
+//  }
+
+
+
+  test("dspca") {
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m = 500
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+      ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+    // PCA Rotation matrix -- should also be orthonormal.
+    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+    val input = (u %*%: diagv(spectrum)) %*% tr.t
+    val drmInput = drmParallelize(m = input, numPartitions = 2)
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    // Since we assert just the validity of the s-pca, not stochastic error, we bump the p parameter
+    // to drive stochastic error to (nearly) zero and assert only functional correctness of the
+    // method's pca-specific additions.
+    val k = 10
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
+    // Un-normalized pca data:
+    drmPCA = drmPCA %*% diagv(s)
+
+    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+
+    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
+    // generated input was not centered. So here, we'd just brute-solve pca to verify
+    val xi = input.colMeans()
+    for (r <- 0 until input.nrow) input(r, ::) -= xi
+    var (pcaControl, _, sControl) = svd(m = input)
+    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+  }
+
+  test("dals") {
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m = 500
+    val n = 500
+
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    // Create singular values with decay
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    // Create A as an ideal input
+    val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
+      qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    // Decompose using ALS
+    val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple
+    val inCoreU = drmU.collect
+    val inCoreV = drmV.collect
+
+    val predict = inCoreU %*% inCoreV.t
+
+    printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
+    printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))
+
+    val err = (inCoreA - predict).norm
+    printf("norm of residuals %f\n", err)
+    printf("train iteration rmses: %s\n", rmse)
+
+    err should be < 15e-2
+
+  }
+
+}
\ No newline at end of file
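
The checkpoint() in the "identically partitioned" test above is there because a lazily evaluated plan that draws random numbers is recomputed per action, giving different values each time. A plain-Scala analogy of that effect and of pinning the result by materializing it once (a sketch, not Mahout code):

    object CheckpointAnalogySketch {
      def main(args: Array[String]): Unit = {
        // A "logical plan": nothing is computed until traversed, and it is recomputed each time.
        val lazyB = (0 until 3).view.map(_ => util.Random.nextDouble())
        println(lazyB.toList == lazyB.toList)   // usually false -- fresh draws per traversal

        // A "checkpoint": force the computation once and reuse the materialized values.
        val pinnedB = lazyB.toList
        println(pinnedB == pinnedB)             // true -- the same values every time
      }
    }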

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
index 6dcedd9..4aa524f 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
@@ -1,16 +1,9 @@
 package org.apache.mahout.flinkbindings
 
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.junit.runner.RunWith
+import org.apache.mahout.math.scalabindings._
 import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
 
-@RunWith(classOf[JUnitRunner])
 class FlinkByteBCastSuite extends FunSuite {
 
   test("BCast vector") {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
index 98318e3..3e14d76 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
@@ -31,7 +31,6 @@ import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 
-@RunWith(classOf[JUnitRunner])
 class RLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
 
   val LOGGER = LoggerFactory.getLogger(getClass())

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
index 07d62dc..0a5f145 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory
 import org.scalatest.FunSuite
 import org.scalatest.junit.JUnitRunner
 
-@RunWith(classOf[JUnitRunner])
 class UseCasesSuite extends FunSuite with DistributedFlinkSuite {
 
   val LOGGER = LoggerFactory.getLogger(getClass())

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
index a766146..81ca737 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
@@ -23,15 +23,14 @@ import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
 import drm._
+import org.apache.flink.api.scala._
 import org.apache.mahout.flinkbindings._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.drm.logical.OpAx
 import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
 import org.apache.mahout.math.drm.logical._
 
-@RunWith(classOf[JUnitRunner])
 class LATestSuite extends FunSuite with DistributedFlinkSuite {
 
   test("Ax blockified") {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
index f13597a..82ca3ff 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
@@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class DistributedDecompositionsSuite extends FunSuite with DistributedFlinkSuite
       with DistributedDecompositionsSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
index 7f6b2c8..325d118 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
@@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite
       with DrmLikeOpsSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
index 7cfc48b..dfa7360 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
@@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class DrmLikeSuite extends FunSuite with DistributedFlinkSuite
       with DrmLikeSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
new file mode 100644
index 0000000..d6feed9
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
@@ -0,0 +1,11 @@
+package org.apache.mahout.flinkbindings.standard
+
+import org.apache.mahout.classifier.naivebayes.NBTestBase
+import org.apache.mahout.flinkbindings._
+import org.scalatest.FunSuite
+
+
+class NaiveBayesTestSuite extends FunSuite with DistributedFlinkSuite
+      with NBTestBase {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
index 1ba03b1..c0ff76c 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
@@ -1,18 +1,10 @@
 package org.apache.mahout.flinkbindings.standard
 
 import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.junit.runner.RunWith
 import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class RLikeDrmOpsSuite extends FunSuite with DistributedFlinkSuite
       with RLikeDrmOpsSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 0124612..deaadc4 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -125,7 +125,7 @@
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
-      <version>2.21</version>
+      <version>2.24.0</version>
     </dependency>
 
     <!--  3rd-party -->

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
index e8ac475..016171d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 /** Logical Times-left over in-core matrix operand */
 case class OpTimesLeftMatrix(
-    val left: Matrix,
+    left: Matrix,
     override var A: DrmLike[Int]
     ) extends AbstractUnaryOp[Int, Int] {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index ecb557b..34b1823 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -45,7 +45,7 @@ package object drm {
   //  type CacheHint = CacheHint.CacheHint
 
   def safeToNonNegInt(x: Long): Int = {
-    assert(x == x << -31 >>> -31, "transformation from long to Int is losing signficant bits, or is a negative number")
+    assert(x == x << -31 >>> -31, "transformation from long to Int is losing significant bits, or is a negative number")
     x.toInt
   }
 
@@ -175,7 +175,7 @@ package object drm {
     import RLikeDrmOps._
 
     val drmAcp = drmA.checkpoint()
-    
+
     val mu = drmAcp colMeans
 
     // Compute variance using mean(x^2) - mean(x)^2

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
index d0fd393..d72d2f0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
@@ -24,7 +24,6 @@ import RLikeOps._
 import org.apache.mahout.logging._
 
 import scala.collection.JavaConversions._
-import scala.collection._
 
 object MMul extends MMBinaryFunc {
 
@@ -46,32 +45,32 @@ object MMul extends MMBinaryFunc {
         sd match {
 
           // Multiplication cases by a diagonal matrix.
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.COLWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSECOLWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.ROWWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSEROWWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _
-
-          case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _
-          case (TraversingStructureEnum.SPARSECOLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _
-          case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _
-          case (TraversingStructureEnum.SPARSEROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.COLWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagCW
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSECOLWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagCW
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.ROWWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagRW
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSEROWWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagRW
+
+          case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmCWDiag
+          case (TraversingStructureEnum.SPARSECOLWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmCWDiag
+          case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmRWDiag
+          case (TraversingStructureEnum.SPARSEROWWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmRWDiag
 
           // Dense-dense cases
-          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a eq b.t) ⇒ jvmDRWAAt _
-          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a.t eq b) ⇒ jvmDRWAAt _
+          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if a eq b.t ⇒ jvmDRWAAt
+          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if a.t eq b ⇒ jvmDRWAAt
           case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) ⇒ jvmRWCW
           case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.ROWWISE, true) ⇒ jvmRWRW
           case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.COLWISE, true) ⇒ jvmCWCW
-          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a eq b.t) ⇒ jvmDCWAAt _
-          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a.t eq b) ⇒ jvmDCWAAt _
+          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if a eq b.t ⇒ jvmDCWAAt
+          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if a.t eq b ⇒ jvmDCWAAt
           case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) ⇒ jvmCWRW
 
           // Sparse row matrix x sparse row matrix (array of vectors)
@@ -107,7 +106,7 @@ object MMul extends MMBinaryFunc {
           case (TraversingStructureEnum.COLWISE, false, TraversingStructureEnum.COLWISE, _) ⇒ jvmSparseCWCW2flips
 
           // Sparse methods are only effective if the first argument is sparse, so we need to do a swap.
-          case (_, _, _, false) ⇒ { (a, b, r) ⇒ apply(b.t, a.t, r.map {_.t}).t }
+          case (_, _, _, false) ⇒ (a, b, r) ⇒ apply(b.t, a.t, r.map {_.t}).t
 
           // Default jvm-jvm case.
           case _ ⇒ jvmRWCW
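
The MMul cleanup above is purely syntactic: guard conditions do not need surrounding parentheses, and a method used where a function type is expected is eta-expanded without a trailing underscore. A tiny standalone illustration of both points (nothing Mahout-specific):

    object MatchGuardSketch {
      def add(a: Int, b: Int): Int = a + b

      def pick(useAdd: Boolean): (Int, Int) => Int =
        useAdd match {
          case flag if flag => add              // no "add _" needed in a function position
          case _            => (a, b) => a * b
        }

      def main(args: Array[String]): Unit =
        println(pick(useAdd = true)(2, 3))      // -> 5
    }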

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
index b288c62..de8228e 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
@@ -78,7 +78,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat
     printf("qControl2=\n%s\n", qControl2)
     printf("rControl2=\n%s\n", rControl2)
 
-    // Housholder approach seems to be a little bit more stable
+    // Householder approach seems to be a little bit more stable
     (rControl - inCoreR).norm should be < 1E-5
     (qControl - inCoreQ).norm should be < 1E-5
 
@@ -86,7 +86,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat
     (rControl2 - inCoreR).norm should be < 1E-10
     (qControl2 - inCoreQ).norm should be < 1E-10
 
-    // Assert orhtogonality:
+    // Assert orthogonality:
     // (a) Q[,j] dot Q[,j] == 1.0 for all j
     // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
     for (col <- 0 until inCoreQ.ncol)

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
index b46ee30..f18d23b 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -347,6 +347,9 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers {
         keys -> bBlock
     }
         // Prevent repeated computation non-determinism
+        // removing this checkpoint() will cause the same error in Spark tests
+        // as we're seeing in Flink with this test, i.e. util.Random.nextDouble()
+        // is being called more than once (note that it is not seeded in the closure)
         .checkpoint()
 
     val inCoreB = B.collect
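
The note above points at the root cause: util.Random is not seeded inside the closure, so every recomputation of the plan draws different values. A plain-Scala sketch of the alternative fix the comment hints at, i.e. seeding inside the closure so recomputation is deterministic (not the test's actual code):

    object SeededClosureSketch {
      // With the seed fixed inside the "closure", recomputing it reproduces the same values,
      // so no checkpoint is needed just for determinism.
      def randomRow(seed: Long): Seq[Double] = {
        val rnd = new scala.util.Random(seed)
        Seq.fill(3)(rnd.nextDouble())
      }

      def main(args: Array[String]): Unit =
        println(randomRow(42L) == randomRow(42L))   // true -- identical on every recomputation
    }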

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index 3869830..f6deb15 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -75,10 +75,10 @@ abstract class MahoutSparkDriver extends MahoutDriver {
   override protected def start() : Unit = {
     if (!_useExistingContext) {
       sparkConf.set("spark.kryo.referenceTracking", "false")
-        .set("spark.kryoserializer.buffer.mb", "200m")// this is default for Mahout optimizer, change it with -D option
+        .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
         // the previous has been marked deprecated as of Spark 1.4 by the below line,
         // remove the above line when Spark finally retires above for below
-        .set("spark.kryoserializer.buffer", "200m")
+        .set("spark.kryoserializer.buffer", "200")
 
 
       if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
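
The buffer values above are plain numbers because the deprecated spark.kryoserializer.buffer.mb key is interpreted as a count of megabytes, while the Spark 1.4+ spark.kryoserializer.buffer key accepts a size string with a unit suffix. A minimal sketch of setting both on a SparkConf, assuming spark-core on the classpath (values are illustrative):

    import org.apache.spark.SparkConf

    object KryoBufferConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.kryo.referenceTracking", "false")
          .set("spark.kryoserializer.buffer.mb", "200")   // deprecated key: a number of megabytes
          .set("spark.kryoserializer.buffer", "200m")     // newer key: size string with unit suffix
        println(conf.get("spark.kryoserializer.buffer"))
      }
    }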

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
index b3a1ec2..fde37bf 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -26,17 +26,17 @@ class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(pr
     opts = opts + ("appName" -> programName)
     note("\nSpark config options:")
 
-    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
+    opt[String]("master") abbr "ma" text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
       "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
         options + ("master" -> x)
     }
 
-    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " +
+    opt[String]("sparkExecutorMem") abbr "sem" text ("Max Java heap available as \"executor memory\" on each " +
       "node (optional). Default: as Spark config specifies") action { (x, options) =>
         options + ("sparkExecutorMem" -> x)
     }
 
-    opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
+    opt[(String, String)]("define") abbr "D" unbounded() foreach { case (k, v) =>
       sparkConf.set(k, v)
     } validate { x =>
       if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 817c6ff..2cedc20 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -20,7 +20,6 @@ package org.apache.mahout.drivers
 import org.apache.mahout.common.HDFSPathSearch
 import org.apache.mahout.math.cf.SimilarityAnalysis
 import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSRead}
-import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
@@ -63,7 +62,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
       opts = opts ++ RowSimilarityOptions
 
       note("\nAlgorithm control options:")
-      opt[Int]("maxObservations") abbr ("mo") action { (x, options) =>
+      opt[Int]("maxObservations") abbr "mo" action { (x, options) =>
         options + ("maxObservations" -> x)
       } text ("Max number of observations to consider per row (optional). Default: " +
         RowSimilarityOptions("maxObservations")) validate { x =>
@@ -96,7 +95,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
       parseGenericOptions()
 
-      help("help") abbr ("h") text ("prints this usage text\n")
+      help("help") abbr "h" text "prints this usage text\n"
 
     }
     parser.parse(args, parser.opts) map { opts =>

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index b5f76e0..d4f1aea 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -18,12 +18,12 @@
 package org.apache.mahout.drivers
 
 import org.apache.log4j.Logger
-import org.apache.mahout.math.indexeddataset._
-import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
-import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
-import org.apache.mahout.math.drm.{DrmLike, DrmLikeOps, DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.indexeddataset._
 import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
+
 import scala.collection.JavaConversions._
 
 /**
@@ -269,7 +269,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
         val vector = if (sort) itemList.sortBy { elem => -elem._2 } else itemList
 
         // first get the external rowID token
-        if (!vector.isEmpty){
+        if (vector.nonEmpty){
           var line = rowIDDictionary_bcast.value.inverse.getOrElse(rowID, "INVALID_ROW_ID") + rowKeyDelim
           // for the rest of the row, construct the vector contents of elements (external column ID, strength value)
           for (item <- vector) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index e9f2f95..eeed97a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -17,12 +17,11 @@
 
 package org.apache.mahout.drivers
 
-import org.apache.mahout.classifier.naivebayes._
-import org.apache.mahout.classifier.naivebayes.SparkNaiveBayes
+import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _}
 import org.apache.mahout.common.Hadoop1HDFSUtil
 import org.apache.mahout.math.drm
 import org.apache.mahout.math.drm.DrmLike
-import org.apache.mahout.math.drm.RLikeDrmOps.drm2RLikeOps
+
 import scala.collection.immutable.HashMap
 
 
@@ -48,33 +47,33 @@ object TrainNBDriver extends MahoutSparkDriver {
 
       // default trainComplementary is false
       opts = opts + ("trainComplementary" -> false)
-      opt[Unit]("trainComplementary") abbr ("c") action { (_, options) =>
+      opt[Unit]("trainComplementary") abbr "c" action { (_, options) =>
         options + ("trainComplementary" -> true)
-      } text ("Train a complementary model, Default: false.")
+      } text "Train a complementary model, Default: false."
 
       // Laplace smoothing parameter default is 1.0
       opts = opts + ("alphaI" -> 1.0)
-      opt[Double]("alphaI") abbr ("a") action { (x, options) =>
+      opt[Double]("alphaI") abbr "a" action { (x, options) =>
         options + ("alphaI" -> x)
-      } text ("Laplace soothing factor default is 1.0") validate { x =>
+      } text "Laplace smoothing factor default is 1.0" validate { x =>
         if (x > 0) success else failure("Option --alphaI must be > 0")
       }
 
       // Overwrite the output directory (with the model) if it exists?  Default: false
       opts = opts + ("overwrite" -> false)
-      opt[Unit]("overwrite") abbr ("ow") action { (_, options) =>
+      opt[Unit]("overwrite") abbr "ow" action { (_, options) =>
         options + ("overwrite" -> true)
-      } text ("Overwrite the output directory (with the model) if it exists? Default: false")
+      } text "Overwrite the output directory (with the model) if it exists? Default: false"
 
       // Spark config options--not driver specific
       parseSparkOptions()
 
-      help("help") abbr ("h") text ("prints this usage text\n")
+      help("help") abbr "h" text "prints this usage text\n"
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process()
+      process
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
index 4d13a5a..96ba8cd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.math.drm.{DistributedEngine, BCast, DistributedContext}
+import org.apache.mahout.math.drm.{DistributedEngine, DistributedContext}
 import org.apache.spark.SparkContext
 
 class SparkDistributedContext(val sc: SparkContext) extends DistributedContext {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index ffb164c..676b496 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -85,7 +85,7 @@ object ABt {
       s"A=${operator.A.nrow}x${operator.A.ncol}, B=${operator.B.nrow}x${operator.B.ncol},AB'=${prodNRow}x$prodNCol."
     )
 
-    // blockwise multimplication function
+    // blockwise multiplication function
     def mmulFunc(tupleA: BlockifiedDrmTuple[K], tupleB: BlockifiedDrmTuple[Int]): (Array[K], Array[Int], Matrix) = {
       val (keysA, blockA) = tupleA
       val (keysB, blockB) = tupleB

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
index a9dc874..4c75e75 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -40,8 +40,8 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
       // Do not run MAHOUT_HOME jars in unit tests.
       addMahoutJars = !isLocal,
       sparkConf = new SparkConf()
-          .set("spark.kryoserializer.buffer.mb", "40m")
-          .set("spark.kryoserializer.buffer", "40m")
+          .set("spark.kryoserializer.buffer.mb", "40")
+          .set("spark.kryoserializer.buffer", "40")
           .set("spark.akka.frameSize", "30")
           .set("spark.default.parallelism", "10")
           .set("spark.executor.memory", "2G")


[2/2] mahout git commit: Major fixes for Flink backend merged

Posted by sm...@apache.org.
Major fixes for Flink backend merged


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/072289a4
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/072289a4
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/072289a4

Branch: refs/heads/flink-binding
Commit: 072289a46c9bd4b7297a17b621f7da30b94df1a7
Parents: 92a2f6c
Author: smarthi <sm...@apache.org>
Authored: Tue Mar 15 13:57:35 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Tue Mar 15 13:57:35 2016 -0400

----------------------------------------------------------------------
 flink/pom.xml                                   |  20 +-
 .../mahout/flinkbindings/FlinkByteBCast.scala   |   3 +
 .../flinkbindings/FlinkDistributedContext.scala |   1 +
 .../mahout/flinkbindings/FlinkEngine.scala      | 325 ++++++++++++-------
 .../mahout/flinkbindings/blas/FlinkOpAewB.scala |  58 ++--
 .../flinkbindings/blas/FlinkOpAewScalar.scala   |  33 +-
 .../mahout/flinkbindings/blas/FlinkOpAt.scala   |  58 ++--
 .../mahout/flinkbindings/blas/FlinkOpAtA.scala  |  48 ++-
 .../mahout/flinkbindings/blas/FlinkOpAtB.scala  |  25 +-
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   |   4 +-
 .../flinkbindings/blas/FlinkOpCBind.scala       | 136 ++++----
 .../flinkbindings/blas/FlinkOpMapBlock.scala    |  34 +-
 .../flinkbindings/blas/FlinkOpRBind.scala       |   5 +-
 .../flinkbindings/blas/FlinkOpRowRange.scala    |  20 +-
 .../blas/FlinkOpTimesRightMatrix.scala          |  54 ++-
 .../mahout/flinkbindings/blas/package.scala     |  60 ----
 .../drm/CheckpointedFlinkDrm.scala              |  92 +++++-
 .../drm/CheckpointedFlinkDrmOps.scala           |   1 -
 .../mahout/flinkbindings/drm/FlinkDrm.scala     |  58 ++--
 .../mahout/flinkbindings/io/DrmMetadata.scala   |  16 +-
 .../flinkbindings/io/HDFSPathSearch.scala       |   6 +-
 .../flinkbindings/io/Hadoop1HDFSUtil.scala      |  86 -----
 .../flinkbindings/io/Hadoop2HDFSUtil.scala      |  94 ++++++
 .../apache/mahout/flinkbindings/package.scala   |  22 +-
 .../flinkbindings/DistributedFlinkSuite.scala   |   2 +
 .../mahout/flinkbindings/DrmLikeOpsSuite.scala  |   1 -
 .../flinkbindings/FailingTestsSuite.scala       | 272 ++++++++++++++++
 .../flinkbindings/FlinkByteBCastSuite.scala     |   9 +-
 .../mahout/flinkbindings/RLikeOpsSuite.scala    |   1 -
 .../mahout/flinkbindings/UseCasesSuite.scala    |   1 -
 .../mahout/flinkbindings/blas/LATestSuite.scala |   3 +-
 .../DistributedDecompositionsSuite.scala        |   1 -
 .../standard/DrmLikeOpsSuite.scala              |   1 -
 .../flinkbindings/standard/DrmLikeSuite.scala   |   1 -
 .../standard/NaiveBayesTestSuite.scala          |  11 +
 .../standard/RLikeDrmOpsSuite.scala             |   8 -
 math-scala/pom.xml                              |   2 +-
 .../math/drm/logical/OpTimesLeftMatrix.scala    |   2 +-
 .../org/apache/mahout/math/drm/package.scala    |   4 +-
 .../apache/mahout/math/scalabindings/MMul.scala |  45 ++-
 .../DistributedDecompositionsSuiteBase.scala    |   4 +-
 .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala  |   3 +
 .../mahout/drivers/MahoutSparkDriver.scala      |   4 +-
 .../drivers/MahoutSparkOptionParser.scala       |   6 +-
 .../mahout/drivers/RowSimilarityDriver.scala    |   5 +-
 .../drivers/TextDelimitedReaderWriter.scala     |  10 +-
 .../apache/mahout/drivers/TrainNBDriver.scala   |  21 +-
 .../sparkbindings/SparkDistributedContext.scala |   2 +-
 .../apache/mahout/sparkbindings/blas/ABt.scala  |   2 +-
 .../test/DistributedSparkSuite.scala            |   4 +-
 50 files changed, 1023 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 37f1dbf..2ccb558 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -103,14 +103,15 @@
   </build>
 
   <dependencies>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
+      <artifactId>flink-runtime_${scala.compat.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_2.10</artifactId>
+      <artifactId>flink-scala_${scala.compat.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
@@ -120,15 +121,28 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_2.10</artifactId>
+      <artifactId>flink-core</artifactId>
       <version>${flink.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_${scala.compat.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
 
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-math-scala_${scala.compat.version}</artifactId>
     </dependency>
 
+    <!-- enforce current version of kryo as of 0.10.1-->
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.24.0</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-hdfs</artifactId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
index 8544db0..5cdfb79 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
@@ -43,14 +43,17 @@ class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Seri
     if (streamType == FlinkByteBCast.StreamTypeVector) {
       val writeable = new VectorWritable()
       writeable.readFields(stream)
+    //  printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
       writeable.get.asInstanceOf[T]
     } else if (streamType == FlinkByteBCast.StreamTypeMatrix) {
       val writeable = new MatrixWritable()
       writeable.readFields(stream)
+     // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
       writeable.get.asInstanceOf[T]
     } else {
       throw new IllegalArgumentException(s"unexpected type tag $streamType")
     }
+
   }
 
   override def value: T = _value

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index c818030..cfc9209 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -26,6 +26,7 @@ class FlinkDistributedContext(val env: ExecutionEnvironment) extends Distributed
 
   val engine: DistributedEngine = FlinkEngine
 
+
   override def close() {
     // TODO
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index d03aef7..f848c3f 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -18,32 +18,29 @@
  */
 package org.apache.mahout.flinkbindings
 
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.hadoop.io.Writable
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils.DataSetUtils
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
 import org.apache.mahout.flinkbindings.blas._
 import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.flinkbindings.io.HDFSUtil
-import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil
+import org.apache.mahout.flinkbindings.io.{HDFSUtil, Hadoop2HDFSUtil}
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.math.indexeddataset.BiDictionary
-import org.apache.mahout.math.indexeddataset.IndexedDataset
-import org.apache.mahout.math.indexeddataset.Schema
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, Schema}
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 
-import org.apache.flink.api.scala._
-
+import scala.collection.JavaConversions._
+import scala.reflect._
 
 object FlinkEngine extends DistributedEngine {
 
-  // By default, use Hadoop 1 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+  // By default, use Hadoop 2 utils
+  var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   /**
    * Load DRM from hdfs (as in Mahout DRM format).
@@ -51,7 +48,7 @@ object FlinkEngine extends DistributedEngine {
    * @param path The DFS path to load from
    * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
    */
-  override def drmDfsRead(path: String, parMin: Int = 0)
+  override def drmDfsRead(path: String, parMin: Int = 1)
                          (implicit dc: DistributedContext): CheckpointedDrm[_] = {
 
     // Require that context is actually Flink context.
@@ -60,19 +57,47 @@ object FlinkEngine extends DistributedEngine {
     // Extract the Flink Environment variable
     implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
 
-    val metadata = hdfsUtils.readDrmHeader(path)
+    // set the parallelism of the env to parMin
+    env.setParallelism(parMin)
 
-    val unwrapKey  = metadata.unwrapKeyFunction
+    // get the header of a SequenceFile in the path
+    val metadata = hdfsUtils.readDrmHeader(path + "//")
 
-    val ds = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path)
+    val keyClass: Class[_] = metadata.keyTypeWritable
 
-    val res = ds.map(new MapFunction[(Writable, VectorWritable), (Any, Vector)] {
-      def map(tuple: (Writable, VectorWritable)): (Any, Vector) = {
-        (unwrapKey(tuple._1), tuple._2)
-      }
-    })
+    // from the header determine which function to use to unwrap the key
+    val unwrapKey = metadata.unwrapKeyFunction
+
+    // Map to the correct DrmLike based on the metadata information
+    if (metadata.keyClassTag == ClassTag.Int) {
+      val ds = env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path)
+
+      val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Any, Vector)] {
+        def map(tuple: (IntWritable, VectorWritable)): (Any, Vector) = {
+          (unwrapKey(tuple._1), tuple._2.get())
+        }
+      })
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    } else if (metadata.keyClassTag == ClassTag.Long) {
+      val ds = env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path)
+
+      val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Any, Vector)] {
+        def map(tuple: (LongWritable, VectorWritable)): (Any, Vector) = {
+          (unwrapKey(tuple._1), tuple._2.get())
+        }
+      })
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    } else if (metadata.keyClassTag == ClassTag(classOf[String])) {
+      val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], path)
+
+      val res = ds.map(new MapFunction[(Text, VectorWritable), (Any, Vector)] {
+        def map(tuple: (Text, VectorWritable)): (Any, Vector) = {
+          (unwrapKey(tuple._1), tuple._2.get())
+        }
+      })
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    } else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyClass.getName}")
 
-    datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
   }
 
   override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
@@ -82,89 +107,114 @@ object FlinkEngine extends DistributedEngine {
                                             (implicit sc: DistributedContext): IndexedDataset = ???
 
 
+  /**
+    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+    *
+    * A particular physical engine implementation may choose to either use or not use these rewrites
+    * as a useful basic rewriting rule.<P>
+    */
+  override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
+
   /** 
    * Translates logical plan into Flink execution plan. 
    **/
   override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
     // Flink-specific Physical Plan translation.
+
+    implicit val typeInformation = generateTypeInformation[K]
     val drm = flinkTranslate(plan)
     val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
+   // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
     newcp.cache()
   }
 
-  private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = oper match {
-    case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAtx(a, x) =>
-      // express Atx as (A.t) %*% x
-      // TODO: create specific implementation of Atx, see MAHOUT-1749
-      val opAt = OpAt(a)
-      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
-      val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
-      val opAx = OpAx(atCast, x)
-      FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA))
-    case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, flinkTranslate(a)(op.classTagA), 
-        flinkTranslate(b)(op.classTagA))
-    case op @ OpABt(a, b) =>
-      // express ABt via AtB: let C=At and D=Bt, and calculate CtD
-      // TODO: create specific implementation of ABt, see MAHOUT-1750
-      val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
-      val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
-
-      val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
-      val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
-
-      FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
-                .asInstanceOf[FlinkDrm[K]]
-    case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpTimesRightMatrix(a, b) => 
-      FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b)
-    case op @ OpAewUnaryFunc(a, _, _) =>
-      FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpAewUnaryFuncFusion(a, _) => 
-      FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
-    // deprecated
-    case op @ OpAewScalar(a, scalar, _) => 
-      FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar)
-    case op @ OpAewB(a, b, _) =>
-      FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpCbind(a, b) => 
-      FlinkOpCBind.cbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpRbind(a, b) => 
-      FlinkOpRBind.rbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
-    case op @ OpCbindScalar(a, x, _) => 
-      FlinkOpCBind.cbindScalar(op, flinkTranslate(a)(op.classTagA), x)
-    case op @ OpRowRange(a, _) => 
-      FlinkOpRowRange.slice(op, flinkTranslate(a)(op.classTagA))
-    case op @ OpABAnyKey(a, b) if extractRealClassTag(a) != extractRealClassTag(b) =>
-      throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them")
-    case op: OpMapBlock[K, _] => 
-      FlinkOpMapBlock.apply(flinkTranslate(op.A)(op.classTagA), op.ncol, op.bmf)
-    case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol)
-    case _ => throw new NotImplementedError(s"operator $oper is not implemented yet")
+  private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
+    implicit val kTag = oper.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+    oper match {
+      case OpAtAnyKey(_) ⇒
+        throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
+      case op@OpAx(a, x) ⇒
+        //implicit val typeInformation = generateTypeInformation[K]
+        FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
+      case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+      case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
+        // express Atx as (A.t) %*% x
+        // TODO: create specific implementation of Atx, see MAHOUT-1749
+        val opAt = OpAt(a)
+        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a))
+        val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+        val opAx = OpAx(atCast, x)
+        FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]]
+      case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a),
+        flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
+      case op@OpABt(a, b) ⇒
+        // express ABt via AtB: let C=At and D=Bt, and calculate CtD
+        // TODO: create specific implementation of ABt, see MAHOUT-1750
+        val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
+        val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+        val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+        val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
+        val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol)
+        FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
+      case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+      case op@OpTimesRightMatrix(a, b) ⇒
+        FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a), b)
+      case op@OpAewUnaryFunc(a, _, _) ⇒
+        FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
+      case op@OpAewUnaryFuncFusion(a, _) ⇒
+        FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
+      // deprecated
+      case op@OpAewScalar(a, scalar, _) ⇒
+        FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a), scalar)
+      case op@OpAewB(a, b, _) ⇒
+        FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a), flinkTranslate(b))
+      case op@OpCbind(a, b) ⇒
+        FlinkOpCBind.cbind(op, flinkTranslate(a), flinkTranslate(b))
+      case op@OpRbind(a, b) ⇒
+        FlinkOpRBind.rbind(op, flinkTranslate(a), flinkTranslate(b))
+      case op@OpCbindScalar(a, x, _) ⇒
+        FlinkOpCBind.cbindScalar(op, flinkTranslate(a), x)
+      case op@OpRowRange(a, _) ⇒
+        FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+      case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒
+        throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them")
+      case op: OpMapBlock[K, _] ⇒
+        FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op).asInstanceOf[FlinkDrm[K]]
+      case cp: CheckpointedFlinkDrm[K] ⇒
+        //implicit val ktag=cp.keyClassTag
+        new RowsFlinkDrm[K](cp.ds, cp.ncol)
+      case _ ⇒
+        throw new NotImplementedError(s"operator $oper is not implemented yet")
+    }
   }
 
   /** 
    * returns a vector that contains a column-wise sum from DRM 
    */
-  override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
-    val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] {
-      def map(tuple: (K, Vector)): Vector = tuple._2
-    }).reduce(new ReduceFunction[Vector] {
-      def reduce(v1: Vector, v2: Vector) = v1 + v2
-    })
+  override def colSums[K](drm: CheckpointedDrm[K]): Vector = {
+    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+
+    val sum = drm.ds.map {
+      tuple => tuple._2
+    }.reduce(_ + _)
 
     val list = sum.collect
     list.head
   }
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
-  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
-    val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
-      def map(tuple: (Array[K], Matrix)): Vector = {
-        val (_, block) = tuple
+  override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
+    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+
+    val result = drm.asBlockified.ds.map {
+      tuple =>
+        val block = tuple._2
         val acc = block(0, ::).like()
 
         block.foreach { v =>
@@ -172,10 +222,7 @@ object FlinkEngine extends DistributedEngine {
         }
 
         acc
-      }
-    }).reduce(new ReduceFunction[Vector] {
-      def reduce(v1: Vector, v2: Vector) = v1 + v2
-    })
+    }.reduce(_ + _)
 
     val list = result.collect
     list.head
@@ -184,21 +231,30 @@ object FlinkEngine extends DistributedEngine {
   /** 
    * returns a vector that contains a column-wise mean from DRM 
    */
-  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  override def colMeans[K](drm: CheckpointedDrm[K]): Vector = {
     drm.colSums() / drm.nrow
   }
 
   /**
    * Calculates the element-wise squared norm of a matrix
    */
-  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
-    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
-      def map(tuple: (K, Vector)): Double = tuple match {
+  override def norm[K](drm: CheckpointedDrm[K]): Double = {
+    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+    val sumOfSquares = drm.ds.map {
+      tuple => tuple match {
         case (idx, vec) => vec dot vec
       }
-    }).reduce(new ReduceFunction[Double] {
-      def reduce(v1: Double, v2: Double) = v1 + v2
-    })
+    }.reduce(_ + _)
+
+//    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
+//      def map(tuple: (K, Vector)): Double = tuple match {
+//        case (idx, vec) => vec dot vec
+//      }
+//    }).reduce(new ReduceFunction[Double] {
+//      def reduce(v1: Double, v2: Double) = v1 + v2
+//    })
 
     val list = sumOfSquares.collect
     list.head
@@ -214,7 +270,7 @@ object FlinkEngine extends DistributedEngine {
     FlinkByteBCast.wrap(m)
 
 
-  /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+  /** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. */
   override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
                                            (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
 
@@ -223,14 +279,19 @@ object FlinkEngine extends DistributedEngine {
     new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
   }
 
+
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
       (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))//.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
     val dataSetType = TypeExtractor.getForObject(rows.head)
-    dc.env.fromCollection(rows).setParallelism(parallelismDegree)
+    // TODO: Make sure that this is the correct partitioning scheme
+    dc.env.fromCollection(rows)
+            .partitionByRange(0)
+            .setParallelism(parallelismDegree)
+            .rebalance()
   }
 
-  /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
+  /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */
   override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
                                           (implicit dc: DistributedContext): CheckpointedDrm[String] = {
     ???
@@ -247,7 +308,7 @@ object FlinkEngine extends DistributedEngine {
       for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     }
     val result = dc.env.fromCollection(nonParallelResult)
-    new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol)
+    new CheckpointedFlinkDrm[Int](ds=result, _nrow=nrow, _ncol=ncol)
   }
 
   /** Creates empty DRM with non-trivial height */
@@ -259,29 +320,53 @@ object FlinkEngine extends DistributedEngine {
    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
    * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
    */
-  def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): 
+  def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false):
           (DrmLike[Int], Option[DrmLike[K]]) = ???
 
   /**
-   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+   * (Optional) Sampling operation.
    */
-  def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = ???
-
-  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
-
-//  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
-//
-//    val ncol = drmX match {
-//      case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol
-//      case _ ⇒ -1
-//    }
-//
-//    val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples)
-//
-//  }
+  def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = {
+    implicit val kTag: ClassTag[K] =  drmX.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+    val sample = DataSetUtils(drmX.dataset).sample(replacement, fraction)
+    new CheckpointedFlinkDrm[K](sample)
+  }
+
+  def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
+    implicit val kTag: ClassTag[K] =  drmX.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+
+    val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples)
+    new CheckpointedFlinkDrm[K](sample)
+  }
 
   /** Optional engine-specific all reduce tensor operation. */
-  def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = 
+  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix =
     throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink")
- 
+
+//  private def generateTypeInformation[K]: TypeInformation[K] = {
+//    val tag = implicitly[K].asInstanceOf[ClassTag[K]]
+//    generateTypeInformationFromTag(tag)
+//  }
+  private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
+    val tag = implicitly[ClassTag[K]]
+
+    generateTypeInformationFromTag(tag)
+  }
+
+  private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
+    if (tag.runtimeClass.equals(classOf[Int])) {
+      createTypeInformation[Int].asInstanceOf[TypeInformation[K]]
+    } else if (tag.runtimeClass.equals(classOf[Long])) {
+      createTypeInformation[Long].asInstanceOf[TypeInformation[K]]
+    } else if (tag.runtimeClass.equals(classOf[String])) {
+      createTypeInformation[String].asInstanceOf[TypeInformation[K]]
+//    } else if (tag.runtimeClass.equals(classOf[Any])) {
+//       createTypeInformation[Any].asInstanceOf[TypeInformation[K]]
+    } else {
+      throw new IllegalArgumentException(s"index type $tag is not supported")
+    }
+  }
 }
\ No newline at end of file
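
For context, a minimal driver sketch of the reworked drmDfsRead path above. This is illustrative only: the HDFS path is made up, the context is built by hand, and the DRM on disk must be keyed by IntWritable, LongWritable or Text (any other key type now throws IllegalArgumentException).

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    // parMin now also sets the parallelism of the underlying ExecutionEnvironment (default 1, was 0).
    val drmA = drmDfsRead(path = "hdfs://namenode:8020/drms/A", parMin = 4)

    // The key ClassTag recovered from the SequenceFile header picked the Writable class for
    // readSequenceFile and the TypeInformation produced by generateTypeInformation.
    val inCoreA = drmA.collect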

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index f879e86..c61074b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -1,56 +1,42 @@
 package org.apache.mahout.flinkbindings.blas
 
-import java.lang.Iterable
 
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
 import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
 import org.apache.mahout.math.Vector
 import org.apache.mahout.math.drm.logical.OpAewB
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
-import com.google.common.collect.Lists
-
 /**
  * Implementation is inspired by Spark-binding's OpAewB
  * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala) 
  */
 object FlinkOpAewB {
 
-  def rowWiseJoinNoSideEffect[K: ClassTag](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+  def rowWiseJoinNoSideEffect[K: TypeInformation](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
     val function = AewBOpsCloning.strToFunction(op.op)
 
-    val classTag = extractRealClassTag(op.A)
-    val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
-
-    val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-
-    val res: DataSet[(Any, Vector)] = 
-      rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
-        .`with`(new CoGroupFunction[(_, Vector), (_, Vector), (_, Vector)] {
-      def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)], 
-                  out: Collector[(_, Vector)]): Unit = {
-        val it1 = Lists.newArrayList(it1java).asScala
-        val it2 = Lists.newArrayList(it2java).asScala
-
-        if (it1.nonEmpty && it2.nonEmpty) {
-          val (idx, a) = it1.head
-          val (_, b) = it2.head
-          out.collect((idx, function(a, b)))
-        } else if (it1.isEmpty && it2.nonEmpty) {
-          out.collect(it2.head)
-        } else if (it1.nonEmpty && it2.isEmpty) {
-          out.collect(it1.head)
-        }
+    val rowsA = A.asRowWise.ds
+    val rowsB = B.asRowWise.ds
+    implicit val kTag = op.keyClassTag
+
+    val res: DataSet[(K, Vector)] =
+      rowsA
+        .coGroup(rowsB)
+        .where(0)
+        .equalTo(0) {
+        (left, right, out: Collector[(K, Vector)]) =>
+          (left.toIterable.headOption, right.toIterable.headOption) match {
+            case (Some((idx, a)), Some((_, b))) => out.collect((idx, function(a, b)))
+            case (None, Some(b)) => out.collect(b)
+            case (Some(a), None) => out.collect(a)
+            case (None, None) => throw new RuntimeException("At least one side of the coGroup " +
+              "must be non-empty.")
+          }
       }
-    })
+
 
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
   }
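
A small, illustrative driver for the element-wise path above (values are arbitrary, the context is built by hand). The coGroup now keys both sides on the row id (field 0) and keeps whichever side is non-empty when a row id is missing.

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
    val drmB = drmParallelize(dense((5, 6), (7, 8)), numPartitions = 2)

    // A + B (likewise -, *, /) becomes OpAewB and lands in rowWiseJoinNoSideEffect.
    val inCoreSum = (drmA + drmB).collect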

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index 67d710b..56e7deb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -19,6 +19,7 @@
 package org.apache.mahout.flinkbindings.blas
 
 import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
 import org.apache.mahout.math.Matrix
 import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc}
@@ -39,43 +40,45 @@ object FlinkOpAewScalar {
   private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean
 
   @Deprecated
-  def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
+  def opScalarNoSideEffect[K: TypeInformation](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
     val function = EWOpsCloning.strToFunction(op.op)
+    implicit val kTag = op.keyClassTag
 
-    val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-      def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
-        case (keys, mat) => (keys, function(mat, scalar))
-      }
-    })
+
+    val res = A.asBlockified.ds.map{
+      tuple => (tuple._1, function(tuple._2, scalar))
+    }
 
     new BlockifiedFlinkDrm(res, op.ncol)
   }
 
-  def opUnaryFunction[K: ClassTag](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
+  def opUnaryFunction[K: TypeInformation](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
     val f = op.f
     val inplace = isInplace
 
+
+    implicit val kTag = op.keyClassTag
+
     val res = if (op.evalZeros) {
-      A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-        def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+      A.asBlockified.ds.map{
+        tuple =>
           val (keys, block) = tuple
           val newBlock = if (inplace) block else block.cloned
           newBlock := ((_, _, x) => f(x))
           (keys, newBlock)
-        }
-      })
+      }
     } else {
-      A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-        def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+      A.asBlockified.ds.map{
+        tuple =>
           val (keys, block) = tuple
           val newBlock = if (inplace) block else block.cloned
           for (row <- newBlock; el <- row.nonZeroes) el := f(el.get)
           (keys, newBlock)
-        }
-      })
+      }
     }
 
     new BlockifiedFlinkDrm(res, op.ncol)
+
   }
 
 }
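
For reference, the kind of DSL expression that reaches the two map variants above; a hedged sketch with arbitrary values and a hand-built context. Which logical node actually arrives here (OpAewUnaryFunc, a fused OpAewUnaryFuncFusion, or the deprecated OpAewScalar) is decided by the DSL and optimizer, not by this class.

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)

    // Element-wise scalar arithmetic, evaluated block by block on the Flink side.
    val inCoreB = (drmA * 2.0 + 1.0).collect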

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index e515b34..6e320af 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -18,25 +18,13 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import java.lang.Iterable
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConverters.asScalaBufferConverter
-
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.shaded.com.google.common.collect.Lists
-import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.SequentialAccessSparseVector
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm.DrmTuple
+import org.apache.flink.api.scala._
+import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
+import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}
 import org.apache.mahout.math.drm.logical.OpAt
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
-import org.apache.flink.api.scala._
+import scala.Array.canBuildFrom
 
 /**
  * Implementation is taken from Spark's At
@@ -52,34 +40,34 @@ object FlinkOpAt {
   def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = {
     val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T
 
-    val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
-      def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match {
-        case (keys, block) =>
-          (0 until block.ncol).map(columnIdx => {
+    val sparseParts = A.asBlockified.ds.flatMap {
+      blockifiedTuple =>
+        val keys = blockifiedTuple._1
+        val block = blockifiedTuple._2
+
+        (0 until block.ncol).map {
+          columnIndex =>
             val columnVector: Vector = new SequentialAccessSparseVector(ncol)
 
-            keys.zipWithIndex.foreach { case (key, idx) =>
-              columnVector(key) = block(idx, columnIdx)
+            keys.zipWithIndex.foreach {
+              case (key, idx) => columnVector(key) = block(idx, columnIndex)
             }
 
-            out.collect((columnIdx, columnVector))
-          })
-      }
-    })
+            (columnIndex, columnVector)
+        }
+    }
 
-    val regrouped = sparseParts.groupBy(selector[Vector, Int])
+    val regrouped = sparseParts.groupBy(0)
 
-    val sparseTotal = regrouped.reduceGroup(new GroupReduceFunction[(Int, Vector), DrmTuple[Int]] {
-      def reduce(values: Iterable[(Int, Vector)], out: Collector[DrmTuple[Int]]): Unit = {
-        val it = Lists.newArrayList(values).asScala
-        val (idx, _) = it.head
-        val vector = (it map { case (idx, vec) => vec }).sum
-        out.collect((idx, vector))
-      }
-    })
+    val sparseTotal = regrouped.reduce{
+      (left, right) =>
+        (left._1, left._2 + right._2)
+    }
 
     // TODO: densify or not?
     new RowsFlinkDrm(sparseTotal, ncol)
   }
 
+
+
 }
\ No newline at end of file
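
The sparse trick above emits one (columnIndex, sparseColumn) pair per column of every block and then sums the partial columns grouped by index. At the DSL level this is simply A.t; an illustrative sketch with arbitrary values and a hand-built context:

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2, 3), (4, 5, 6)), numPartitions = 2)

    // A.t becomes OpAt (Int-keyed only) and is translated by FlinkOpAt.sparseTrick.
    val inCoreAt = drmA.t.collect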

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index 629857a..bdb0e5e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -2,22 +2,20 @@ package org.apache.mahout.flinkbindings.blas
 
 import java.lang.Iterable
 
-import scala.collection.JavaConverters._
-
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.shaded.com.google.common.collect.Lists
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm._
 import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.BlockifiedDrmTuple
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
 import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 
+import scala.collection.JavaConverters._
 
 /**
  * Inspired by Spark's implementation from 
@@ -29,61 +27,59 @@ object FlinkOpAtA {
   final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol"
   final val PROPERTY_ATA_MAXINMEMNCOL_DEFAULT = "200"
 
-  def at_a(op: OpAtA[_], A: FlinkDrm[_]): FlinkDrm[Int] = {
+  def at_a[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
     val maxInMemStr = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, PROPERTY_ATA_MAXINMEMNCOL_DEFAULT)
     val maxInMemNCol = maxInMemStr.toInt
     maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
 
+    implicit val kTag = A.classTag
+
     if (op.ncol <= maxInMemNCol) {
       implicit val ctx = A.context
       val inCoreAtA = slim(op, A)
       val result = drmParallelize(inCoreAtA, numPartitions = 1)
       result
     } else {
-      fat(op.asInstanceOf[OpAtA[Any]], A.asInstanceOf[FlinkDrm[Any]])
+      fat(op.asInstanceOf[OpAtA[K]], A.asInstanceOf[FlinkDrm[K]])
     }
   }
 
-  def slim(op: OpAtA[_], A: FlinkDrm[_]): Matrix = {
-    val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]]
+  def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = {
+    val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[K], Matrix)]]
 
-    val res = ds.map(new MapFunction[(Array[Any], Matrix), Matrix] {
+    val res = ds.map {
       // TODO: optimize it: use upper-triangle matrices like in Spark
-      def map(block: (Array[Any], Matrix)): Matrix =  block match {
-        case (idx, m) => m.t %*% m
-      }
-    }).reduce(new ReduceFunction[Matrix] {
-      def reduce(m1: Matrix, m2: Matrix) = m1 + m2
-    }).collect()
+      block => block._2.t %*% block._2
+    }.reduce(_ + _).collect()
 
     res.head
   }
 
-  def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = {
+  def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
     val nrow = op.A.nrow
     val ncol = op.A.ncol
     val ds = A.asBlockified.ds
 
-    val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[Any], Matrix), Int] {
-      def map(a: (Array[Any], Matrix)): Int = 1
+    val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[K], Matrix), Int] {
+      def map(a: (Array[K], Matrix)): Int = 1
     }).reduce(new ReduceFunction[Int] {
       def reduce(a: Int, b: Int): Int = a + b
     })
 
-    val subresults: DataSet[(Int, Matrix)] = 
-          ds.flatMap(new RichFlatMapFunction[(Array[Any], Matrix), (Int, Matrix)] {
+    val subresults: DataSet[(Int, Matrix)] =
+          ds.flatMap(new RichFlatMapFunction[(Array[K], Matrix), (Int, Matrix)] {
 
       var ranges: Array[Range] = null
 
       override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext()
+        val runtime = this.getRuntimeContext
         val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions")
         val parts = dsX.get(0)
         val numParts = estimatePartitions(nrow, ncol, parts)
         ranges = computeEvenSplits(ncol, numParts)
       }
 
-      def flatMap(tuple: (Array[Any], Matrix), out: Collector[(Int, Matrix)]): Unit = {
+      def flatMap(tuple: (Array[K], Matrix), out: Collector[(Int, Matrix)]): Unit = {
         val block = tuple._2
 
         ranges.zipWithIndex.foreach { case (range, idx) => 
@@ -93,13 +89,13 @@ object FlinkOpAtA {
 
     }).withBroadcastSet(numberOfPartitions, "numberOfPartitions")
 
-    val res = subresults.groupBy(selector[Matrix, Int])
+    val res = subresults.groupBy(0)
                         .reduceGroup(new RichGroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
 
       var ranges: Array[Range] = null
 
       override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext()
+        val runtime = this.getRuntimeContext
         val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions")
         val parts = dsX.get(0)
         val numParts = estimatePartitions(nrow, ncol, parts)
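
Whether at_a takes the slim (in-core) or fat (blocked) route is controlled by the mahout.math.AtA.maxInMemNCol system property, default 200. A hedged driver-side sketch of nudging that threshold (values arbitrary, context built by hand):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    // Gram matrices of up to 500 columns will now be computed in-core by slim() and re-parallelized.
    System.setProperty("mahout.math.AtA.maxInMemNCol", "500")

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2, 3), (4, 5, 6)), numPartitions = 2)
    val inCoreAtA = (drmA.t %*% drmA).collect  // the optimizer typically rewrites A.t %*% A into OpAtA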

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index b514868..6a081ba 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -46,25 +46,22 @@ import org.apache.flink.api.scala._
  */
 object FlinkOpAtB {
 
-  def notZippable[K: ClassTag](op: OpAtB[K], At: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[Int] = {
-    val classTag = extractRealClassTag(op.A)
-    val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
+  def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): FlinkDrm[Int] = {
 
-    val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val joined = rowsAt.join(rowsB).where(joiner).equalTo(joiner)
+    val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
+    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
+    val joined = rowsAt.join(rowsB).where(0).equalTo(0)
 
     val ncol = op.ncol
     val nrow = op.nrow.toInt
     val blockHeight = 10
     val blockCount = safeToNonNegInt((nrow - 1) / blockHeight + 1)
 
-    val preProduct: DataSet[(Int, Matrix)] = 
-             joined.flatMap(new FlatMapFunction[Tuple2[(_, Vector), (_, Vector)], (Int, Matrix)] {
-      def flatMap(in: Tuple2[(_, Vector), (_, Vector)],
-                  out: Collector[(Int, Matrix)]): Unit = {
+    val preProduct: DataSet[(Int, Matrix)] =
+             joined.flatMap(new FlatMapFunction[((A, Vector), (A, Vector)), (Int, Matrix)] {
+      def flatMap(in: ((A, Vector), (A, Vector)), out: Collector[(Int, Matrix)]): Unit = {
         val avec = in._1._2
-        val bvec = in._1._2
+        val bvec = in._2._2
 
         0.until(blockCount) map { blockKey =>
           val blockStart = blockKey * blockHeight
@@ -72,13 +69,13 @@ object FlinkOpAtB {
 
           val outer = avec(blockStart until blockEnd) cross bvec
           out.collect(blockKey -> outer)
+          out
         }
       }
     })
 
     val res: BlockifiedDrmDataSet[Int] = 
-      preProduct.groupBy(selector[Matrix, Int])
-                .reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
+      preProduct.groupBy(0).reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
       def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = {
         val it = Lists.newArrayList(values).asScala
         val (idx, _) = it.head
@@ -90,7 +87,7 @@ object FlinkOpAtB {
       }
     })
 
-    new BlockifiedFlinkDrm(res, ncol)
+    new BlockifiedFlinkDrm[Int](res, ncol)
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 4302457..79f5fe8 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -21,6 +21,7 @@ package org.apache.mahout.flinkbindings.blas
 import java.util.List
 
 import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
 import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
 import org.apache.mahout.math.{Matrix, Vector}
@@ -38,8 +39,9 @@ import org.apache.flink.api.scala._
  */
 object FlinkOpAx {
 
-  def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
+  def blockifiedBroadcastAx[K: TypeInformation](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
     implicit val ctx = A.context
+    implicit val kTag = op.keyClassTag
 
     val singletonDataSetX = ctx.env.fromElements(op.x)
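
A usage sketch for the broadcast-x path above (arbitrary values, hand-built context):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
    val x = dvec(1.0, 1.0)

    // A %*% x becomes OpAx; x travels to the blocks via env.fromElements plus a broadcast set.
    val inCoreAx = (drmA %*% x).collect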
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index 6cf5e5c..65b2a25 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -19,12 +19,14 @@
 package org.apache.mahout.flinkbindings.blas
 
 import java.lang.Iterable
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala._
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm._
@@ -43,94 +45,86 @@ import org.apache.mahout.math.scalabindings._
  */
 object FlinkOpCBind {
 
-  def cbind[K: ClassTag](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+  def cbind[K: TypeInformation](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = op.B.ncol
 
-    val classTag = extractRealClassTag(op.A)
-    val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) 
-
-    val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-
-    val res: DataSet[(Any, Vector)] = 
-      rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
-        .`with`(new CoGroupFunction[(_, Vector), (_, Vector), (_, Vector)] {
-      def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)], 
-                  out: Collector[(_, Vector)]): Unit = {
-        val it1 = Lists.newArrayList(it1java).asScala
-        val it2 = Lists.newArrayList(it2java).asScala
-
-        if (it1.nonEmpty && it2.nonEmpty) {
-          val (idx, a) = it1.head
-          val (_, b) = it2.head
-
-          val result: Vector = if (a.isDense && b.isDense) {
-            new DenseVector(n) 
-          } else {
-            new SequentialAccessSparseVector(n)
-          }
-
-          result(0 until n1) := a
-          result(n1 until n) := b
-
-          out.collect((idx, result))
-        } else if (it1.isEmpty && it2.nonEmpty) {
-          val (idx, b) = it2.head
-          val result: Vector = if (b.isDense) { 
-            new DenseVector(n)
-          } else {
-            new SequentialAccessSparseVector(n)
-          }
-          result(n1 until n) := b
-          out.collect((idx, result))
-        } else if (it1.nonEmpty && it2.isEmpty) {
-          val (idx, a) = it1.head
-          val result: Vector = if (a.isDense) {
-            new DenseVector(n)
-          } else {
-            new SequentialAccessSparseVector(n)
+    implicit val classTag = op.A.keyClassTag
+
+    val rowsA = A.asRowWise.ds
+    val rowsB = B.asRowWise.ds
+
+    val res: DataSet[(K, Vector)] =
+      rowsA.coGroup(rowsB).where(0).equalTo(0) {
+        (left, right) =>
+          (left.toIterable.headOption, right.toIterable.headOption) match {
+            case (Some((idx, a)), Some((_, b))) =>
+              val result = if (a.isDense && b.isDense) {
+                new DenseVector(n)
+              } else {
+                new SequentialAccessSparseVector(n)
+              }
+
+              result(0 until n1) := a
+              result(n1 until n) := b
+
+              (idx, result)
+            case (Some((idx, a)), None) =>
+              val result: Vector = if (a.isDense) {
+                new DenseVector(n)
+              } else {
+                new SequentialAccessSparseVector(n)
+              }
+              result(n1 until n) := a
+
+              (idx, result)
+            case (None, Some((idx, b))) =>
+              val result: Vector = if (b.isDense) {
+                new DenseVector(n)
+              } else {
+                new SequentialAccessSparseVector(n)
+              }
+              result(n1 until n) := b
+
+              (idx, result)
+            case (None, None) =>
+              throw new RuntimeException("CoGroup should have at least one non-empty input.")
           }
-          result(n1 until n) := a
-          out.collect((idx, result))
-        }
       }
-    })
 
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
   }
 
-  def cbindScalar[K: ClassTag](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
+  def cbindScalar[K: TypeInformation](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
     val left = op.leftBind
     val ds = A.asBlockified.ds
 
-    val out = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-      def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
-        case (keys, mat) => (keys, cbind(mat, x, left))
-      }
+    implicit val kTag= op.keyClassTag
 
-      def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = {
-        val ncol = mat.ncol
-        val newMat = mat.like(mat.nrow, ncol + 1)
+    def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = {
+      val ncol = mat.ncol
+      val newMat = mat.like(mat.nrow, ncol + 1)
 
-        if (left) {
-          newMat.zip(mat).foreach { case (newVec, origVec) =>
-            newVec(0) = x
-            newVec(1 to ncol) := origVec
-          }
-        } else {
-          newMat.zip(mat).foreach { case (newVec, origVec) =>
-            newVec(ncol) = x
-            newVec(0 to (ncol - 1)) := origVec
-          }
+      if (left) {
+        newMat.zip(mat).foreach { case (newVec, origVec) =>
+          newVec(0) = x
+          newVec(1 to ncol) := origVec
+        }
+      } else {
+        newMat.zip(mat).foreach { case (newVec, origVec) =>
+          newVec(ncol) = x
+          newVec(0 to (ncol - 1)) := origVec
         }
-
-        newMat
       }
-    })
+
+      newMat
+    }
+
+    val out = A.asBlockified.ds.map {
+      tuple => (tuple._1, cbind(tuple._2, x, left))
+    }
 
     new BlockifiedFlinkDrm(out, op.ncol)
   }
-
 }
\ No newline at end of file
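
A usage sketch for the two code paths above, column-binding another DRM and a scalar column (arbitrary values, hand-built context; the scalar form is the one that produces OpCbindScalar):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
    val drmB = drmParallelize(dense((5, 6), (7, 8)), numPartitions = 2)

    val inCoreAB   = (drmA cbind drmB).collect  // OpCbind       -> cbind above
    val inCoreAOne = (drmA cbind 1.0).collect   // OpCbindScalar -> cbindScalar above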

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
index 9530d43..c3918a5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
@@ -18,13 +18,10 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.BlockMapFunc
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.drm.logical.OpMapBlock
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
 /**
@@ -33,16 +30,19 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
  */
 object FlinkOpMapBlock {
 
-  def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = {
-    val res = src.asBlockified.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] {
-      def map(block: (Array[S], Matrix)): (Array[R], Matrix) =  {
-        val out = function(block)
-        assert(out._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
-        assert(out._2.ncol == ncol, s"block map must return $ncol number of columns.")
-        out
-      }
-    })
+  def apply[S, R: TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = {
+    implicit val rtag = operator.keyClassTag
+    val bmf = operator.bmf
+    val ncol = operator.ncol
+    val res = src.asBlockified.ds.map {
+      block =>
+        val result = bmf(block)
+        assert(result._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
+        assert(result._2.ncol == ncol, s"block map must return $ncol number of columns.")
+       // printf("Block partition: \n%s\n", block._2)
+        result
+    }
 
-    new BlockifiedFlinkDrm(res, ncol)
+    new BlockifiedFlinkDrm[R](res, ncol)
   }
 }
\ No newline at end of file
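
The operator now receives the whole OpMapBlock node, so the block function and ncol come from the logical op rather than being passed in separately. A minimal mapBlock call that exercises it (arbitrary values, hand-built context):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)

    // Adds 1.0 to every element; keys and geometry are unchanged, which is exactly what the
    // asserts in FlinkOpMapBlock.apply verify.
    val inCoreB = drmA.mapBlock() { case (keys, block) =>
      block += 1.0            // in-place mutation is fine inside mapBlock
      keys -> block
    }.collect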

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index 83beaa1..4fa2eaa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -18,6 +18,8 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
 import scala.reflect.ClassTag
 
 import org.apache.flink.api.scala.DataSet
@@ -28,8 +30,9 @@ import org.apache.mahout.math.drm.logical.OpRbind
 
 object FlinkOpRBind {
 
-  def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+  def rbind[K: TypeInformation](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
     // note that indexes of B are already re-arranged prior to executing this code
+    implicit val kTag = op.keyClassTag
     val res = A.asRowWise.ds.union(B.asRowWise.ds)
     new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol)
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
index 6e11892..39f4ceb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
@@ -18,11 +18,9 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.Vector
 import org.apache.mahout.math.drm.logical.OpRowRange
 
 /**
@@ -35,17 +33,13 @@ object FlinkOpRowRange {
     val rowRange = op.rowRange
     val firstIdx = rowRange.head
 
-    val filtered = A.asRowWise.ds.filter(new FilterFunction[(Int, Vector)] {
-      def filter(tuple: (Int, Vector)): Boolean = tuple match {
-        case (idx, vec) => rowRange.contains(idx)
-      }
-    })
+    val filtered = A.asRowWise.ds.filter {
+      tuple => rowRange.contains(tuple._1)
+    }
 
-    val res = filtered.map(new MapFunction[(Int, Vector), (Int, Vector)] {
-      def map(tuple: (Int, Vector)): (Int, Vector) = tuple match {
-        case (idx, vec) => (idx - firstIdx, vec)
-      }
-    })
+    val res = filtered.map {
+      tuple => (tuple._1 - firstIdx, tuple._2)
+    }
 
     new RowsFlinkDrm(res, op.ncol)
   }
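
The filter/map pair above implements DSL row-range slicing; note that the surviving rows are re-keyed to start at 0. An illustrative sketch (arbitrary values, hand-built context):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    implicit val ctx = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    val drmA = drmParallelize(dense((1, 2), (3, 4), (5, 6), (7, 8)), numPartitions = 2)

    // Rows 1 and 2 of A, re-indexed as rows 0 and 1 (OpRowRange -> FlinkOpRowRange.slice).
    val inCoreSlice = drmA(1 until 3, ::).collect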

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index 989fad1..4e5b1a7 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -18,17 +18,16 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
 import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
 import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import org.apache.flink.api.scala._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{DenseMatrix, Matrix}
 
 /**
  * Implementation is taken from Spark's OpTimesRightMatrix:
@@ -36,20 +35,51 @@ import org.apache.flink.api.scala._
  */
 object FlinkOpTimesRightMatrix {
 
-  def drmTimesInCore[K: ClassTag](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {
+  def drmTimesInCore[K: TypeInformation](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {
     implicit val ctx = A.context
+    implicit val kTag = op.keyClassTag
+
+
 
-    val singletonDataSetB = ctx.env.fromElements(inCoreB)
+    /* HACK: broadcasting the matrix with Flink's .withBroadcastSet(singletonDataSetB, "matrix") triggers a
+     * Kryo issue on the back end, resulting in a StackOverflowError.
+     *
+     * Quick fix: break the matrix down into a list of rows and rebuild it on the back end.
+     *
+     * TODO: this is obviously very inefficient; switch to a proper broadcast of the matrix itself.
+     */
+
+    // earlier attempts, kept for reference:
+    //  val singletonDataSetB = ctx.env.fromElements(inCoreB)
+    //  val inCoreBcastB = FlinkEngine.drmBroadcast(inCoreB)
+
+    val rows = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
+    val dataSetType = TypeExtractor.getForObject(rows.head)
+    val singletonDataSetB = ctx.env.fromCollection(rows)
 
     val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
       var inCoreB: Matrix = null
 
       override def open(params: Configuration): Unit = {
         val runtime = this.getRuntimeContext
-        val dsB: java.util.List[Matrix] = runtime.getBroadcastVariable("matrix")
-        inCoreB = dsB.get(0)
-      }
+        //val dsB: java.util.List[Matrix]
+        val dsB: java.util.List[(Int, org.apache.mahout.math.Vector)] = runtime.getBroadcastVariable("matrix")
+        val m = dsB.size()
+        val n = dsB.get(0)._2.size
+        val isDense = dsB.get(0)._2.isDense
 
+        // rebuild the broadcast matrix from its rows, matching the density of the incoming vectors
+        inCoreB = isDense match {
+          case true => new DenseMatrix(m, n)
+          case false => new SparseRowMatrix(m, n)
+        }
+        for (i <- 0 until m) {
+          inCoreB(i, ::) := dsB.get(i)._2
+        }
+
+      }
+     
       override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
         case (keys, block_A) => (keys, block_A %*% inCoreB)
       }

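The hack above never broadcasts the Matrix object itself: it ships (row index, Vector) pairs and reassembles the matrix inside open(). A minimal end-to-end sketch of that row-wise broadcast pattern with Flink's Scala API; the broadcast name "matrix" follows the code above, while the object name, data and everything else are illustrative rather than the exact code of this commit:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.mahout.math.{DenseMatrix, Matrix, Vector}
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    object RowWiseBroadcastSketch extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment

      val inCoreB: Matrix = dense((1, 2), (3, 4))

      // ship the matrix as (row index, row vector) pairs instead of one serialized Matrix
      val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
      val bcastB = env.fromCollection(rowsB)

      val blocks = env.fromElements(dense((1.0, 1.0), (2.0, 2.0)): Matrix)

      val res = blocks.map(new RichMapFunction[Matrix, Matrix] {
        var b: Matrix = _

        override def open(params: Configuration): Unit = {
          // rebuild the broadcast matrix from its rows
          val dsB: java.util.List[(Int, Vector)] = getRuntimeContext.getBroadcastVariable("matrix")
          b = new DenseMatrix(dsB.size(), dsB.get(0)._2.size)
          for (i <- 0 until dsB.size()) b(i, ::) := dsB.get(i)._2
        }

        override def map(block: Matrix): Matrix = block %*% b
      }).withBroadcastSet(bcastB, "matrix")

      res.collect().foreach(println)
    }
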
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
deleted file mode 100644
index 27f552c..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import scala.reflect.ClassTag
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-package object blas {
-
-  // TODO: remove it once figure out how to make Flink accept interfaces (Vector here)
-  def selector[V, K: ClassTag]: KeySelector[(K, V), K] = {
-    val tag = implicitly[ClassTag[K]]
-    if (tag.runtimeClass.equals(classOf[Int])) {
-      tuple_1_int.asInstanceOf[KeySelector[(K, V), K]]
-    } else if (tag.runtimeClass.equals(classOf[Long])) {
-      tuple_1_long.asInstanceOf[KeySelector[(K, V), K]]
-    } else if (tag.runtimeClass.equals(classOf[String])) {
-      tuple_1_string.asInstanceOf[KeySelector[(K, V), K]]
-    } else {
-      throw new IllegalArgumentException(s"index type $tag is not supported")
-    }
-  }
-
-  private def tuple_1_int[K: ClassTag] = new KeySelector[(Int, _), Int] 
-                  with ResultTypeQueryable[Int] {
-    def getKey(tuple: Tuple2[Int, _]): Int = tuple._1
-    def getProducedType: TypeInformation[Int] = createTypeInformation[Int]
-  }
-
-  private def tuple_1_long[K: ClassTag] = new KeySelector[(Long, _), Long] 
-                  with ResultTypeQueryable[Long] {
-    def getKey(tuple: Tuple2[Long, _]): Long = tuple._1
-    def getProducedType: TypeInformation[Long] = createTypeInformation[Long]
-  }
-
-  private def tuple_1_string[K: ClassTag] = new KeySelector[(String, _), String] 
-                  with ResultTypeQueryable[String] {
-    def getKey(tuple: Tuple2[String, _]): String = tuple._1
-    def getProducedType: TypeInformation[String] = createTypeInformation[String]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 96d57d2..84b327a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -19,13 +19,13 @@
 package org.apache.mahout.flinkbindings.drm
 
 import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
-import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
 import org.apache.mahout.flinkbindings.{DrmDataSet, _}
-import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, Vector, VectorWritable}
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm.CacheHint._
 import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _}
 import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
@@ -37,6 +37,7 @@ import scala.util.Random
 class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
       private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
       private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
+      override val cacheHint: CacheHint = CacheHint.NONE,
       override protected[mahout] val partitioningTag: Long = Random.nextLong(),
       private var _canHaveMissingRows: Boolean = false
   ) extends CheckpointedDrm[K] {
@@ -79,7 +80,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
 
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
 
-  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
+
+     this
+  }
 
   def collect: Matrix = {
     val data = ds.collect()
@@ -123,21 +127,76 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   def dfsWrite(path: String): Unit = {
     val env = ds.getExecutionEnvironment
 
-    val keyTag = implicitly[ClassTag[K]]
-    val convertKey = keyToWritableFunc(keyTag)
+    // ds.map does not pick up the correct runtime type of tuple._1:
+    // WritableTypeInfo requires a concrete Writable subclass as the key type
+    // and throws an exception when it sees the base Writable instead.
 
-    val writableDataset = ds.map(new MapFunction[(K, Vector), (Writable, VectorWritable)] {
-      def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
-        case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
-      }
-    })
+//    val keyTag = implicitly[ClassTag[K]]
+//    def convertKey = keyToWritableFunc(keyTag)
+//    val writableDataset = ds.map {
+//      tuple => (convertKey(tuple._1), new VectorWritable(tuple._2))
+//    }
+
+
+      // test output with an IntWritable key; VectorWritable is not a problem
+//    val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
+//      def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
+//         (new IntWritable(1), new VectorWritable(tuple._2))
+//    })
+
+
+    val keyTag = implicitly[ClassTag[K]]
 
     val job = new JobConf
-    val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable]
     FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
 
-    val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
-    writableDataset.output(hadoopOutput)
+    // explicitly define the concrete Writable subclasses for the ds.map() keys
+    // as well as the SequenceFileOutputFormat parameters
+    if (keyTag.runtimeClass == classOf[Int]) {
+      // explicitly map into Int keys
+      implicit val typeInformation = createTypeInformation[(IntWritable,VectorWritable)]
+      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
+        def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
+          (new IntWritable(tuple._1.asInstanceOf[Int]), new VectorWritable(tuple._2))
+      })
+
+      // setup sink for IntWritable
+      job.setOutputKeyClass(classOf[IntWritable])
+      job.setOutputValueClass(classOf[VectorWritable])
+      val sequenceFormat = new SequenceFileOutputFormat[IntWritable, VectorWritable]
+      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+      writableDataset.output(hadoopOutput)
+
+     } else if (keyTag.runtimeClass == classOf[String]) {
+      // explicitly map into Text keys
+      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (Text, VectorWritable)] {
+        def map(tuple: DrmTuple[K]): (Text, VectorWritable) =
+          (new Text(tuple._1.asInstanceOf[String]), new VectorWritable(tuple._2))
+      })
+
+      // setup sink for Text
+      job.setOutputKeyClass(classOf[Text])
+      job.setOutputValueClass(classOf[VectorWritable])
+      val sequenceFormat = new SequenceFileOutputFormat[Text, VectorWritable]
+      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+      writableDataset.output(hadoopOutput)
+
+    } else if (keyTag.runtimeClass == classOf[Long]) {
+      // explicitly map into Long keys
+      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (LongWritable, VectorWritable)] {
+        def map(tuple: DrmTuple[K]): (LongWritable, VectorWritable) =
+          (new LongWritable(tuple._1.asInstanceOf[Long]), new VectorWritable(tuple._2))
+      })
+
+      // setup sink for LongWritable
+      job.setOutputKeyClass(classOf[LongWritable])
+      job.setOutputValueClass(classOf[VectorWritable])
+      val sequenceFormat = new SequenceFileOutputFormat[LongWritable, VectorWritable]
+      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
+      writableDataset.output(hadoopOutput)
+
+    } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
 
     env.execute(s"dfsWrite($path)")
   }
@@ -148,9 +207,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
     } else if (keyTag.runtimeClass == classOf[String]) {
       (x: K) => new Text(x.asInstanceOf[String]) 
     } else if (keyTag.runtimeClass == classOf[Long]) {
-      (x: K) => new LongWritable(x.asInstanceOf[Long]) 
-    } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { 
-      (x: K) => x.asInstanceOf[Writable] 
+      (x: K) => new LongWritable(x.asInstanceOf[Long])
+    // WritableTypeInfo will reject the base Writable class
+//          } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) {
+//      (x: K) => x.asInstanceOf[Writable]
     } else {
       throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
     }

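From the caller's point of view the per-key-type branching above is transparent: dfsWrite persists the DRM as a SequenceFile keyed by the matching Writable type. A minimal usage sketch, assuming the standard Mahout DRM DSL and an implicit Flink-backed DistributedContext already in scope (the path and data are illustrative):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings._

    def writeIntKeyedDrm()(implicit ctx: DistributedContext): Unit = {
      // Int keys, so the IntWritable branch of dfsWrite above is taken
      val drmA = drmParallelize(dense((1, 2, 3), (4, 5, 6)), numPartitions = 2)
      drmA.dfsWrite("/tmp/drmA.seq")
    }
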
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
index a037d44..e65c43d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
@@ -31,5 +31,4 @@ class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) {
   /** Flink matrix customization exposure */
   def dataset = flinkDrm.ds
 
-
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index c9c1b2c..aea62fa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -18,18 +18,13 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.{FlatMapFunction, MapPartitionFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext}
-import org.apache.mahout.math.drm.DrmTuple
 import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix}
 
-import scala.collection.JavaConverters.iterableAsScalaIterableConverter
 import scala.reflect.ClassTag
 
 trait FlinkDrm[K] {
@@ -43,7 +38,7 @@ trait FlinkDrm[K] {
   def classTag: ClassTag[K]
 }
 
-class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+class RowsFlinkDrm[K: TypeInformation: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
 
   def executionEnvironment = ds.getExecutionEnvironment
   def context: FlinkDistributedContext = ds.getExecutionEnvironment
@@ -54,27 +49,27 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
     val ncolLocal = ncol
     val classTag = implicitly[ClassTag[K]]
 
-    val parts = ds.mapPartition(new MapPartitionFunction[DrmTuple[K], (Array[K], Matrix)] {
-      def mapPartition(values: Iterable[DrmTuple[K]], out: Collector[(Array[K], Matrix)]): Unit = {
-        val it = values.asScala.seq
+    val parts = ds.mapPartition {
+      values =>
+        val (keys, vectors) = values.toIterable.unzip
 
-        val (keys, vectors) = it.unzip
         if (vectors.nonEmpty) {
-          val isDense = vectors.head.isDense
-
-          if (isDense) {
+          val vector = vectors.head
+          val matrix: Matrix = if (vector.isDense) {
             val matrix = new DenseMatrix(vectors.size, ncolLocal)
             vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
-            out.collect((keys.toArray(classTag), matrix))
+            matrix
           } else {
-            val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
-            out.collect((keys.toArray(classTag), matrix))
+            new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
           }
+
+          Seq((keys.toArray(classTag), matrix))
+        } else {
+          Seq()
         }
-      }
-    })
+    }
 
-    new BlockifiedFlinkDrm(parts, ncol)
+    new BlockifiedFlinkDrm[K](parts, ncol)
   }
 
   def asRowWise = this
@@ -83,26 +78,31 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
 
 }
 
-class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+class BlockifiedFlinkDrm[K: TypeInformation: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+
 
   def executionEnvironment = ds.getExecutionEnvironment
   def context: FlinkDistributedContext = ds.getExecutionEnvironment
 
+
   def isBlockified = true
 
   def asBlockified = this
 
   def asRowWise = {
-    val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
-      def flatMap(tuple: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = tuple match {
-        case (keys, block) => keys.view.zipWithIndex.foreach {
-          case (key, idx) =>
-            out.collect((key, block(idx, ::)))
+    val out = ds.flatMap {
+      tuple =>
+        val keys = tuple._1
+        val block = tuple._2
+
+        keys.view.zipWithIndex.map {
+          case (key, idx) => (key, block(idx, ::))
         }
-      }
-    })
-    new RowsFlinkDrm(out, ncol)
+    }
+
+    new RowsFlinkDrm[K](out, ncol)
   }
 
   def classTag = implicitly[ClassTag[K]]
+
 }
\ No newline at end of file

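asBlockified collapses the (key, Vector) rows of a partition into one (Array[K], Matrix) block, picking a dense or sparse in-core matrix based on the first row, and asRowWise reverses that by re-emitting one (key, row) pair per block row. A minimal sketch of the round trip on a plain collection standing in for a single partition (Mahout in-core types only, no Flink wiring; data is illustrative):

    import org.apache.mahout.math.{DenseMatrix, Matrix, Vector}
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    object BlockifySketch extends App {
      val ncol = 3
      val rows: Seq[(Int, Vector)] = Seq(10 -> dvec(1, 2, 3), 11 -> dvec(4, 5, 6))

      // blockify: one (keys, block) pair for the whole partition
      val (keys, vectors) = rows.unzip
      val block: Matrix = new DenseMatrix(vectors.size, ncol)
      vectors.zipWithIndex.foreach { case (v, idx) => block(idx, ::) := v }
      val blockified = (keys.toArray, block)

      // deblockify: back to one (key, row) pair per matrix row
      val rowWise = blockified._1.zipWithIndex.map { case (k, idx) => (k, blockified._2(idx, ::)) }
      rowWise.foreach(println)
    }
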
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
index 24f298d..83ede9a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.io._
 import java.util.Arrays
 
 /**
- * Copied from /spark/src/main/scala/org/apache/mahout/common
+ * Flink DRM Metadata
  */
 class DrmMetadata(
 
@@ -40,13 +40,13 @@ class DrmMetadata(
    * @param keyW2ValFunc: Conversion from Writable to value type of the DRM key
    */
   val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match {
-    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
-    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
-    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
-    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
-    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
-    case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
-    case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+    case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _
+    case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _
+    case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _
+    case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _
+    case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _
+    case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _
+    case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _
     case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
   }
 

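The pattern match above resolves key handling once per DRM: it yields a ClassTag for the key type plus a function that unwraps each Writable key into its Scala value. A minimal sketch of using the resolved metadata, assuming a constructor taking the key and value Writable classes (the valueTypeWritable parameter name is an assumption here):

    import org.apache.hadoop.io.IntWritable
    import org.apache.mahout.math.VectorWritable
    import org.apache.mahout.flinkbindings.io.DrmMetadata

    object DrmMetadataSketch extends App {
      // IntWritable keys resolve to ClassTag.Int and an IntWritable => Int unwrap function
      val meta = new DrmMetadata(keyTypeWritable = classOf[IntWritable], valueTypeWritable = classOf[VectorWritable])

      println(meta.keyClassTag)                             // Int
      println(meta.unwrapKeyFunction(new IntWritable(42)))  // 42
    }
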
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
index e77143e..b9d9f1b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
@@ -62,17 +62,17 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
     val seed = fs.getFileStatus(new Path(dir))
     var f: String = files
 
-    if (seed.isDir) {
+    if (seed.isDirectory) {
       val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
       for (fileStatus <- fileStatuses) {
         if (fileStatus.getPath().getName().matches(filePattern)
-          && !fileStatus.isDir) {
+          && !fileStatus.isDirectory) {
           // found a file
           if (fileStatus.getLen() != 0) {
             // file is not empty
             f = f + fileStatus.getPath.toUri.toString + ","
           }
-        } else if (fileStatus.isDir && recursive) {
+        } else if (fileStatus.isDirectory && recursive) {
           f = findFiles(fileStatus.getPath.toString, filePattern, f)
         }
       }

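The only functional change in this hunk is moving from the deprecated FileStatus.isDir to the Hadoop 2 isDirectory accessor; the directory walk itself is unchanged. A minimal self-contained sketch of that walk (collecting non-empty files whose names match a pattern into a comma-separated string) using the same FileSystem calls; the recursive flag is dropped for brevity and the path is illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object FindFilesSketch extends App {
      def findFiles(fs: FileSystem, dir: String, filePattern: String, files: String = ""): String = {
        var f = files
        val seed = fs.getFileStatus(new Path(dir))
        if (seed.isDirectory) {
          for (status <- fs.listStatus(new Path(dir))) {
            if (status.getPath.getName.matches(filePattern) && !status.isDirectory) {
              if (status.getLen != 0) f = f + status.getPath.toUri.toString + ","  // keep non-empty files only
            } else if (status.isDirectory) {
              f = findFiles(fs, status.getPath.toString, filePattern, f)           // recurse into subdirectories
            }
          }
        }
        f
      }

      val fs = FileSystem.get(new Configuration())
      val dir = "/tmp/drm"  // illustrative path
      if (fs.exists(new Path(dir))) println(findFiles(fs, dir, ".*\\.seq"))
    }
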
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
deleted file mode 100644
index 6581721..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.io
-
-import org.apache.hadoop.io.{ Writable, SequenceFile }
-import org.apache.hadoop.fs.{ FileSystem, Path }
-import org.apache.hadoop.conf.Configuration
-import collection._
-import JavaConversions._
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- *
- * Copied from /spark/src/main/scala/org/apache/mahout/common
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-  /**
-   * Read the header of a sequence file and determine the Key and Value type
-   * @param path
-   * @return
-   */
-  def readDrmHeader(path: String): DrmMetadata = {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    val partFilePath: Path = fs.listStatus(dfsPath)
-
-      // Filter out anything starting with .
-      .filter { s =>
-        !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir
-      }
-
-      // Take path
-      .map(_.getPath)
-
-      // Take only one, if any
-      .headOption
-
-      // Require there's at least one partition file found.
-      .getOrElse {
-        throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
-      }
-
-    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
-    } finally {
-      reader.close()
-    }
-
-  }
-
-  /**
-   * Delete a path from the filesystem
-   * @param path
-   */
-  def delete(path: String) {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    if (fs.exists(dfsPath)) {
-      fs.delete(dfsPath, true)
-    }
-  }
-
-}