Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/11 21:08:02 UTC

[1/3] git commit: We clone Hadoop keys and values by default and reuse objects if specified.

Updated Branches:
  refs/heads/master 4216178d5 -> ee6e7f9b8


We clone Hadoop keys and values by default and reuse objects if specified.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/277b4a36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/277b4a36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/277b4a36

Branch: refs/heads/master
Commit: 277b4a36c580e88d9ba60c5efe63753350197874
Parents: c0f0155
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Wed Jan 8 16:32:55 2014 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Wed Jan 8 16:32:55 2014 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 68 +++++++++++---------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 22 +++++--
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 15 ++++-
 .../scala/org/apache/spark/util/Utils.scala     | 23 ++++++-
 4 files changed, 87 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/277b4a36/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e47f4e..97fec7f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -341,25 +341,27 @@ class SparkContext(
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    */
-  def hadoopRDD[K, V](
+  def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V](
+  def hadoopFile[K: ClassTag, V: ClassTag](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -371,7 +373,8 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits,
+      cloneKeyValues)
   }
 
   /**
@@ -382,14 +385,15 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
-      : RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
+      ): RDD[(K, V)] = {
     hadoopFile(path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
         vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits)
+        minSplits,
+      cloneKeyValues = cloneKeyValues)
   }
 
   /**
@@ -400,61 +404,67 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
+      ): RDD[(K, V)] = {
     newAPIHadoopFile(
         path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]])
+        vm.runtimeClass.asInstanceOf[Class[V]],
+        cloneKeyValues = cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration,
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V]): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      vClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minSplits: Int,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -472,8 +482,8 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    */
-   def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
-      (implicit km: ClassTag[K], vm: ClassTag[V],
+   def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
+       cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
           kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
@@ -481,7 +491,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
     writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/277b4a36/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 53f77a3..13949a1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,7 +19,9 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -31,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables
 
 
 /**
@@ -62,14 +64,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
  */
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int)
+    minSplits: Int,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -78,7 +81,8 @@ class HadoopRDD[K, V](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int) = {
+      minSplits: Int,
+      cloneKeyValues: Boolean) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +91,7 @@ class HadoopRDD[K, V](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits, cloneKeyValues)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +173,11 @@ class HadoopRDD[K, V](
           case eof: EOFException =>
             finished = true
         }
-        (key, value)
+        if (cloneKeyValues) {
+          (cloneWritables(key, getConf), cloneWritables(value, getConf))
+        } else {
+          (key, value)
+        }
       }
 
       override def close() {
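
Some background on why cloning is the default (illustrative, not from the diff): the old Hadoop API reuses the same Writable instances for every call to reader.next(key, value), so buffering the raw pairs without cloning silently keeps only the last record. A self-contained sketch of that reuse with a plain Text value:

  import scala.collection.mutable.ArrayBuffer
  import org.apache.hadoop.io.Text

  val value = new Text()                  // stands in for reader.createValue()
  val kept = ArrayBuffer[Text]()
  for (line <- Seq("a", "b", "c")) {
    value.set(line)                       // reader.next(...) mutates this object in place
    kept += value                         // three references to the one mutable Text
  }
  kept.map(_.toString)                    // "c", "c", "c" -- the last record wins

  val copies = Seq("a", "b", "c").map { line =>
    value.set(line)
    new Text(value)                       // cloning yields an independent copy
  }
  copies.map(_.toString)                  // "a", "b", "c"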

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/277b4a36/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2662d48..5428fc5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
 
 
 private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
   override def hashCode(): Int = (41 * (41 + rddId) + index)
 }
 
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -105,7 +109,12 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        (reader.getCurrentKey, reader.getCurrentValue)
+        val key = reader.getCurrentKey
+        val value = reader.getCurrentValue
+        if (cloneKeyValues) {
+          (cloneWritables(key, conf), cloneWritables(value, conf))
+        } else
+        (key, value)
       }
 
       private def close() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/277b4a36/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f12531..192806e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,23 +26,42 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._
 
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
 
 
 /**
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {
+
+  /**
+   *  We try to clone for most common types of writables and we call WritableUtils.clone otherwise
+   *  intention is to optimize, for example for NullWritable there is no need and for Long, int and
+   *  String creating a new object with value set would be faster.
+   */
+  def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = {
+    val cloned = classTag[T] match {
+      case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes)
+      case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get)
+      case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get)
+      case ClassTag(_: NullWritable) => obj // TODO: should we clone this ?
+      case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning.
+    }
+    cloned.asInstanceOf[T]
+  }
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
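
cloneWritables lives in the private[spark] Utils object, so the sketch below (illustrative, not from the diff) assumes it is called from code inside the org.apache.spark package, with a plain Hadoop Configuration:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.LongWritable
  import org.apache.spark.util.Utils

  val conf = new Configuration()
  val n = new LongWritable(42L)
  val copy = Utils.cloneWritables(n, conf)
  // copy is a separate LongWritable carrying the same value, so mutating the
  // original afterwards does not affect it.
  n.set(0L)
  copy.get()   // still 42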


[3/3] git commit: Merge pull request #359 from ScrapCodes/clone-writables

Posted by rx...@apache.org.
Merge pull request #359 from ScrapCodes/clone-writables

We clone Hadoop keys and values by default and reuse objects if asked to.

 We try to clone the most common types of Writables directly and fall back to WritableUtils.clone otherwise. The intention is to optimize: for NullWritable no clone is needed at all, and for Long, Int, and String values, creating a new object with the value set should be faster than a generic copy of the object.

An alternative design for this PR would let callers choose separately whether to clone the key and the value, but the only use case I could think of is when one of them is a NullWritable, which is already handled, so that seemed unnecessary.
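
A hedged sketch of the pitfall the default guards against (illustrative, not from the PR), assuming an existing SparkContext sc and a hypothetical text file:

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  val rdd = sc.hadoopFile[LongWritable, Text, TextInputFormat](
    "hdfs:///tmp/lines.txt", sc.defaultMinSplits, cloneKeyValues = false)

  val lines = rdd.mapPartitions { iter =>
    val buffered = iter.toArray            // holds on to the raw Writable references
    buffered.map(_._2.toString).iterator   // converts to String only after buffering
  }.collect()
  // With cloneKeyValues = false the RecordReader keeps mutating the single Text
  // instance that every buffered pair points to, so the strings may all repeat a
  // partition's last line; with the default of true each line survives intact.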


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ee6e7f9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ee6e7f9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ee6e7f9b

Branch: refs/heads/master
Commit: ee6e7f9b8cc56985787546882fba291cf9ad7667
Parents: 4216178 59b03e0
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Jan 11 12:07:55 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Jan 11 12:07:55 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 78 +++++++++++---------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 29 +++++---
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 20 ++++-
 .../scala/org/apache/spark/util/Utils.scala     | 28 ++++++-
 4 files changed, 106 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee6e7f9b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee6e7f9b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------


[2/3] git commit: Fixes corresponding to Reynold's feedback comments

Posted by rx...@apache.org.
Fixes corresponding to Reynold's feedback comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/59b03e01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/59b03e01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/59b03e01

Branch: refs/heads/master
Commit: 59b03e015d581bbab74f1fe33a3ec1fd7840c3db
Parents: 277b4a3
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Thu Jan 9 11:58:18 2014 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Thu Jan 9 12:26:30 2014 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 30 ++++++++++----------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 11 ++++---
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 11 ++++---
 .../scala/org/apache/spark/util/Utils.scala     | 23 +++++++++------
 4 files changed, 43 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/59b03e01/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 97fec7f..bceeaa0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -385,14 +385,14 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int,
-      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
-      ): RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]]
+      (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits,
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      minSplits,
       cloneKeyValues = cloneKeyValues)
   }
 
@@ -409,15 +409,15 @@ class SparkContext(
     hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
-      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
-      ): RDD[(K, V)] = {
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+      (path: String, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
-        path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        cloneKeyValues = cloneKeyValues)
+      path,
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      cloneKeyValues = cloneKeyValues)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/59b03e01/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 13949a1..2da4611 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -22,6 +22,7 @@ import java.io.EOFException
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -91,7 +92,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits, cloneKeyValues)
+      minSplits,
+      cloneKeyValues)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -162,10 +164,10 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
 
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
-
       val key: K = reader.createKey()
+      val keyCloneFunc = cloneWritables[K](getConf)
       val value: V = reader.createValue()
-
+      val valueCloneFunc = cloneWritables[V](getConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -174,7 +176,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
             finished = true
         }
         if (cloneKeyValues) {
-          (cloneWritables(key, getConf), cloneWritables(value, getConf))
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/59b03e01/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 5428fc5..e1f9995 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -92,7 +92,8 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
 
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
-
+      val keyCloneFunc = cloneWritables[K](conf)
+      val valueCloneFunc = cloneWritables[V](conf)
       var havePair = false
       var finished = false
 
@@ -112,9 +113,11 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
         val key = reader.getCurrentKey
         val value = reader.getCurrentValue
         if (cloneKeyValues) {
-          (cloneWritables(key, conf), cloneWritables(value, conf))
-        } else
-        (key, value)
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
+        } else {
+          (key, value)
+        }
       }
 
       private def close() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/59b03e01/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 192806e..23b7270 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -51,15 +51,20 @@ private[spark] object Utils extends Logging {
    *  intention is to optimize, for example for NullWritable there is no need and for Long, int and
    *  String creating a new object with value set would be faster.
    */
-  def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = {
-    val cloned = classTag[T] match {
-      case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes)
-      case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get)
-      case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get)
-      case ClassTag(_: NullWritable) => obj // TODO: should we clone this ?
-      case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning.
-    }
-    cloned.asInstanceOf[T]
+  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+    val cloneFunc = classTag[T] match {
+      case ClassTag(_: Text) => 
+        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+      case ClassTag(_: LongWritable) => 
+        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+      case ClassTag(_: IntWritable) => 
+        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+      case ClassTag(_: NullWritable) => 
+        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+      case _ => 
+        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+    }
+    cloneFunc
   }
 
   /** Serialize an object using Java serialization */
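
After this refactor cloneWritables returns a function, so the ClassTag match runs once per partition instead of once per record. A short sketch of the intended call pattern (illustrative, not from the diff; as before it assumes code inside the org.apache.spark package):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{LongWritable, Text, Writable}
  import org.apache.spark.util.Utils

  val conf = new Configuration()

  // Built once, before iterating over records (cf. keyCloneFunc/valueCloneFunc
  // in the HadoopRDD and NewHadoopRDD diffs above).
  val keyCloneFunc: Writable => LongWritable = Utils.cloneWritables[LongWritable](conf)
  val valueCloneFunc: Writable => Text = Utils.cloneWritables[Text](conf)

  // Inside the record loop, cloning is now a plain function application.
  val key = new LongWritable(1L)
  val value = new Text("line one")
  val pair = (keyCloneFunc(key), valueCloneFunc(value))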