Posted to commits@mahout.apache.org by ap...@apache.org on 2016/03/08 05:37:04 UTC

[1/2] mahout git commit: MAHOUT-1800: Pare down ClassTag overuse closes apache/mahout#183

Repository: mahout
Updated Branches:
  refs/heads/master 57317a51a -> 6919fd9fe
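
The change this patch makes, repeated across the engine and BLAS layers below: methods
that previously demanded a [K: ClassTag] context bound now take a bare [K], and where
tag evidence is still genuinely needed it is recovered from the operand's own
keyClassTag member. A minimal before/after sketch of the pattern (the trait and method
names here are illustrative stand-ins, not the patch itself):

    import scala.reflect.ClassTag

    trait Drm[K] { def keyClassTag: ClassTag[K] }  // stand-in for Mahout's DrmLike

    // Before: every caller had to thread ClassTag evidence through the bound.
    def colSumsOld[K: ClassTag](drm: Drm[K]): Unit = ()

    // After: bare type parameter; evidence is re-introduced only where needed.
    def colSumsNew[K](drm: Drm[K]): Unit = {
      implicit val ktag: ClassTag[K] = drm.keyClassTag
      // tag-requiring calls (RDD construction, implicit conversions) compile here
    }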


http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 42ddceb..99412df 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -24,7 +24,7 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSparkOps, cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
 import scala.Predef
 import scala.reflect.ClassTag
@@ -46,7 +46,7 @@ object SparkEngine extends DistributedEngine {
   // By default, use Hadoop 1 utils
   var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
 
-  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def colSums[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
@@ -56,7 +56,7 @@ object SparkEngine extends DistributedEngine {
 
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
-      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) ⇒  acc += v)
+      val acc = ((new DenseVector(n): Vector) /: iter) ((acc, v) ⇒ acc += v)
       Iterator(acc)
     })
 
@@ -65,7 +65,7 @@ object SparkEngine extends DistributedEngine {
       .reduce(_ += _)
   }
 
-  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
@@ -76,7 +76,7 @@ object SparkEngine extends DistributedEngine {
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
       val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) ⇒
-        v.nonZeroes().foreach { elem ⇒  acc(elem.index) += 1}
+        v.nonZeroes().foreach { elem ⇒ acc(elem.index) += 1 }
         acc
       }
       Iterator(acc)
@@ -87,10 +87,10 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
+  override def colMeans[K](drm: CheckpointedDrm[K]): Vector =
     if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
 
-  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
+  override def norm[K](drm: CheckpointedDrm[K]): Double =
     drm.rdd
       // Compute sum of squares of each vector
       .map {
@@ -100,7 +100,7 @@ object SparkEngine extends DistributedEngine {
 
 
   /** Optional engine-specific all reduce tensor operation. */
-  override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
+  override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
   BlockReduceFunc): Matrix = {
 
     import drm._
@@ -108,11 +108,11 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
-   *
-   * A particular physical engine implementation may choose to either use or not use these rewrites
-   * as a useful basic rewriting rule.<P>
-   */
+    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+    *
+    * A particular physical engine implementation may choose to either use or not use these rewrites
+    * as a useful basic rewriting rule.<P>
+    */
   override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
 
 
@@ -139,14 +139,13 @@ object SparkEngine extends DistributedEngine {
   def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
 
   /**
-   * Load DRM from hdfs (as in Mahout DRM format)
-   *
-   * @param path
-   * @param sc spark context (wanted to make that implicit, doesn't work in current version of
-   *           scala with the type bounds, sorry)
-   *
-   * @return DRM[Any] where Any is automatically translated to value type
-   */
+    * Load DRM from hdfs (as in Mahout DRM format)
+    *
+    * @param path
+    * @param sc spark context (wanted to make that implicit, doesn't work in current version of
+    *           scala with the type bounds, sorry)
+    * @return DRM[Any] where Any is automatically translated to value type
+    */
   def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
     // Require that context is actually Spark context.
@@ -163,7 +162,7 @@ object SparkEngine extends DistributedEngine {
     val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
 
       // Immediately convert keys and value writables into value types.
-      .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get()}
+      .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get() }
 
     // Wrap into a DRM type with correct matrix row key class tag evident.
     drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
@@ -221,11 +220,12 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
-   * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
-   */
-  override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
-    if (classTag[K] == ClassTag.Int) {
+    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
+    * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+    */
+  override def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
+    implicit val ktag = drmX.keyClassTag
+    if (ktag == ClassTag.Int) {
       drmX.asInstanceOf[DrmLike[Int]] → None
     } else {
 
@@ -237,26 +237,29 @@ object SparkEngine extends DistributedEngine {
       val (intRdd, keyMap) = blas.rekeySeqInts(rdd = drmXcp.rdd, computeMap = computeMap)
 
       // Convert computed key mapping to a matrix.
-      val mxKeyMap = keyMap.map { rdd =>
-        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal):Vector)}, ncol = 1, nrow = nrow)
+      val mxKeyMap = keyMap.map { rdd ⇒
+        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal): Vector) }, ncol = 1, nrow = nrow)
       }
 
 
       drmWrap(rdd = intRdd, ncol = ncol) → mxKeyMap
-  }
+    }
 
   }
 
 
   /**
-   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
-   * @param drmX
-   * @param fraction
-   * @param replacement
-   * @tparam K
-   * @return
-   */
-  override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+    * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+    *
+    * @param drmX
+    * @param fraction
+    * @param replacement
+    * @tparam K
+    * @return
+    */
+  override def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+
+    implicit val ktag = drmX.keyClassTag
 
     // We do want to take ncol if already computed, if not, then we don't want to trigger computation
     // here.
@@ -265,14 +268,14 @@ object SparkEngine extends DistributedEngine {
       case _ ⇒ -1
     }
     val sample = drmX.rdd.sample(withReplacement = replacement, fraction = fraction)
-    if (classTag[K] != ClassTag.Int) return drmWrap(sample, ncol = ncol)
+    if (ktag != ClassTag.Int) return drmWrap(sample, ncol = ncol)
 
     // K == Int: Int-keyed sample. rebase int counts.
     drmWrap(rdd = blas.rekeySeqInts(rdd = sample, computeMap = false)._1, ncol = ncol).asInstanceOf[DrmLike[K]]
   }
 
 
-  override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
+  override def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
 
     val ncol = drmX match {
       case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
@@ -286,9 +289,9 @@ object SparkEngine extends DistributedEngine {
     val isSparse = sample.exists { case (_, vec) ⇒ !vec.isDense }
 
     val vectors = sample.map(_._2)
-    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx:Integer) }.toMap
+    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx: Integer) }.toMap
 
-    val mx:Matrix = if (isSparse) sparse(vectors:_*) else dense(vectors)
+    val mx: Matrix = if (isSparse) sparse(vectors: _*) else dense(vectors)
     mx.setRowLabelBindings(labels)
 
     mx
@@ -301,7 +304,7 @@ object SparkEngine extends DistributedEngine {
     case CacheHint.MEMORY_ONLY ⇒ StorageLevel.MEMORY_ONLY
     case CacheHint.MEMORY_ONLY_2 ⇒ StorageLevel.MEMORY_ONLY_2
     case CacheHint.MEMORY_ONLY_SER ⇒ StorageLevel.MEMORY_ONLY_SER
-      case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
+    case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
     case CacheHint.MEMORY_AND_DISK ⇒ StorageLevel.MEMORY_AND_DISK
     case CacheHint.MEMORY_AND_DISK_2 ⇒ StorageLevel.MEMORY_AND_DISK_2
     case CacheHint.MEMORY_AND_DISK_SER ⇒ StorageLevel.MEMORY_AND_DISK_SER
@@ -309,7 +312,7 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Translate previously optimized physical plan */
-  private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
+  private def tr2phys[K](oper: DrmLike[K]): DrmRddInput[K] = {
     // I do explicit evidence propagation here since matching via case classes seems to be loosing
     // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
     // Hence you see explicit evidence attached to all recursive exec() calls.
@@ -319,28 +322,32 @@ object SparkEngine extends DistributedEngine {
       // (we cannot do actual flip for non-int-keyed arguments)
       case OpAtAnyKey(_) ⇒
         throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
-      case op@OpAt(a) ⇒ At.at(op, tr2phys(a)(op.classTagA))
-      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtA(a) ⇒ AtA.at_a(op, tr2phys(a)(op.classTagA))
-      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAtx(a, x) ⇒ Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
-      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
-      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a)(op.classTagA))
-      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpRowRange(a, _) ⇒ Slicing.rowRange(op, tr2phys(a)(op.classTagA))
-      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+      case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ At.at(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a), tr2phys(b))
+      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a), tr2phys(b)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ AtA.at_a(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a))
+      case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
+        Ax.atx_with_broadcast(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
+      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
+      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a), tr2phys(b))
+      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a), tr2phys(b))
+      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a))
+      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a), tr2phys(b))
+      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a), s)
+      case op@OpRowRange(a, _) if op.keyClassTag == ClassTag.Int ⇒
+        Slicing.rowRange(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a))
       // Custom operators, we just execute them
-      case blockOp: OpMapBlock[K, _] ⇒ MapBlock.exec(
-        src = tr2phys(blockOp.A)(blockOp.classTagA),
+      case blockOp: OpMapBlock[_, K] ⇒ MapBlock.exec(
+        src = tr2phys(blockOp.A),
         operator = blockOp
       )
-      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a)(op.classTagA))
-      case cp: CheckpointedDrm[K] ⇒ cp.rdd: DrmRddInput[K]
+      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a))
+      case cp: CheckpointedDrm[K] ⇒
+        implicit val ktag=cp.keyClassTag
+        cp.rdd: DrmRddInput[K]
       case _ ⇒ throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
         .format(oper))
 
@@ -348,32 +355,34 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-   * delimited files. Reads a vector per row.
-   * @param src a comma separated list of URIs to read from
-   * @param schema how the text file is formatted
-   */
+    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+    * delimited files. Reads a vector per row.
+    *
+    * @param src    a comma separated list of URIs to read from
+    * @param schema how the text file is formatted
+    */
   def indexedDatasetDFSRead(src: String,
-      schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: Option[BiDictionary] = None)
-      (implicit sc: DistributedContext):
-    IndexedDatasetSpark = {
+                            schema: Schema = DefaultIndexedDatasetReadSchema,
+                            existingRowIDs: Option[BiDictionary] = None)
+                           (implicit sc: DistributedContext):
+  IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readRowsFrom(src, existingRowIDs)
     ids
   }
 
   /**
-   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-   * delimited files. Reads an element per row.
-   * @param src a comma separated list of URIs to read from
-   * @param schema how the text file is formatted
-   */
+    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+    * delimited files. Reads an element per row.
+    *
+    * @param src    a comma separated list of URIs to read from
+    * @param schema how the text file is formatted
+    */
   def indexedDatasetDFSReadElements(src: String,
-      schema: Schema = DefaultIndexedDatasetElementReadSchema,
-      existingRowIDs: Option[BiDictionary] = None)
-      (implicit sc: DistributedContext):
-    IndexedDatasetSpark = {
+                                    schema: Schema = DefaultIndexedDatasetElementReadSchema,
+                                    existingRowIDs: Option[BiDictionary] = None)
+                                   (implicit sc: DistributedContext):
+  IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readElementsFrom(src, existingRowIDs)
     ids
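
One subtlety in the tr2phys rewrite above: with the context bound gone, matching on the
logical-op case classes erases the key type, so the operators whose result is known to
be Int-keyed (OpAt, OpAtA, OpAtx, OpRowRange) now guard on
op.keyClassTag == ClassTag.Int and re-assert the result type with a cast. A reduced,
self-contained sketch of that shape (Op and Result are illustrative stand-ins):

    import scala.reflect.ClassTag

    sealed trait Op[K] { def keyClassTag: ClassTag[K] }
    final case class Result[K](tag: ClassTag[K])

    def exec[K](op: Op[K]): Result[K] =
      if (op.keyClassTag == ClassTag.Int)
        // Safe only because the tag was just checked; mirrors the OpAt/OpAtA cases.
        Result(ClassTag.Int).asInstanceOf[Result[K]]
      else
        Result(op.keyClassTag)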

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 11e2bad..5142d3b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -44,13 +44,13 @@ object ABt {
    * @param srcB B source RDD 
    * @tparam K
    */
-  def abt[K: ClassTag](
+  def abt[K](
       operator: OpABt[K],
       srcA: DrmRddInput[K],
       srcB: DrmRddInput[Int]): DrmRddInput[K] = {
 
     debug("operator AB'(Spark)")
-    abt_nograph(operator, srcA, srcB)
+    abt_nograph(operator, srcA, srcB)(operator.keyClassTag)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 8a90398..d8637d2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,20 +17,18 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
-import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import org.apache.mahout.logging._
 import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
-import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, TEwFunc, OpAewScalar, OpAewB}
-import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
-import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
 import org.apache.mahout.math.drm._
-import org.apache.mahout.logging._
-import collection._
-import JavaConversions._
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewB, OpAewScalar, TEwFunc}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFunc, ReduceFuncScalar}
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
+
+import scala.reflect.{ClassTag, classTag}
+import scala.collection.JavaConversions._
 
 /** Elementwise drm-drm operators */
 object AewB {
@@ -53,7 +51,9 @@ object AewB {
 
 
   /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
-  def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def a_ew_b[K](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
 
     val ewOps = getEWOps()
     val opId = op.op
@@ -111,15 +111,16 @@ object AewB {
     rdd
   }
 
-  def a_ew_func[K:ClassTag](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
+  def a_ew_func[K](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
 
     val evalZeros = op.evalZeros
     val inplace = ewInplace()
     val f = op.f
+    implicit val ktag = op.keyClassTag
 
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
+    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
       val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
@@ -149,12 +150,13 @@ object AewB {
   }
 
   /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
-  def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
+  def a_ew_scalar[K](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
   DrmRddInput[K] = {
 
 
     val ewOps = getEWOps()
     val opId = op.op
+    implicit val ktag = op.keyClassTag
 
     val reduceFunc = opId match {
       case "+" => ewOps.plusScalar
@@ -168,7 +170,7 @@ object AewB {
 
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing 
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
+    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows) {
       val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
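
Worth noting in the AewB changes: implicitly[ClassTag[K]] becomes classTag[K], and both
now resolve to the implicit val ktag = op.keyClassTag introduced at the top of each
method rather than to a context bound. A tiny sketch of why the runtime Int-key check
still works (illustrative helper, not from the patch):

    import scala.reflect.{ClassTag, classTag}

    def isIntKeyed[K](ktag0: ClassTag[K]): Boolean = {
      implicit val ktag: ClassTag[K] = ktag0
      classTag[K] == ClassTag.Int  // summons the implicit just introduced
    }

    // isIntKeyed(ClassTag.Int)              == true
    // isIntKeyed(ClassTag(classOf[String])) == false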

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 5f9f84a..6fe076e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -17,7 +17,10 @@ object AinCoreB {
 
   private final implicit val log = getLog(AinCoreB.getClass)
 
-  def rightMultiply[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+  def rightMultiply[K](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
+
     if ( op.right.isInstanceOf[DiagonalMatrix])
       rightMultiply_diag(op, srcA)
     else

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 629accd..42e56e7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -13,10 +13,11 @@ import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
 /** Matrix product with one of operands an in-core matrix */
 object Ax {
 
-  def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+  def ax_with_broadcast[K](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
     val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
     implicit val sc: DistributedContext = rddA.sparkContext
+    implicit val ktag = op.keyClassTag
 
     val bcastX = drmBroadcast(op.x)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index 4a379ec..e900749 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -32,7 +32,9 @@ object CbindAB {
 
   private val log = Logger.getLogger(CbindAB.getClass)
 
-  def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
+  def cbindAScalar[K](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
     val srcRdd = srcA.toDrmRdd()
 
     val ncol = op.A.ncol
@@ -60,13 +62,14 @@ object CbindAB {
     resultRdd
   }
 
-  def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def cbindAB_nograph[K](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
     val a = srcA.toDrmRdd()
     val b = srcB.toDrmRdd()
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = n - n1
+    implicit val ktag = op.keyClassTag
 
     // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
     // instead of join, and apply the op map-side. Otherwise, perform join and apply the op

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 4cd9a74..6104d83 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -23,7 +23,7 @@ import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
 import org.apache.mahout.sparkbindings.DrmRdd
 
-class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
+class DrmRddOps[K](private[blas] val rdd: DrmRdd[K]) {
 
   /** Turn RDD into dense row-wise vectors if density threshold is exceeded. */
   def densify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 2933ddc..7e48ed8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -25,13 +25,14 @@ import scala.reflect.ClassTag
 
 object MapBlock {
 
-  def exec[S, R:ClassTag](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
+  def exec[S, R](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
 
     // We can't use attributes directly in the closure in order to avoid putting the whole object
     // into closure.
     val bmf = operator.bmf
     val ncol = operator.ncol
-    val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+    implicit val rtag = operator.keyClassTag
+    src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
 
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
@@ -39,8 +40,6 @@ object MapBlock {
 
       out
     })
-
-    rdd
   }
 
 }
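
Two things happen in MapBlock.exec above: the redundant rdd temporary is dropped, and
operator.keyClassTag (the tag of the result key type R) is installed as an implicit so
the mapped RDD can be converted into the DrmRddInput[R] return value; the conversions
in drm/package.scala (further below) deliberately keep their ClassTag bound. A reduced
sketch of that dependency, with stand-in types:

    import scala.language.implicitConversions
    import scala.reflect.ClassTag

    class Input[R: ClassTag](val data: Seq[R])            // stand-in for DrmRddInput
    implicit def seq2input[R: ClassTag](s: Seq[R]): Input[R] = new Input(s)

    def exec[S, R](src: Seq[S], f: S => R, rtag0: ClassTag[R]): Input[R] = {
      implicit val rtag: ClassTag[R] = rtag0  // satisfies seq2input's bound
      src.map(f)                              // implicitly converted on return
    }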

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index 0434a72..7e32b69 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -15,8 +15,9 @@ object Par {
 
   private final implicit val log = getLog(Par.getClass)
 
-  def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
+  def exec[K](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
 
+    implicit val ktag = op.keyClassTag
     val srcBlockified = src.isBlockified
 
     val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 62abba6..14772f6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -27,7 +27,9 @@ object RbindAB {
 
   private val log = Logger.getLogger(RbindAB.getClass)
 
-  def rbindAB[K: ClassTag](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def rbindAB[K](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
 
     // If any of the inputs is blockified, use blockified inputs
     if (srcA.isBlockified || srcB.isBlockified) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 6b8513f..5a83f80 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -35,7 +35,7 @@ import JavaConversions._
  */
 package object blas {
 
-  implicit def drmRdd2ops[K: ClassTag](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
+  implicit def drmRdd2ops[K](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
 
 
   /**
@@ -46,7 +46,7 @@ package object blas {
    * @tparam K existing key parameter
    * @return
    */
-  private[mahout] def rekeySeqInts[K: ClassTag](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
+  private[mahout] def rekeySeqInts[K](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
     Option[RDD[(K, Int)]]) = {
 
     // Spark context please.

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index abcfc64..3c086fe 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -1,16 +1,17 @@
 package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.sparkbindings.DrmRdd
 import scala.reflect.ClassTag
 
 /** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
-class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {
 
   assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
 
   private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
 
   /** Spark matrix customization exposure */
-  def rdd = sparkDrm.rddInput.toDrmRdd()
+  def rdd:DrmRdd[K] = sparkDrm.rddInput.toDrmRdd()
 
 }
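
With the explicit DrmRdd[K] return type above, the Spark RDD behind a checkpointed DRM
is reachable without supplying a ClassTag at the call site. A hedged usage sketch
(assumes Mahout's Spark bindings on the classpath and a Spark-backed matrix; peek is an
illustrative helper):

    import org.apache.mahout.math.drm.CheckpointedDrm
    import org.apache.mahout.sparkbindings._  // cpDrm2cpDrmSparkOps lives here

    // Print a few (key, row-vector) pairs of any Spark-backed DRM.
    def peek[K](drm: CheckpointedDrm[K]): Unit =
      drm.rdd.take(3).foreach { case (key, vec) => println(s"$key -> $vec") }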

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index e18d6da..b793098 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -37,15 +37,15 @@ package object drm {
 
   private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings");
 
-  private[sparkbindings] implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrm2DrmRddInput[K](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
     cp.rddInput
 
-  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K](cp: CheckpointedDrm[K]): DrmRddInput[K] =
     cp.asInstanceOf[CheckpointedDrmSpark[K]]
 
-  private[sparkbindings] implicit def drmRdd2drmRddInput[K: ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
+  private[sparkbindings] implicit def drmRdd2drmRddInput[K:ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
 
-  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K: ClassTag](rdd: BlockifiedDrmRdd[K]) = new
+  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K:ClassTag](rdd: BlockifiedDrmRdd[K]) = new
       DrmRddInput[K](
     Right(rdd))
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 330ae38..de309c3 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -108,10 +108,10 @@ package object sparkbindings {
   implicit def sb2bc[T](b: Broadcast[T]): BCast[T] = new SparkBCast(b)
 
   /** Adding Spark-specific ops */
-  implicit def cpDrm2cpDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
+  implicit def cpDrm2cpDrmSparkOps[K](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
     new CheckpointedDrmSparkOps[K](drm)
 
-  implicit def drm2cpDrmSparkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
+  implicit def drm2cpDrmSparkOps[K](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
 
   private[sparkbindings] implicit def m2w(m: Matrix): MatrixWritable = new MatrixWritable(m)
 


[2/2] mahout git commit: MAHOUT-1800: Pare down ClassTag overuse closes apache/mahout#183

Posted by ap...@apache.org.
MAHOUT-1800: Pare down ClassTag overuse closes apache/mahout#183


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/6919fd9f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/6919fd9f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/6919fd9f

Branch: refs/heads/master
Commit: 6919fd9febe1585d15e78e51aabcad8fa29235f3
Parents: 57317a5
Author: Andrew Palumbo <ap...@apache.org>
Authored: Mon Mar 7 23:10:37 2016 -0500
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Mon Mar 7 23:35:37 2016 -0500

----------------------------------------------------------------------
 .../apache/mahout/h2obindings/H2OEngine.scala   |  54 +++---
 .../classifier/naivebayes/NaiveBayes.scala      |   9 +-
 .../apache/mahout/math/decompositions/ALS.scala |   7 +-
 .../apache/mahout/math/decompositions/DQR.scala |   5 +-
 .../mahout/math/decompositions/DSPCA.scala      |   5 +-
 .../mahout/math/decompositions/DSSVD.scala      |  11 +-
 .../mahout/math/drm/CheckpointedDrm.scala       |   7 -
 .../mahout/math/drm/CheckpointedOps.scala       |   2 +-
 .../mahout/math/drm/DistributedEngine.scala     |  58 ++++---
 .../org/apache/mahout/math/drm/DrmLike.scala    |   8 +
 .../org/apache/mahout/math/drm/DrmLikeOps.scala |   6 +-
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |  19 ++-
 .../math/drm/logical/AbstractBinaryOp.scala     |  39 ++---
 .../math/drm/logical/AbstractUnaryOp.scala      |   6 +-
 .../math/drm/logical/CheckpointAction.scala     |   3 +-
 .../apache/mahout/math/drm/logical/OpAB.scala   |   8 +-
 .../mahout/math/drm/logical/OpABAnyKey.scala    |   9 +-
 .../apache/mahout/math/drm/logical/OpABt.scala  |   8 +-
 .../apache/mahout/math/drm/logical/OpAewB.scala |  10 +-
 .../mahout/math/drm/logical/OpAewScalar.scala   |   8 +-
 .../math/drm/logical/OpAewUnaryFunc.scala       |   8 +-
 .../math/drm/logical/OpAewUnaryFuncFusion.scala |   8 +-
 .../apache/mahout/math/drm/logical/OpAt.scala   |   8 +
 .../apache/mahout/math/drm/logical/OpAtA.scala  |   8 +-
 .../mahout/math/drm/logical/OpAtAnyKey.scala    |   8 +-
 .../apache/mahout/math/drm/logical/OpAtB.scala  |   8 +-
 .../apache/mahout/math/drm/logical/OpAtx.scala  |   8 +
 .../apache/mahout/math/drm/logical/OpAx.scala   |   8 +-
 .../mahout/math/drm/logical/OpCbind.scala       |   9 +-
 .../mahout/math/drm/logical/OpCbindScalar.scala |   8 +-
 .../mahout/math/drm/logical/OpMapBlock.scala    |  13 +-
 .../apache/mahout/math/drm/logical/OpPar.scala  |   8 +-
 .../mahout/math/drm/logical/OpRbind.scala       |   9 +-
 .../mahout/math/drm/logical/OpRowRange.scala    |   8 +
 .../math/drm/logical/OpTimesLeftMatrix.scala    |   8 +
 .../math/drm/logical/OpTimesRightMatrix.scala   |   8 +-
 .../org/apache/mahout/math/drm/package.scala    |  28 +--
 .../mahout/math/drm/DrmLikeOpsSuiteBase.scala   |  20 ++-
 .../classifier/naivebayes/SparkNaiveBayes.scala |   2 +-
 .../apache/mahout/drivers/TrainNBDriver.scala   |   4 +-
 .../mahout/sparkbindings/SparkEngine.scala      | 171 ++++++++++---------
 .../apache/mahout/sparkbindings/blas/ABt.scala  |   4 +-
 .../apache/mahout/sparkbindings/blas/AewB.scala |  36 ++--
 .../mahout/sparkbindings/blas/AinCoreB.scala    |   5 +-
 .../apache/mahout/sparkbindings/blas/Ax.scala   |   3 +-
 .../mahout/sparkbindings/blas/CbindAB.scala     |   7 +-
 .../mahout/sparkbindings/blas/DrmRddOps.scala   |   2 +-
 .../mahout/sparkbindings/blas/MapBlock.scala    |   7 +-
 .../apache/mahout/sparkbindings/blas/Par.scala  |   3 +-
 .../mahout/sparkbindings/blas/RbindAB.scala     |   4 +-
 .../mahout/sparkbindings/blas/package.scala     |   4 +-
 .../drm/CheckpointedDrmSparkOps.scala           |   5 +-
 .../mahout/sparkbindings/drm/package.scala      |   8 +-
 .../apache/mahout/sparkbindings/package.scala   |   4 +-
 54 files changed, 460 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index bcf3507..5567f84 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -36,16 +36,16 @@ object H2OEngine extends DistributedEngine {
   // By default, use Hadoop 1 utils
   var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
 
-  def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
+  def colMeans[K](drm: CheckpointedDrm[K]): Vector =
     H2OHelper.colMeans(drm.h2odrm.frame)
 
-  def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
+  def colSums[K](drm: CheckpointedDrm[K]): Vector =
     H2OHelper.colSums(drm.h2odrm.frame)
 
-  def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
+  def norm[K](drm: CheckpointedDrm[K]): Double =
     H2OHelper.sumSqr(drm.h2odrm.frame)
 
-  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
+  def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector =
     H2OHelper.nonZeroCnt(drm.h2odrm.frame)
 
   /** Broadcast support */
@@ -94,33 +94,33 @@ object H2OEngine extends DistributedEngine {
       case OpAtAnyKey(_) =>
         throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
       // Linear algebra operators
-      case op@OpAt(a) => At.exec(tr2phys(a)(op.classTagA))
-      case op@OpABt(a, b) => ABt.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtB(a, b) => AtB.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtA(a) => AtA.exec(tr2phys(a)(op.classTagA))
-      case op@OpAx(a, v) => Ax.exec(tr2phys(a)(op.classTagA), v)
-      case op@OpAtx(a, v) => Atx.exec(tr2phys(a)(op.classTagA), v)
-      case op@OpAewUnaryFunc(a, f, z) => AewUnary.exec(tr2phys(a)(op.classTagA), op.f, z)
-      case op@OpAewUnaryFuncFusion(a, f) => AewUnary.exec(tr2phys(a)(op.classTagA), op.f, op.evalZeros)
-      case op@OpAewB(a, b, opId) => AewB.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB), opId)
-      case op@OpAewScalar(a, s, opId) => AewScalar.exec(tr2phys(a)(op.classTagA), s, opId)
-      case op@OpTimesRightMatrix(a, m) => TimesRightMatrix.exec(tr2phys(a)(op.classTagA), m)
+      case op@OpAt(a) => At.exec(tr2phys(a)(a.keyClassTag))
+      case op@OpABt(a, b) => ABt.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag))
+      case op@OpAtB(a, b) => AtB.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag))
+      case op@OpAtA(a) => AtA.exec(tr2phys(a)(a.keyClassTag))
+      case op@OpAx(a, v) => Ax.exec(tr2phys(a)(a.keyClassTag), v)
+      case op@OpAtx(a, v) => Atx.exec(tr2phys(a)(a.keyClassTag), v)
+      case op@OpAewUnaryFunc(a, f, z) => AewUnary.exec(tr2phys(a)(a.keyClassTag), op.f, z)
+      case op@OpAewUnaryFuncFusion(a, f) => AewUnary.exec(tr2phys(a)(a.keyClassTag), op.f, op.evalZeros)
+      case op@OpAewB(a, b, opId) => AewB.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag), opId)
+      case op@OpAewScalar(a, s, opId) => AewScalar.exec(tr2phys(a)(a.keyClassTag), s, opId)
+      case op@OpTimesRightMatrix(a, m) => TimesRightMatrix.exec(tr2phys(a)(a.keyClassTag), m)
       // Non arithmetic
-      case op@OpCbind(a, b) => Cbind.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbindScalar(a, d, left) => CbindScalar.exec(tr2phys(a)(op.classTagA), d, left)
-      case op@OpRbind(a, b) => Rbind.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(op.classTagA), r)
+      case op@OpCbind(a, b) => Cbind.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag))
+      case op@OpCbindScalar(a, d, left) => CbindScalar.exec(tr2phys(a)(a.keyClassTag), d, left)
+      case op@OpRbind(a, b) => Rbind.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag))
+      case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(a.keyClassTag), r)
       // Custom operators
-      case blockOp: OpMapBlock[K, _] => MapBlock.exec(tr2phys(blockOp.A)(blockOp.classTagA), blockOp.ncol, blockOp.bmf,
-        (blockOp.classTagK == implicitly[ClassTag[String]]), blockOp.classTagA, blockOp.classTagK)
-      case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(op.classTagA), m, e)
+      case blockOp: OpMapBlock[K, _] => MapBlock.exec(tr2phys(blockOp.A)(blockOp.A.keyClassTag), blockOp.ncol, blockOp.bmf,
+        (blockOp.keyClassTag == classTag[String]), blockOp.A.keyClassTag, blockOp.keyClassTag)
+      case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(a.keyClassTag), m, e)
       case cp: CheckpointedDrm[K] => cp.h2odrm
       case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
           .format(oper))
     }
   }
 
-  implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]]
+  implicit def cp2cph2o[K](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]]
 
   /** stub class not implemented in H2O */
   abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictionary, val columnIDs: BiDictionary)
@@ -167,23 +167,23 @@ object H2OEngine extends DistributedEngine {
    * TODO: implement this please.
    *
    */
-  override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc)
+  override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc)
   : Matrix = H2OHelper.allreduceBlock(drm.h2odrm, bmf, rf)
 
   /**
    * TODO: implement this please.
    */
-  override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = ???
+  override def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = ???
 
   /**
    * (Optional) Sampling operation. Consistent with Spark semantics of the same.
    * TODO: implement this please.
    */
-  override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = ???
+  override def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = ???
 
   /**
    * TODO: implement this please.
    */
-  override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean)
+  override def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean)
   : (DrmLike[Int], Option[DrmLike[K]]) = ???
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala b/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala
index a15ca09..5a17144 100644
--- a/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala
@@ -110,7 +110,7 @@ trait NaiveBayes extends java.io.Serializable{
    *   aggregatedByLabelObservationDrm is a DrmLike[Int] of aggregated
    *   TF or TF-IDF counts per label
    */
-  def extractLabelsAndAggregateObservations[K: ClassTag](stringKeyedObservations: DrmLike[K],
+  def extractLabelsAndAggregateObservations[K](stringKeyedObservations: DrmLike[K],
                                                          cParser: CategoryParser = seq2SparseCategoryParser)
                                                         (implicit ctx: DistributedContext):
                                                         (mutable.HashMap[String, Integer], DrmLike[Int])= {
@@ -120,13 +120,16 @@ trait NaiveBayes extends java.io.Serializable{
     val numDocs=stringKeyedObservations.nrow
     val numFeatures=stringKeyedObservations.ncol
 
+    // For mapblocks that return K.
+    implicit val ktag = stringKeyedObservations.keyClassTag
+
     // Extract categories from labels assigned by seq2sparse
     // Categories are Stored in Drm Keys as eg.: /Category/document_id
 
     // Get a new DRM with a single column so that we don't have to collect the
     // DRM into memory upfront.
-    val strippedObeservations= stringKeyedObservations.mapBlock(ncol=1){
-      case(keys, block) =>
+    val strippedObeservations = stringKeyedObservations.mapBlock(ncol = 1) {
+      case (keys, block) =>
         val blockB = block.like(keys.size, 1)
         keys -> blockB
     }
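
The implicit val ktag line is what lets the mapBlock call compile: mapBlock here
produces another K-keyed DRM and needs ClassTag[K] evidence for its result. Restated
compactly (same shape as the patch, with a DRM drmA assumed in scope):

    implicit val ktag = drmA.keyClassTag   // required: mapBlock returns a DrmLike[K]
    val singleCol = drmA.mapBlock(ncol = 1) {
      case (keys, block) =>
        keys -> block.like(keys.size, 1)   // empty 1-column block, same keys
    }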

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
index 4e2f45c..92d0e12 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
@@ -42,7 +42,7 @@ private[math] object ALS {
    * @param drmV V matrix
    * @param iterationsRMSE RMSE values afeter each of iteration performed
    */
-  class Result[K: ClassTag](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) {
+  class Result[K](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) {
     def toTuple = (drmU, drmV, iterationsRMSE)
   }
 
@@ -74,7 +74,7 @@ private[math] object ALS {
    * @tparam K row key type of the input (100 is probably more than enough)
    * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result}
    */
-  def dals[K: ClassTag](
+  def dals[K](
       drmA: DrmLike[K],
       k: Int = 50,
       lambda: Double = 0.0,
@@ -85,6 +85,9 @@ private[math] object ALS {
     assert(convergenceThreshold < 1.0, "convergenceThreshold")
     assert(maxIterations >= 1, "maxIterations")
 
+    // Some mapblock() usage may require to know ClassTag[K] bound
+    implicit val ktag = drmA.keyClassTag
+
     val drmAt = drmA.t
 
     // Initialize U and V so that they are identically distributed to A or A'

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
index 866ee34..9173d09 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
@@ -39,7 +39,10 @@ object DQR {
    * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
    * their RDD should be able to zip successfully.
    */
-  def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+  def dqrThin[K](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    // Some mapBlock() calls need it
+    implicit val ktag =  drmA.keyClassTag
 
     if (drmA.ncol > 5000)
       warn("A is too fat. A'A must fit in memory and easily broadcasted.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
index c98ee2e..4a769b9 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
@@ -38,9 +38,12 @@ object DSPCA {
    * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
    *         e.g. save them to hdfs in order to trigger their computation.
    */
-  def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  def dspca[K](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
   (DrmLike[K], DrmLike[Int], Vector) = {
 
+    // Some mapBlock() calls need it
+    implicit val ktag =  drmA.keyClassTag
+
     val drmAcp = drmA.checkpoint()
     implicit val ctx = drmAcp.context
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
index cecaec8..acd1dc1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
@@ -23,9 +23,12 @@ object DSSVD {
    * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
    *         e.g. save them to hdfs in order to trigger their computation.
    */
-  def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  def dssvd[K](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
   (DrmLike[K], DrmLike[Int], Vector) = {
 
+    // Some mapBlock() calls need it
+    implicit val ktag =  drmA.keyClassTag
+
     val drmAcp = drmA.checkpoint()
 
     val m = drmAcp.nrow
@@ -43,9 +46,9 @@ object DSSVD {
     // instantiate the Omega random matrix view in the backend instead. That way serialized closure
     // is much more compact.
     var drmY = drmAcp.mapBlock(ncol = r) {
-      case (keys, blockA) =>
+      case (keys, blockA) ⇒
         val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        keys -> blockY
+        keys → blockY
     }.checkpoint()
 
     var drmQ = dqrThin(drmY)._1
@@ -62,7 +65,7 @@ object DSSVD {
 
     trace(s"dssvd:drmB'=${drmBt.collect}.")
 
-    for (i <- 0  until q) {
+    for (i ← 0  until q) {
       drmY = drmAcp %*% drmBt
       drmQ = dqrThin(drmY.checkpoint())._1
       // Checkpoint Q if last iteration
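
The omegaSeed indirection in the drmY block is the closure-size trick called out in the
comment above it: only the Int seed is serialized, and each task rebuilds the same
deterministic Omega view locally instead of shipping an n x r dense matrix. The block,
restated with comments (names as in the patch):

    val drmY = drmAcp.mapBlock(ncol = r) {
      case (keys, blockA) =>
        // Rebuilt per task from the seed: a lazy, deterministic random view, so the
        // serialized closure carries an Int instead of an n x r matrix.
        keys -> (blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed))
    }.checkpoint()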

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 7f97481..78b7ce8 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -34,13 +34,6 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
   /** If this checkpoint is already declared cached, uncache. */
   def uncache(): this.type
 
-  /**
-   * Explicit extraction of key class Tag since traits don't support context bound access; but actual
-   * implementation knows it
-   */
-  def keyClassTag: ClassTag[K]
-
-
   /** changes the number of rows without touching the underlying data */
   def newRowCardinality(n: Int): CheckpointedDrm[K]
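
The keyClassTag member removed here does not disappear: it moves up to DrmLike (see
DrmLike.scala, +8 lines, in the summary above), so every DRM, not just a checkpointed
one, can report its key tag. That is what legalizes calls like drmX.keyClassTag
throughout the engine code. An abbreviated sketch of the resulting trait:

    import scala.reflect.ClassTag

    trait DrmLike[K] {
      /** Explicit key ClassTag; traits cannot carry a context bound, so the
        * concrete implementation supplies it. */
      def keyClassTag: ClassTag[K]
      // ... remainder of the trait as before
    }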
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index c43c6c7..da8ce9f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -27,7 +27,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
  * the DRMBase once they stabilize.
  *
  */
-class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
+class CheckpointedOps[K](val drm: CheckpointedDrm[K]) {
 
 
   /** Column sums. At this point this runs on checkpoint and collects in-core vector. */

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 9ea90a1..ed93d89 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -47,18 +47,18 @@ trait DistributedEngine {
   def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
 
   /** Engine-specific colSums implementation based on a checkpoint. */
-  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+  def colSums[K](drm: CheckpointedDrm[K]): Vector
 
   /** Optional engine-specific all reduce tensor operation. */
-  def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix
+  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
-  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+  def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+  def colMeans[K](drm: CheckpointedDrm[K]): Vector
 
-  def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
+  def norm[K](drm: CheckpointedDrm[K]): Double
 
   /** Broadcast support */
   def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
@@ -94,7 +94,7 @@ trait DistributedEngine {
   * Convert a non-int-keyed matrix to an int-keyed one, optionally computing a mapping from the old
   * keys to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
    */
-  def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]])
+  def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]])
 
   /**
    * (Optional) Sampling operation. Consistent with Spark semantics of the same.
@@ -104,9 +104,9 @@ trait DistributedEngine {
    * @tparam K
    * @return
    */
-  def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K]
+  def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K]
 
-  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement:Boolean = false) : Matrix
+  def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement:Boolean = false) : Matrix
 
   /**
    * Load IndexedDataset from text delimited format.
@@ -137,7 +137,7 @@ object DistributedEngine {
   private val log = Logger.getLogger(DistributedEngine.getClass)
 
   /** These are mostly rewrites of multiplication operations */
-  private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+  private def pass1[K](action: DrmLike[K]): DrmLike[K] = {
 
     action match {
 
@@ -154,16 +154,22 @@ object DistributedEngine {
             null
         }
       }
-      case OpAB(OpAt(a), b) if (a == b) ⇒ OpAtA(pass1(a))
-      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) ⇒ OpAtA(pass1(a))
+      case OpAB(OpAt(a), b) if a == b ⇒ OpAtA(pass1(a))
+      case OpABAnyKey(OpAtAnyKey(a), b) if a == b ⇒ OpAtA(pass1(a))
+
+      // A small rule change: now that the ClassTag is gone from the %*% operation, the b[Int] case
+      // no longer matches automatically. So we check and rewrite it dynamically, and re-run pass1
+      // on the resulting tree.
+      case OpABAnyKey(a, b) if b.keyClassTag == ClassTag.Int ⇒ pass1(OpAB(a, b.asInstanceOf[DrmLike[Int]]))
+      case OpAtAnyKey(a) if a.keyClassTag == ClassTag.Int ⇒ pass1(OpAt(a.asInstanceOf[DrmLike[Int]]))
 
       // For now, rewrite left-multiply via transpositions, i.e.
       // inCoreA %*% B = (B' %*% inCoreA')'
       case op@OpTimesLeftMatrix(a, b) ⇒
-      OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
 
       // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments
-      case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) ⇒
+      case op@OpRbind(a, b) if (op.keyClassTag == ClassTag.Int) ⇒
 
         // Make sure the closure sees only local vals, not attributes. We need these ugly casts
         // because the compiler cannot infer that K is the same as Int from the if() above.
@@ -179,18 +185,18 @@ object DistributedEngine {
 
       // For everything else we just pass-thru the operator arguments to optimizer
       case uop: AbstractUnaryOp[_, K] ⇒
-        uop.A = pass1(uop.A)(uop.classTagA)
+        uop.A = pass1(uop.A)
         uop
 
       case bop: AbstractBinaryOp[_, _, K] ⇒
-        bop.A = pass1(bop.A)(bop.classTagA)
-        bop.B = pass1(bop.B)(bop.classTagB)
+        bop.A = pass1(bop.A)
+        bop.B = pass1(bop.B)
         bop
     }
   }
 
   /** This removes constructs like A.t.t that the previous step may have created */
-  private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+  private def pass2[K](action: DrmLike[K]): DrmLike[K] = {
     action match {
 
       // Fusion of unary funcs into single, like 1 + x * x.
@@ -206,24 +212,24 @@ object DistributedEngine {
         pass2(OpAewUnaryFuncFusion(a, op.ff :+ op2))
 
       // A.t.t => A
-      case OpAt(top@OpAt(a)) ⇒  pass2(a)(top.classTagA)
+      case OpAt(top@OpAt(a)) ⇒  pass2(a)
 
       // Stop at checkpoints
       case cd: CheckpointedDrm[_] ⇒  action
 
       // For everything else we just pass-thru the operator arguments to optimizer
       case uop: AbstractUnaryOp[_, K] ⇒
-        uop.A = pass2(uop.A)(uop.classTagA)
+        uop.A = pass2(uop.A)
         uop
       case bop: AbstractBinaryOp[_, _, K] ⇒
-        bop.A = pass2(bop.A)(bop.classTagA)
-        bop.B = pass2(bop.B)(bop.classTagB)
+        bop.A = pass2(bop.A)
+        bop.B = pass2(bop.B)
         bop
     }
   }
 
   /** Some further rewrites that are conditioned on A.t.t removal */
-  private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+  private def pass3[K](action: DrmLike[K]): DrmLike[K] = {
     action match {
 
       // matrix products.
@@ -240,18 +246,18 @@ object DistributedEngine {
       case OpAB(a, b) ⇒  OpABt(pass3(a), OpAt(pass3(b)))
 
       // Rewrite A'x
-      case op@OpAx(op1@OpAt(a), x) ⇒  OpAtx(pass3(a)(op1.classTagA), x)
+      case op@OpAx(op1@OpAt(a), x) ⇒  OpAtx(pass3(a), x)
 
       // Stop at checkpoints
       case cd: CheckpointedDrm[_] ⇒  action
 
       // For everything else we just pass-thru the operator arguments to optimizer
       case uop: AbstractUnaryOp[_, K] ⇒
-        uop.A = pass3(uop.A)(uop.classTagA)
+        uop.A = pass3(uop.A)
         uop
       case bop: AbstractBinaryOp[_, _, K] ⇒
-        bop.A = pass3(bop.A)(bop.classTagA)
-        bop.B = pass3(bop.B)(bop.classTagB)
+        bop.A = pass3(bop.A)
+        bop.B = pass3(bop.B)
         bop
     }
   }
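
Since %*% now always builds the AnyKey operators, the rules above recover the Int
specialization at plan time from the runtime tag. The dispatch idiom in isolation, as
a hedged sketch (asIntKeyed is illustrative, not optimizer code):

    import scala.reflect.ClassTag
    import org.apache.mahout.math.drm.DrmLike

    // The static key type is erased here; keyClassTag plus an explicit cast recovers it.
    def asIntKeyed[K](drm: DrmLike[K]): Option[DrmLike[Int]] =
      if (drm.keyClassTag == ClassTag.Int) Some(drm.asInstanceOf[DrmLike[Int]])
      else None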

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 8c615bf..23f5fc6 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.math.drm
 
+import scala.reflect.ClassTag
+
 /**
  *
  * Basic DRM trait.
@@ -44,6 +46,12 @@ trait DrmLike[K] {
   def ncol: Int
 
   /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  def keyClassTag: ClassTag[K]
+
+  /**
    * Action operator -- does not necessarily mean a Spark action, but it does mean running the BLAS
    * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
    */
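
With keyClassTag promoted from CheckpointedDrm to DrmLike, generic code can inspect
the row key type of any logical or physical DRM without carrying a context bound. An
illustrative sketch (describeKeys is hypothetical):

    import scala.reflect.ClassTag
    import org.apache.mahout.math.drm.DrmLike

    def describeKeys[K](drmA: DrmLike[K]): String = drmA.keyClassTag match {
      case ClassTag.Int ⇒ "int-keyed, directly usable as DrmLike[Int]"
      case t ⇒ s"keyed by $t"
    }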

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 19432d0..e2c6e17 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -22,7 +22,7 @@ import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.drm.logical.{OpPar, OpMapBlock, OpRowRange}
 
 /** Common Drm ops */
-class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) {
+class DrmLikeOps[K](protected[drm] val drm: DrmLike[K]) {
 
   /**
    * Parallelism adjustments. <P/>
@@ -90,9 +90,11 @@ class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) {
     import RLikeDrmOps._
     import RLikeOps._
 
+    implicit val ktag = drm.keyClassTag
+
     val rowSrc: DrmLike[K] = if (rowRange != ::) {
 
-      if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
+      if (ClassTag.Int == ktag) {
 
         assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range")
         val intKeyed = drm.asInstanceOf[DrmLike[Int]]
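
Slicing usage itself is unchanged; only the selection of the int-key fast path moved
from implicit evidence to a runtime tag comparison. A small usage sketch, assuming an
implicit DistributedContext is in scope:

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    val drmA = drmParallelize(dense((1, 2), (3, 4), (5, 6)), numPartitions = 2)
    val topTwo = drmA(0 until 2, ::)  // int-keyed, so the row-range path applies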

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index aac7da1..54afc0e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -25,7 +25,7 @@ import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 
-class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
+class RLikeDrmOps[K](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
 
   import RLikeDrmOps._
   import org.apache.mahout.math.scalabindings._
@@ -64,11 +64,9 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
 
   def /:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that / _, evalZeros = true)
 
-  def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
+  def :%*%[B](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that)
 
-  def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that)
-
-  def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
+  def %*%[B](that: DrmLike[B]): DrmLike[K] = this :%*% that
 
   def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
 
@@ -98,6 +96,9 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
    * @return map of row keys into row sums, front-end collected.
    */
   def rowSumsMap(): Map[String, Double] = {
+
+    implicit val ktag = drm.keyClassTag
+
     val m = drm.mapBlock(ncol = 1) { case (keys, block) =>
       keys -> dense(block.rowSums).t
     }.collect
@@ -161,11 +162,11 @@ object RLikeDrmOps {
 
   implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
 
-  implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
+  implicit def drm2RLikeOps[K](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
 
-  implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
+  implicit def rlikeOps2Drm[K](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
 
-  implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
+  implicit def ops2Drm[K](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
 
   // Removed in move to 1.2.1 PR #74 https://github.com/apache/mahout/pull/74/files
   // Not sure why.
@@ -175,5 +176,5 @@ object RLikeDrmOps {
   * This is probably dangerous, since it triggers implicit checkpointing with the default storage
   * level setting.
    */
-  implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
+  implicit def drm2cpops[K](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
 }
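
One consequence of collapsing the two products into a single generic %*% is that
expressions such as A' %*% A now type-check for any key type, with the Int
specialization recovered by the pass1 rules shown earlier. A hedged sketch (gramian
is illustrative):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    // Builds OpABAnyKey(OpAtAnyKey(A), A), which pass1 rewrites to the OpAtA fast path.
    def gramian[K](drmA: DrmLike[K]): DrmLike[Int] = drmA.t %*% drmA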

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
index 3b6b8bf..9fba286 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -21,34 +21,25 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
 
 /**
- * Any logical binary operator (such as A + B).
- * <P/>
- *
- * Any logical operator derived from this is also capabile of triggering optimizer checkpoint, hence,
- * it also inherits CheckpointAction.
- * <P/>
- * 
- * @param evidence$1 LHS key type tag
- * @param evidence$2 RHS key type tag
- * @param evidence$3 expression key type tag
- * @tparam A LHS key type
- * @tparam B RHS key type
- * @tparam K result key type
- */
-abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
-    extends CheckpointAction[K] with DrmLike[K] {
+  * Any logical binary operator (such as A + B).
+  * <P/>
+  *
+  * Any logical operator derived from this is also capable of triggering an optimizer checkpoint; hence,
+  * it also inherits CheckpointAction.
+  * <P/>
+  *
+  * @tparam A LHS key type
+  * @tparam B RHS key type
+  * @tparam K result key type
+  */
+abstract class AbstractBinaryOp[A, B, K]
+  extends CheckpointAction[K] with DrmLike[K] {
 
   protected[drm] var A: DrmLike[A]
+
   protected[drm] var B: DrmLike[B]
+
   lazy val context: DistributedContext = A.context
 
   protected[mahout] def canHaveMissingRows: Boolean = false
-
-  // These are explicit evidence export. Sometimes scala falls over to figure that on its own.
-  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
-
-  def classTagB: ClassTag[B] = implicitly[ClassTag[B]]
-
-  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
 }
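
With the evidence exports gone, a concrete operator satisfies DrmLike's keyClassTag
by delegating to an operand, or by pinning ClassTag.Int where the result key is known
(as the Op* diffs below do). A hypothetical operator under the new scheme:

    import scala.reflect.ClassTag
    import org.apache.mahout.math.drm.DrmLike
    import org.apache.mahout.math.drm.logical.AbstractBinaryOp

    // OpEwMax is illustrative only, not part of this patch.
    case class OpEwMax[K](override var A: DrmLike[K], override var B: DrmLike[K])
        extends AbstractBinaryOp[K, K, K] {
      override def keyClassTag: ClassTag[K] = A.keyClassTag
      def nrow: Long = A.nrow
      def ncol: Int = A.ncol
    }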

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
index 60b2c77..28cf87d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -21,17 +21,13 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
 
 /** Abstract unary operator */
-abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
+abstract class AbstractUnaryOp[A, K]
     extends CheckpointAction[K] with DrmLike[K] {
 
   protected[mahout] var A: DrmLike[A]
 
   lazy val context: DistributedContext = A.context
 
-  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
-
-  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
   override protected[mahout] lazy val canHaveMissingRows: Boolean = A.canHaveMissingRows
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
index a7934a3..6daaf0e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -22,7 +22,7 @@ import scala.util.Random
 import org.apache.mahout.math.drm._
 
 /** Implementation of distributed expression checkpoint and optimizer. */
-abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
+abstract class CheckpointAction[K] extends DrmLike[K] {
 
   protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
 
@@ -37,6 +37,7 @@ abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
    */
   def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp match {
     case None =>
+      implicit val cpTag = this.keyClassTag
       val plan = context.optimizerRewrite(this)
       val physPlan = context.toPhysical(plan, cacheHint)
       cp = Some(physPlan)

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
index 804a00e..e5316a0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
@@ -21,13 +21,19 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 
 /** Logical AB */
-case class OpAB[K: ClassTag ](
+case class OpAB[K](
     override var A: DrmLike[K],
     override var B: DrmLike[Int])
     extends AbstractBinaryOp[K, Int, K] {
 
   assert(A.ncol == B.nrow, "Incompatible operand geometry")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
index f131f3f..8437cdd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
@@ -21,13 +21,20 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 
 /** Logical AB */
-case class OpABAnyKey[B:ClassTag, K: ClassTag ](
+case class OpABAnyKey[B, K](
     override var A: DrmLike[K],
     override var B: DrmLike[B])
     extends AbstractBinaryOp[K, B, K] {
 
   assert(A.ncol == B.nrow, "Incompatible operand geometry")
 
+
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
index f6503ed..63bd7e1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
@@ -21,13 +21,19 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm._
 
 /** Logical AB' */
-case class OpABt[K: ClassTag](
+case class OpABt[K](
     override var A: DrmLike[K],
     override var B: DrmLike[Int])
     extends AbstractBinaryOp[K,Int,K]  {
 
   assert(A.ncol == B.ncol, "Incompatible operand geometry")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override lazy val keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
index da7c0d5..4bb83d0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -22,21 +22,27 @@ import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random
 
 /** DRM elementwise operator */
-case class OpAewB[K: ClassTag](
+case class OpAewB[K](
     override var A: DrmLike[K],
     override var B: DrmLike[K],
     val op: String
     ) extends AbstractBinaryOp[K, K, K] {
 
 
-
   assert(A.ncol == B.ncol, "arguments must have same number of columns")
   assert(A.nrow == B.nrow, "arguments must have same number of rows")
+  assert(A.keyClassTag == B.keyClassTag, "Arguments of elementwise operators must have the same row key type")
 
   override protected[mahout] lazy val partitioningTag: Long =
     if (A.partitioningTag == B.partitioningTag) A.partitioningTag
     else Random.nextLong()
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
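
Elementwise operators, like cbind() and rbind() below, now assert equal runtime key
tags, since the compiler can no longer enforce this through evidence. A sketch of the
misuse the guard catches; the unchecked cast stands in for any code path that has
erased the key type (unsafePlus is hypothetical):

    import org.apache.mahout.math.drm.DrmLike
    import org.apache.mahout.math.drm.logical.OpAewB

    def unsafePlus[K](a: DrmLike[K], b: DrmLike[_]): DrmLike[K] =
      OpAewB(a, b.asInstanceOf[DrmLike[K]], op = "+")  // the new assert fires if tags differ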
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
index dbcb366..4f08686 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -26,7 +26,7 @@ import scala.util.Random
  *
  * @deprecated use [[OpAewUnaryFunc]] instead
  */
-case class OpAewScalar[K: ClassTag](
+case class OpAewScalar[K](
     override var A: DrmLike[K],
     val scalar: Double,
     val op: String
@@ -40,6 +40,12 @@ case class OpAewScalar[K: ClassTag](
   /** Stuff like `A +1` is always supposed to fix this */
   override protected[mahout] lazy val canHaveMissingRows: Boolean = false
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
index 71489ab..6f93980 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
@@ -24,7 +24,7 @@ import scala.util.Random
 /**
  * @author dmitriy
  */
-case class OpAewUnaryFunc[K: ClassTag](
+case class OpAewUnaryFunc[K](
     override var A: DrmLike[K],
     val f: (Double) => Double,
     val evalZeros:Boolean = false
@@ -38,6 +38,12 @@ case class OpAewUnaryFunc[K: ClassTag](
   /** Stuff like `A +1` is always supposed to fix this */
   override protected[mahout] lazy val canHaveMissingRows: Boolean = false
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override lazy val keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
index ed95f4f..5b0133f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
@@ -25,7 +25,7 @@ import collection._
 /**
  * Composition of unary elementwise functions.
  */
-case class OpAewUnaryFuncFusion[K: ClassTag](
+case class OpAewUnaryFuncFusion[K](
     override var A: DrmLike[K],
     var ff:List[OpAewUnaryFunc[K]] = Nil
     ) extends AbstractUnaryOp[K,K] with TEwFunc {
@@ -38,6 +38,12 @@ case class OpAewUnaryFuncFusion[K: ClassTag](
   /** Stuff like `A +1` is always supposed to fix this */
   override protected[mahout] lazy val canHaveMissingRows: Boolean = false
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
index 4791301..59c71bd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
@@ -19,11 +19,19 @@ package org.apache.mahout.math.drm.logical
 
 import org.apache.mahout.math.drm._
 
+import scala.reflect.ClassTag
+
 /** Logical A' */
 case class OpAt(
     override var A: DrmLike[Int])
     extends AbstractUnaryOp[Int, Int] {
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = ClassTag.Int
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.ncol
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
index ad2a5d8..4c01f46 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
@@ -21,10 +21,16 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 
 /** A'A */
-case class OpAtA[K: ClassTag](
+case class OpAtA[K](
     override var A: DrmLike[K]
     ) extends AbstractUnaryOp[K, Int] {
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = ClassTag.Int
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.ncol
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
index 4e1dd5c..b23dca7 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
@@ -21,10 +21,16 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm._
 
 /** Logical A' for any row key to support A'A optimizations */
-case class OpAtAnyKey[A: ClassTag](
+case class OpAtAnyKey[A](
     override var A: DrmLike[A])
     extends AbstractUnaryOp[A, Int] {
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = ClassTag.Int
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.ncol
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
index ef3ae6b..7ec8585 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
@@ -21,13 +21,19 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 
 /** Logical A'B */
-case class OpAtB[A: ClassTag](
+case class OpAtB[A](
     override var A: DrmLike[A],
     override var B: DrmLike[A])
     extends AbstractBinaryOp[A, A, Int] {
 
   assert(A.nrow == B.nrow, "Incompatible operand geometry")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = ClassTag.Int
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.ncol
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
index 36769c7..97b6de1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
@@ -22,6 +22,8 @@ import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm._
 
+import scala.reflect.ClassTag
+
 /** Logical A'x. */
 case class OpAtx(
     override var A: DrmLike[Int],
@@ -32,6 +34,12 @@ case class OpAtx(
 
   assert(A.nrow == x.length, "Incompatible operand geometry")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override val keyClassTag = ClassTag.Int
+
   /** R-like syntax for number of rows. */
   def nrow: Long = safeToNonNegInt(A.ncol)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
index a726989..d25e0d9 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
@@ -24,7 +24,7 @@ import RLikeOps._
 import org.apache.mahout.math.drm.DrmLike
 
 /** Logical Ax. */
-case class OpAx[K: ClassTag](
+case class OpAx[K](
     override var A: DrmLike[K],
     val x: Vector
     ) extends AbstractUnaryOp[K, K] {
@@ -33,6 +33,12 @@ case class OpAx[K: ClassTag](
 
   assert(A.ncol == x.length, "Incompatible operand geometry")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag: ClassTag[K] = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
index 0598551..932f133 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
@@ -22,12 +22,19 @@ import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random
 
 /** cbind() logical operator */
-case class OpCbind[K: ClassTag](
+case class OpCbind[K](
     override var A: DrmLike[K],
     override var B: DrmLike[K]
     ) extends AbstractBinaryOp[K, K, K] {
 
   assert(A.nrow == B.nrow, "arguments must have same number of rows")
+  require(A.keyClassTag == B.keyClassTag, "arguments of cbind() must have the same row key type")
+
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = A.keyClassTag
 
   override protected[mahout] lazy val partitioningTag: Long =
     if (A.partitioningTag == B.partitioningTag) A.partitioningTag

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
index 5aee518..99c2bfa 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
@@ -19,7 +19,7 @@ package org.apache.mahout.math.drm.logical
 import reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 
-case class OpCbindScalar[K:ClassTag](
+case class OpCbindScalar[K](
   override var A:DrmLike[K],
   var x:Double,
   val leftBind:Boolean ) extends AbstractUnaryOp[K,K] {
@@ -28,6 +28,12 @@ case class OpCbindScalar[K:ClassTag](
 
   override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
index a1cd718..95e690b 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -17,13 +17,12 @@
 
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.ClassTag
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
 import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+
+import scala.reflect.{ClassTag, classTag}
 import scala.util.Random
 
-case class OpMapBlock[S: ClassTag, R: ClassTag](
+case class OpMapBlock[S, R: ClassTag](
     override var A: DrmLike[S],
     val bmf: BlockMapFunc[S, R],
     val _ncol: Int = -1,
@@ -34,6 +33,12 @@ case class OpMapBlock[S: ClassTag, R: ClassTag](
   override protected[mahout] lazy val partitioningTag: Long =
     if (identicallyPartitioned) A.partitioningTag else Random.nextLong()
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = classTag[R]
+
   /** R-like syntax for number of rows. */
   def nrow: Long = if (_nrow >= 0) _nrow else A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
index f438728..0fadce3 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
@@ -4,12 +4,18 @@ import org.apache.mahout.math.drm.DrmLike
 import scala.reflect.ClassTag
 
 /** Parallelism operator */
-case class OpPar[K: ClassTag](
+case class OpPar[K](
     override var A: DrmLike[K],
     val minSplits: Int = -1,
     val exactSplits: Int = -1)
     extends AbstractUnaryOp[K, K] {
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
index d45714b..f8c1059 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
@@ -22,15 +22,22 @@ import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random
 
 /** rbind() logical operator */
-case class OpRbind[K: ClassTag](
+case class OpRbind[K](
     override var A: DrmLike[K],
     override var B: DrmLike[K]
     ) extends AbstractBinaryOp[K, K, K] {
 
   assert(A.ncol == B.ncol, "arguments must have same number of columns")
+  require(A.keyClassTag == B.keyClassTag, "arguments of rbind() must have the same row key type")
 
   override protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override def keyClassTag = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow + B.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
index 697bbd3..c7d3bfa 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
@@ -19,6 +19,8 @@ package org.apache.mahout.math.drm.logical
 
 import org.apache.mahout.math.drm.DrmLike
 
+import scala.reflect.ClassTag
+
 /** Logical row-range slicing */
 case class OpRowRange(
     override var A: DrmLike[Int],
@@ -27,6 +29,12 @@ case class OpRowRange(
 
   assert(rowRange.head >= 0 && rowRange.last < A.nrow, "row range out of range")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override val keyClassTag = ClassTag.Int
+
   /** R-like syntax for number of rows. */
   def nrow: Long = rowRange.length
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
index 1ca79b3..e8ac475 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
@@ -22,6 +22,8 @@ import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.DrmLike
 
+import scala.reflect.ClassTag
+
 /** Logical Times-left over in-core matrix operand */
 case class OpTimesLeftMatrix(
     val left: Matrix,
@@ -30,6 +32,12 @@ case class OpTimesLeftMatrix(
 
   assert(left.ncol == A.nrow, "Incompatible operand geometry")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override val keyClassTag = ClassTag.Int
+
   /** R-like syntax for number of rows. */
   def nrow: Long = left.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
index c55f7f0..1b12035 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
@@ -24,7 +24,7 @@ import RLikeOps._
 import org.apache.mahout.math.drm.DrmLike
 
 /** Logical times-right over in-core matrix operand. */
-case class OpTimesRightMatrix[K: ClassTag](
+case class OpTimesRightMatrix[K](
     override var A: DrmLike[K],
     val right: Matrix
     ) extends AbstractUnaryOp[K, K] {
@@ -33,6 +33,12 @@ case class OpTimesRightMatrix[K: ClassTag](
 
   assert(A.ncol == right.nrow, "Incompatible operand geometry")
 
+  /**
+    * Explicit extraction of the key ClassTag, since traits don't support context-bound access,
+    * but the concrete implementation knows it.
+    */
+  override lazy val keyClassTag = A.keyClassTag
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 6d62ff1..0b7bb8c 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -85,20 +85,20 @@ package object drm {
   /** Just throw all engine operations into context as well. */
   implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
 
-  implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
+  implicit def drm2drmCpOps[K](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
     new CheckpointedOps[K](drm)
 
   /**
   * We assume that whenever a computational action is invoked without an explicit checkpoint, the
   * user does not intend caching.
    */
-  implicit def drm2Checkpointed[K: ClassTag](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
+  implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
 
   /** Implicit conversion to in-core with NONE caching of the result. */
-  implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect
+  implicit def drm2InCore[K](drm: DrmLike[K]): Matrix = drm.collect
 
   /** Do vertical concatenation of collection of blockified tuples */
-  private[mahout] def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
+  private[mahout] def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
     assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
     if (blocks.size == 1) {
       // No coalescing required.
@@ -132,7 +132,7 @@ package object drm {
    *         key type is actually Int, then we just return the argument with None for the map,
    *         regardless of computeMap parameter.
    */
-  def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) =
+  def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) =
     drmX.context.engine.drm2IntKeyed(drmX, computeMap)
 
   /**
@@ -143,23 +143,23 @@ package object drm {
    * @tparam K
    * @return samples
    */
-  def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] =
+  def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] =
     drmX.context.engine.drmSampleRows(drmX, fraction, replacement)
 
-  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean = false): Matrix =
+  def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean = false): Matrix =
     drmX.context.engine.drmSampleKRows(drmX, numSamples, replacement)
 
   ///////////////////////////////////////////////////////////
   // Elementwise unary functions on distributed operands.
-  def dexp[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.exp, true)
+  def dexp[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.exp, true)
 
-  def dlog[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.log, true)
+  def dlog[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.log, true)
 
-  def dabs[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.abs)
+  def dabs[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.abs)
 
-  def dsqrt[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.sqrt)
+  def dsqrt[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.sqrt)
 
-  def dsignum[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.signum)
+  def dsignum[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.signum)
   
   ///////////////////////////////////////////////////////////
   // Misc. math utilities.
@@ -171,7 +171,7 @@ package object drm {
    * @tparam K
    * @return colMeans → colVariances
    */
-  def dcolMeanVars[K: ClassTag](drmA: DrmLike[K]): (Vector, Vector) = {
+  def dcolMeanVars[K](drmA: DrmLike[K]): (Vector, Vector) = {
 
     import RLikeDrmOps._
 
@@ -190,7 +190,7 @@ package object drm {
    * @param drmA note: input will be pinned to cache if not yet pinned
    * @return colMeans → colStdevs
    */
-  def dcolMeanStdevs[K: ClassTag](drmA: DrmLike[K]): (Vector, Vector) = {
+  def dcolMeanStdevs[K](drmA: DrmLike[K]): (Vector, Vector) = {
     val (mu, vars) = dcolMeanVars(drmA)
     mu → (vars ::= math.sqrt _)
   }
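
A usage sketch for the now ClassTag-free re-keying helper, assuming an implicit
DistributedContext; the String re-keying mirrors the new mapBlock test further down:

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._

    val drmStr = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
      .mapBlock() { case (keys, block) ⇒ keys.map(_.toString) → block }

    // An int-keyed copy plus, with computeMap = true, an optional 1-column key mapping.
    val (drmInt, keyMap) = drm2IntKeyed(drmStr, computeMap = true)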

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
index fdfb3f9..525da11 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala
@@ -24,16 +24,18 @@ import scalabindings._
 import RLikeOps._
 import RLikeDrmOps._
 
+import scala.reflect.{ClassTag,classTag}
+
 /** Common tests for DrmLike operators to be executed by all distributed engines. */
 trait DrmLikeOpsSuiteBase extends DistributedMahoutSuite with Matchers {
-  this: FunSuite =>
+  this: FunSuite ⇒
 
   test("mapBlock") {
 
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = A.mapBlock(/* Inherit width */) {
-      case (keys, block) => keys -> (block += 1.0)
+      case (keys, block) ⇒ keys → (block += 1.0)
     }
 
     val inCoreB = B.collect
@@ -43,9 +45,23 @@ trait DrmLikeOpsSuiteBase extends DistributedMahoutSuite with Matchers {
 
     // Assert they are the same
     (inCoreB - inCoreBControl).norm should be < 1E-10
+    B.keyClassTag shouldBe ClassTag.Int
 
   }
 
+  test("mapBlock implicit keying") {
+
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    val B = A.mapBlock(/* Inherit width */) {
+      case (keys, block) ⇒ keys.map { k ⇒ k.toString } → block
+    }
+
+    B.keyClassTag shouldBe classTag[String]
+
+  }
+
+
   test("allReduceBlock") {
 
     val mxA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala b/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
index ca06cc0..f76a3f9 100644
--- a/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
+++ b/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
@@ -58,7 +58,7 @@ object SparkNaiveBayes extends NaiveBayes{
    *   aggregatedByLabelObservationDrm is a DrmLike[Int] of aggregated
    *   TF or TF-IDF counts per label
    */
-  override def extractLabelsAndAggregateObservations[K: ClassTag](stringKeyedObservations: DrmLike[K],
+  override def extractLabelsAndAggregateObservations[K](stringKeyedObservations: DrmLike[K],
                                                                   cParser: CategoryParser = seq2SparseCategoryParser)
                                                                  (implicit ctx: DistributedContext):
                                                                  (mutable.HashMap[String, Integer], DrmLike[Int]) = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index 0eed8d4..e9f2f95 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -79,7 +79,7 @@ object TrainNBDriver extends MahoutSparkDriver {
   }
 
   /** Read the training set from the inputPath/part-x-00000 sequence file of the form <Text,VectorWritable> */
-  private def readTrainingSet: DrmLike[_]= {
+  private def readTrainingSet(): DrmLike[_] = {
     val inputPath = parser.opts("input").asInstanceOf[String]
     val trainingSet = drm.drmDfsRead(inputPath)
     trainingSet
@@ -99,7 +99,7 @@ object TrainNBDriver extends MahoutSparkDriver {
        Hadoop1HDFSUtil.delete(fullPathToModel)
     }
 
-    val trainingSet = readTrainingSet
+    val trainingSet = readTrainingSet()
     // Use Spark-optimized Naive Bayes here to extract labels and aggregate observations
     val (labelIndex, aggregatedObservations) = SparkNaiveBayes.extractLabelsAndAggregateObservations(trainingSet)
     val model = SparkNaiveBayes.train(aggregatedObservations, labelIndex, complementary, alpha.toFloat)