You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/24 04:12:07 UTC

[1/4] git commit: Remove Hadoop object cloning and warn users making Hadoop RDD's.

Updated Branches:
  refs/heads/master cad3002fe -> c3196171f


Remove Hadoop object cloning and warn users making Hadoop RDD's.

The code introduced in #359 used Hadoop's WritableUtils.clone() to
duplicate objects when reading from Hadoop files. Some users have
reported exceptions when cloning data in verious file formats,
including Avro and another custom format.

This patch removes that functionality to ensure stability for the
0.9 release. Instead, it puts a clear warning in the documentation
that copying may be necessary for Hadoop data sets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/71010178
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/71010178
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/71010178

Branch: refs/heads/master
Commit: 7101017803a70f3267381498594c0e8c604f932c
Parents: a1cd185
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 13:30:54 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 17:39:23 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 120 +++++++++-----
 .../spark/api/java/JavaSparkContext.scala       | 165 ++++++-------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  25 +--
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  22 +--
 .../scala/org/apache/spark/util/Utils.scala     |  23 +--
 5 files changed, 134 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/71010178/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 923b4ed..7ff06b5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -341,7 +341,7 @@ class SparkContext(
    */
   def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits, cloneRecords = false).map(pair => pair._2.toString)
+      minSplits).map(pair => pair._2.toString)
   }
 
   /**
@@ -354,33 +354,37 @@ class SparkContext(
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopRDD[K: ClassTag, V: ClassTag](
+  def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K: ClassTag, V: ClassTag](
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+    *
+    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    * */
+  def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -403,16 +406,20 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -421,69 +428,91 @@ class SparkContext(
    * can just write, for example,
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
-   * }}}
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
+    hadoopFile[K, V, F](path, defaultMinSplits)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneRecords: Boolean = true)
+      (path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
-      vm.runtimeClass.asInstanceOf[Class[V]],
-      cloneRecords = cloneRecords)
+      vm.runtimeClass.asInstanceOf[Class[V]])
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration,
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+    *
+    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    * */
   def sequenceFile[K: ClassTag, V: ClassTag](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean = true
+      minSplits: Int
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+    *
+    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    * */
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V]
+      ): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +529,14 @@ class SparkContext(
    * have a parameterized singleton object). We use functions instead to create a new converter
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
    def sequenceFile[K, V]
-       (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+       (path: String, minSplits: Int = defaultMinSplits)
        (implicit km: ClassTag[K], vm: ClassTag[V],
         kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -511,7 +545,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/71010178/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 50ac700..75b8e76 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+    *
+    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    * */
   def sequenceFile[K, V](path: String,
     keyClass: Class[K],
     valueClass: Class[V],
@@ -148,19 +154,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords))
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile. */
+  /** Get an RDD for a Hadoop SequenceFile.
+    *
+    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
   JavaPairRDD[K, V] = {
     implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -168,15 +168,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop SequenceFile. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], 
-    cloneRecords: Boolean):
-  JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords))
-  }
-
   /**
    * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
    * BytesWritable values that contain a serialized partition. This is still an experimental storage
@@ -205,6 +196,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -218,41 +214,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
-
-  /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
-   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
-   * using the older MapReduce API (`org.apache.hadoop.mapred`).
-   *
-   * @param conf JobConf for setting up the dataset
-   * @param inputFormatClass Class of the [[InputFormat]]
-   * @param keyClass Class of the keys
-   * @param valueClass Class of the values
-   * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
-   */
-  def hadoopRDD[K, V, F <: InputFormat[K, V]](
-    conf: JobConf,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits,
-      cloneRecords))
-  }
-
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -265,7 +234,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
+    *
+    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
@@ -278,22 +253,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, 
-      minSplits, cloneRecords))
-  }
-
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+    *
+    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
@@ -306,23 +272,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       inputFormatClass, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords))
-  }
-
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
     path: String,
@@ -338,22 +295,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
-    path: String,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V],
-    conf: Configuration,
-    cloneRecords: Boolean): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords))
-  }
-
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
     conf: Configuration,
@@ -365,21 +311,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
   }
 
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
-    conf: Configuration,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V],
-    cloneRecords: Boolean): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords))
-  }
-
   /** Build the union of two or more RDDs. */
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/71010178/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index dbe76f3..72826ef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -34,7 +34,6 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.spark.util.Utils.cloneWritables
 
 
 /**
@@ -64,21 +63,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- *                     Most RecordReader implementations reuse wrapper objects across multiple
- *                     records, and can cause problems in RDD collect or aggregation operations.
- *                     By default the records are cloned in Spark. However, application
- *                     programmers can explicitly disable the cloning for better performance.
  */
-class HadoopRDD[K: ClassTag, V: ClassTag](
+class HadoopRDD[K, V](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean = true)
+    minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -87,8 +80,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean) = {
+      minSplits: Int) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -97,8 +89,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -170,9 +161,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
-      val keyCloneFunc = cloneWritables[K](jobConf)
       val value: V = reader.createValue()
-      val valueCloneFunc = cloneWritables[V](jobConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -180,11 +169,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
           case eof: EOFException =>
             finished = true
         }
-        if (cloneRecords) {
-          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
-        } else {
-          (key, value)
-        }
+        (key, value)
       }
 
       override def close() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/71010178/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 992bd4a..955ba31 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,15 +20,11 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import scala.reflect.ClassTag
-
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.util.Utils.cloneWritables
-
 
 private[spark]
 class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
@@ -48,19 +44,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param conf The Hadoop configuration.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- *                     Most RecordReader implementations reuse wrapper objects across multiple
- *                     records, and can cause problems in RDD collect or aggregation operations.
- *                     By default the records are cloned in Spark. However, application
- *                     programmers can explicitly disable the cloning for better performance.
  */
-class NewHadoopRDD[K: ClassTag, V: ClassTag](
+class NewHadoopRDD[K, V](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration,
-    cloneRecords: Boolean)
+    @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -107,8 +97,6 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
 
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
-      val keyCloneFunc = cloneWritables[K](conf)
-      val valueCloneFunc = cloneWritables[V](conf)
       var havePair = false
       var finished = false
 
@@ -127,11 +115,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
         havePair = false
         val key = reader.getCurrentKey
         val value = reader.getCurrentValue
-        if (cloneRecords) {
-          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
-        } else {
-          (key, value)
-        }
+        (key, value)
       }
 
       private def close() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/71010178/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index caa9bf4..64acfbd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -46,27 +46,6 @@ import org.apache.spark.{SparkConf, SparkException, Logging}
  */
 private[spark] object Utils extends Logging {
 
-  /**
-   *  We try to clone for most common types of writables and we call WritableUtils.clone otherwise
-   *  intention is to optimize, for example for NullWritable there is no need and for Long, int and
-   *  String creating a new object with value set would be faster.
-   */
-  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
-    val cloneFunc = classTag[T] match {
-      case ClassTag(_: Text) => 
-        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
-      case ClassTag(_: LongWritable) => 
-        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
-      case ClassTag(_: IntWritable) => 
-        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
-      case ClassTag(_: NullWritable) => 
-        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
-      case _ => 
-        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
-    }
-    cloneFunc
-  }
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()


[2/4] git commit: Response to Matei's review

Posted by pw...@apache.org.
Response to Matei's review


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c58d4ea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c58d4ea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c58d4ea3

Branch: refs/heads/master
Commit: c58d4ea3d46ec7b72f7ced17d5c4193ee42befa0
Parents: 7101017
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 18:12:40 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 18:12:40 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 27 ++++++++++----------
 .../spark/api/java/JavaSparkContext.scala       | 16 ++++++------
 2 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c58d4ea3/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7ff06b5..566472e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -355,7 +355,7 @@ class SparkContext(
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -374,7 +374,7 @@ class SparkContext(
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
     *
-    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD will create many references to the same object.
     * If you plan to directly cache Hadoop writable objects, you should first copy them using
     * a `map` function.
@@ -407,7 +407,7 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -428,8 +428,9 @@ class SparkContext(
    * can just write, for example,
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
+   * }}}
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -453,7 +454,7 @@ class SparkContext(
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -474,7 +475,7 @@ class SparkContext(
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -489,12 +490,12 @@ class SparkContext(
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
     *
-    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD will create many references to the same object.
     * If you plan to directly cache Hadoop writable objects, you should first copy them using
     * a `map` function.
-    * */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
+    */
+  def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int
@@ -505,12 +506,12 @@ class SparkContext(
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
     *
-    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD will create many references to the same object.
     * If you plan to directly cache Hadoop writable objects, you should first copy them using
     * a `map` function.
     * */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V]
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
       ): RDD[(K, V)] =
     sequenceFile(path, keyClass, valueClass, defaultMinSplits)
 
@@ -530,7 +531,7 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -1058,7 +1059,7 @@ object SparkContext {
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)])   =
     new SequenceFileRDDFunctions(rdd)
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c58d4ea3/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 75b8e76..5a426b9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -139,7 +139,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
     *
-    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD will create many references to the same object.
     * If you plan to directly cache Hadoop writable objects, you should first copy them using
     * a `map` function.
@@ -156,7 +156,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   /** Get an RDD for a Hadoop SequenceFile.
     *
-    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD will create many references to the same object.
     * If you plan to directly cache Hadoop writable objects, you should first copy them using
     * a `map` function.
@@ -197,7 +197,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -218,7 +218,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -236,7 +236,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
     *
-    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD will create many references to the same object.
     * If you plan to directly cache Hadoop writable objects, you should first copy them using
     * a `map` function.
@@ -255,7 +255,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
     *
-    * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD will create many references to the same object.
     * If you plan to directly cache Hadoop writable objects, you should first copy them using
     * a `map` function.
@@ -276,7 +276,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
@@ -296,7 +296,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    *
-   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.


[3/4] git commit: Minor changes after auditing diff from earlier version

Posted by pw...@apache.org.
Minor changes after auditing diff from earlier version


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/268ecbd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/268ecbd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/268ecbd2

Branch: refs/heads/master
Commit: 268ecbd2318d5dc44c483a44148688646b7ae416
Parents: c58d4ea
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 18:30:11 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 18:30:11 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala    | 3 ---
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 4 +---
 core/src/main/scala/org/apache/spark/util/Utils.scala       | 1 -
 3 files changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/268ecbd2/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 72826ef..ad74d46 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,10 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
-import scala.reflect.ClassTag
-
 import org.apache.hadoop.conf.{Configuration, Configurable}
-import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/268ecbd2/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 955ba31..d1fff29 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -113,9 +113,7 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        val key = reader.getCurrentKey
-        val value = reader.getCurrentValue
-        (key, value)
+        (reader.getCurrentKey, reader.getCurrentValue)
       }
 
       private def close() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/268ecbd2/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 64acfbd..61d8ef5 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.ClassTag
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder


[4/4] git commit: Merge pull request #502 from pwendell/clone-1

Posted by pw...@apache.org.
Merge pull request #502 from pwendell/clone-1

Remove Hadoop object cloning and warn users making Hadoop RDD's.

The code introduced in #359 used Hadoop's WritableUtils.clone() to
duplicate objects when reading from Hadoop files. Some users have
reported exceptions when cloning data in various file formats,
including Avro and another custom format.

This patch removes that functionality to ensure stability for the
0.9 release. Instead, it puts a clear warning in the documentation
that copying may be necessary for Hadoop data sets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c3196171
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c3196171
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c3196171

Branch: refs/heads/master
Commit: c3196171f3dffde6c9e67e3d35c398a01fbba846
Parents: cad3002 268ecbd
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 19:11:59 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 19:11:59 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 127 ++++++++------
 .../spark/api/java/JavaSparkContext.scala       | 165 ++++++-------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  28 +---
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  24 +--
 .../scala/org/apache/spark/util/Utils.scala     |  22 ---
 5 files changed, 137 insertions(+), 229 deletions(-)
----------------------------------------------------------------------