Posted to commits@spark.apache.org by rx...@apache.org on 2014/02/18 04:23:36 UTC

git commit: SPARK-1098: Minor cleanup of ClassTag usage in Java API

Repository: incubator-spark
Updated Branches:
  refs/heads/master e0d49ad22 -> f74ae0ebc


SPARK-1098: Minor cleanup of ClassTag usage in Java API

Our usage of fake ClassTags in this manner is probably not healthy, but I'm not sure if there's a better solution available, so I just cleaned up and documented the current one.
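Concretely, each inline occurrence of the cast

    implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]

is replaced by a reference to a single, documented helper in JavaSparkContext's companion object (see the last hunk of the diff):

    implicit val ctag: ClassTag[C] = fakeClassTag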

Author: Aaron Davidson <aa...@databricks.com>

Closes #604 from aarondav/master and squashes the following commits:

b398e89 [Aaron Davidson] SPARK-1098: Minor cleanup of ClassTag usage in Java API


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f74ae0eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f74ae0eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f74ae0eb

Branch: refs/heads/master
Commit: f74ae0ebcee59b70a56d34bdf63e3d1b38e2bd59
Parents: e0d49ad
Author: Aaron Davidson <aa...@databricks.com>
Authored: Mon Feb 17 19:23:27 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Feb 17 19:23:27 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaPairRDD.scala | 82 +++++++++----------
 .../org/apache/spark/api/java/JavaRDD.scala     |  4 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala | 37 +++++----
 .../spark/api/java/JavaSparkContext.scala       | 85 +++++++++++---------
 4 files changed, 108 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f74ae0eb/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index cd0aea0..3f67290 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,35 +17,29 @@
 
 package org.apache.spark.api.java
 
-import java.util.{List => JList}
-import java.util.Comparator
+import java.util.{Comparator, List => JList}
 
-import scala.Tuple2
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
 import com.google.common.base.Optional
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.HashPartitioner
-import org.apache.spark.Partitioner
+import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
-import org.apache.spark.api.java.function.{Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.OrderedRDDFunctions
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
 
-
-class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K],
-  implicit val vClassTag: ClassTag[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
+                       (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
+  extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
 
   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
 
@@ -158,7 +152,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
     mergeValue: JFunction2[C, V, C],
     mergeCombiners: JFunction2[C, C, C],
     partitioner: Partitioner): JavaPairRDD[K, C] = {
-    implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    implicit val ctag: ClassTag[C] = fakeClassTag
     fromRDD(rdd.combineByKey(
       createCombiner,
       mergeValue,
@@ -284,19 +278,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * RDD will be <= us.
    */
   def subtractByKey[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, V] = {
-    implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val ctag: ClassTag[W] = fakeClassTag
     fromRDD(rdd.subtractByKey(other))
   }
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
   def subtractByKey[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, V] = {
-    implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val ctag: ClassTag[W] = fakeClassTag
     fromRDD(rdd.subtractByKey(other, numPartitions))
   }
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
   def subtractByKey[W](other: JavaPairRDD[K, W], p: Partitioner): JavaPairRDD[K, V] = {
-    implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val ctag: ClassTag[W] = fakeClassTag
     fromRDD(rdd.subtractByKey(other, p))
   }
 
@@ -345,7 +339,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   def combineByKey[C](createCombiner: JFunction[V, C],
     mergeValue: JFunction2[C, V, C],
     mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
-    implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    implicit val ctag: ClassTag[C] = fakeClassTag
     fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
   }
 
@@ -438,7 +432,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * this also retains the original RDD's partitioning.
    */
   def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
-    implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.mapValues(f))
   }
 
@@ -449,7 +443,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
     import scala.collection.JavaConverters._
     def fn = (x: V) => f.apply(x).asScala
-    implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }
 
@@ -682,31 +676,35 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
 }
 
 object JavaPairRDD {
-  def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassTag[K],
-    vcm: ClassTag[T]): RDD[(K, JList[T])] =
-    rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _)
-
-  def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassTag[K],
-    vcm: ClassTag[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd)
-    .mapValues((x: (Seq[V], Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
-
-  def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1],
-    Seq[W2]))])(implicit kcm: ClassTag[K]) : RDD[(K, (JList[V], JList[W1],
-    JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues(
-    (x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1),
-      seqAsJavaList(x._2),
-      seqAsJavaList(x._3)))
-
-  def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
+  private[spark]
+  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
+    rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
+  }
+
+  private[spark]
+  def cogroupResultToJava[K: ClassTag, V, W](
+      rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
+    rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+  }
+
+  private[spark]
+  def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
+      rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
+    rddToPairRDDFunctions(rdd)
+      .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
+  }
+
+  def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
     new JavaPairRDD[K, V](rdd)
+  }
 
   implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
 
 
   /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
   def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
-    implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    implicit val ctagK: ClassTag[K] = fakeClassTag
+    implicit val ctagV: ClassTag[V] = fakeClassTag
     new JavaPairRDD[K, V](rdd.rdd)
   }
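
A side note on the helper signatures above, not part of the patch itself: the explicit implicit parameters (implicit kcm: ClassTag[K], ...) were rewritten as context bounds ([K: ClassTag]), which are just sugar for an implicit parameter list. A minimal sketch of the equivalence (tagOf1/tagOf2 are illustrative names):

    import scala.reflect.ClassTag

    // These two definitions are equivalent; [K: ClassTag] desugars into an
    // extra implicit parameter list supplied by the compiler at call sites.
    def tagOf1[K: ClassTag](x: K): ClassTag[K] = implicitly[ClassTag[K]]
    def tagOf2[K](x: K)(implicit ev: ClassTag[K]): ClassTag[K] = ev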
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f74ae0eb/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 7d48ce0..7b73057 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -24,8 +24,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
 
-class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
-JavaRDDLike[T, JavaRDD[T]] {
+class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
+  extends JavaRDDLike[T, JavaRDD[T]] {
 
   override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f74ae0eb/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index fcb9729..24a9925 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.api.java
 
-import java.util.{List => JList, Comparator}
+import java.util.{Comparator, List => JList}
+
 import scala.Tuple2
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -25,14 +26,14 @@ import scala.reflect.ClassTag
 import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
-import org.apache.spark.{SparkContext, Partition, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.api.java.JavaPairRDD._
-import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
-import org.apache.spark.partial.{PartialResult, BoundedDouble}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
-
 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def wrapRDD(rdd: RDD[T]): This
 
@@ -88,8 +89,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
-    new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
+    val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
+    new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType())
   }
 
   /**
@@ -119,8 +120,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
-    JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
+    val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
+    JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType())
   }
 
   /**
@@ -202,10 +203,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * mapping to that key.
    */
   def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
-    implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val vcm: ClassTag[JList[T]] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
+    implicit val ctagK: ClassTag[K] = fakeClassTag
+    implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
+    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))
   }
 
   /**
@@ -213,10 +213,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * mapping to that key.
    */
   def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
-    implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val vcm: ClassTag[JList[T]] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
+    implicit val ctagK: ClassTag[K] = fakeClassTag
+    implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
+    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))
   }
 
   /**
@@ -407,7 +406,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Creates tuples of the elements in this RDD by applying `f`.
    */
   def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
-    implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+    implicit val ctag: ClassTag[K] = fakeClassTag
     JavaPairRDD.fromRDD(rdd.keyBy(f))
   }
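
Why the map/flatMap hunks above no longer need a cast, shown as a small sketch (not part of the patch): a ClassTag records only the erased class, so the compiler can materialize a genuine ClassTag[Tuple2[K2, V2]] even when K2 and V2 are unknown type parameters:

    import scala.reflect.ClassTag

    // No ClassTag is required for A or B: the materialized tag carries only
    // the erasure of the pair type, i.e. classOf[Tuple2[_, _]].
    def pairTag[A, B]: ClassTag[(A, B)] = implicitly[ClassTag[(A, B)]]

    pairTag[Int, String].runtimeClass   // class scala.Tuple2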
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f74ae0eb/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 22dc9c9..dc26b7f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -23,19 +23,17 @@ import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
+import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import com.google.common.base.Optional
 
 import org.apache.spark._
-import org.apache.spark.SparkContext.IntAccumulatorParam
-import org.apache.spark.SparkContext.DoubleAccumulatorParam
+import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 
-
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
  * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
@@ -96,7 +94,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
-    implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    implicit val ctag: ClassTag[T] = fakeClassTag
     sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
   }
 
@@ -107,8 +105,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /** Distribute a local Scala collection to form an RDD. */
   def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
   : JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val vcm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    implicit val ctagK: ClassTag[K] = fakeClassTag
+    implicit val ctagV: ClassTag[V] = fakeClassTag
     JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
   }
 
@@ -149,8 +147,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     valueClass: Class[V],
     minSplits: Int
     ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
   }
 
@@ -163,8 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
   JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
   }
 
@@ -176,8 +174,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * that there's very little effort required to save arbitrary objects.
    */
   def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
-    implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    sc.objectFile(path, minSplits)(cm)
+    implicit val ctag: ClassTag[T] = fakeClassTag
+    sc.objectFile(path, minSplits)(ctag)
   }
 
   /**
@@ -188,8 +186,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * that there's very little effort required to save arbitrary objects.
    */
   def objectFile[T](path: String): JavaRDD[T] = {
-    implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    sc.objectFile(path)(cm)
+    implicit val ctag: ClassTag[T] = fakeClassTag
+    sc.objectFile(path)(ctag)
   }
 
   /**
@@ -209,8 +207,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     valueClass: Class[V],
     minSplits: Int
     ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
@@ -229,8 +227,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     keyClass: Class[K],
     valueClass: Class[V]
     ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
   }
 
@@ -248,8 +246,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     valueClass: Class[V],
     minSplits: Int
     ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
     new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
@@ -266,8 +264,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     keyClass: Class[K],
     valueClass: Class[V]
     ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
     new JavaPairRDD(sc.hadoopFile(path,
       inputFormatClass, keyClass, valueClass))
   }
@@ -287,8 +285,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     kClass: Class[K],
     vClass: Class[V],
     conf: Configuration): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(kClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(vClass)
     new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
   }
 
@@ -306,26 +304,26 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     fClass: Class[F],
     kClass: Class[K],
     vClass: Class[V]): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
+    implicit val ctagK: ClassTag[K] = ClassTag(kClass)
+    implicit val ctagV: ClassTag[V] = ClassTag(vClass)
     new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
   }
 
   /** Build the union of two or more RDDs. */
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
-    implicit val cm: ClassTag[T] = first.classTag
-    sc.union(rdds)(cm)
+    implicit val ctag: ClassTag[T] = first.classTag
+    sc.union(rdds)
   }
 
   /** Build the union of two or more RDDs. */
   override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
       : JavaPairRDD[K, V] = {
     val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
-    implicit val cm: ClassTag[(K, V)] = first.classTag
-    implicit val kcm: ClassTag[K] = first.kClassTag
-    implicit val vcm: ClassTag[V] = first.vClassTag
-    new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
+    implicit val ctag: ClassTag[(K, V)] = first.classTag
+    implicit val ctagK: ClassTag[K] = first.kClassTag
+    implicit val ctagV: ClassTag[V] = first.vClassTag
+    new JavaPairRDD(sc.union(rdds))
   }
 
   /** Build the union of two or more RDDs. */
@@ -447,8 +445,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
 
   protected def checkpointFile[T](path: String): JavaRDD[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    implicit val ctag: ClassTag[T] = fakeClassTag
     new JavaRDD(sc.checkpointFile(path))
   }
 
@@ -535,4 +532,18 @@ object JavaSparkContext {
    * your driver program.
    */
   def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
+   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
+   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, just worse performance
+   * or security issues. For instance, an Array[AnyRef] can hold any type T, but may lose primitive
+   * specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
 }
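
To make the caveat in the new scaladoc concrete, a minimal sketch (not part of the patch; since fakeClassTag is private[spark], its one-line definition is inlined here):

    import scala.reflect.ClassTag

    def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

    val tag: ClassTag[Int] = fakeClassTag[Int]
    tag.runtimeClass   // class java.lang.Object, not int
    // tag.newArray(n) would therefore allocate an Object[] rather than an
    // int[] -- the lost primitive specialization the comment warns about.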