Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/12 06:53:28 UTC
[1/3] git commit: Minor update for clone writables and more documentation.
Updated Branches:
refs/heads/master dbc11df41 -> 288a87899
Minor update for clone writables and more documentation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b0fbfcca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b0fbfcca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b0fbfcca
Branch: refs/heads/master
Commit: b0fbfccadc2bb308c3cbcd5a55157e63cc8916f6
Parents: ee6e7f9
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Jan 11 12:35:10 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Jan 11 12:35:10 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 17 ++++++++++++++---
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 16 ++++++++++------
.../org/apache/spark/rdd/NewHadoopRDD.scala | 20 +++++++++++++++++---
3 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7e681d..1656461 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -345,9 +345,20 @@ class SparkContext(
}
/**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
- * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
- * etc).
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+ * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+ * using the older MapReduce API (`org.apache.hadoop.mapred`).
+ *
+ * @param conf JobConf for setting up the dataset
+ * @param inputFormatClass Class of the [[InputFormat]]
+ * @param keyClass Class of the keys
+ * @param valueClass Class of the values
+ * @param minSplits Minimum number of Hadoop Splits to generate.
+ * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
def hadoopRDD[K: ClassTag, V: ClassTag](
conf: JobConf,
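[Editor's note] The cloneKeyValues doc added above describes why Writables handed out by a reused RecordReader wrapper are unsafe to hold onto. A minimal sketch of the trade-off, assuming a local master and a placeholder input path (both hypothetical, not part of this commit):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

object CloneKeyValuesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "clone-keyvalues-sketch")

    // With cloneKeyValues = false, tuples returned by collect() may all wrap
    // the same reused Text instance, so they can all show the last record read.
    val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat](
      "hdfs:///tmp/input.txt", 2, cloneKeyValues = false)

    // Converting each record to an immutable value before collecting avoids the
    // problem even without cloning; with the default cloneKeyValues = true,
    // collecting the Writables directly is also safe.
    val lines = raw.map { case (_, text) => text.toString }.collect()
    lines.foreach(println)

    sc.stop()
  }
}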
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2da4611..9a220d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -45,14 +45,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
val inputSplit = new SerializableWritable[InputSplit](s)
- override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
override val index: Int = idx
}
/**
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3).
+ * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
@@ -64,6 +64,11 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
class HadoopRDD[K: ClassTag, V: ClassTag](
sc: SparkContext,
@@ -165,9 +170,9 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
- val keyCloneFunc = cloneWritables[K](getConf)
+ val keyCloneFunc = cloneWritables[K](jobConf)
val value: V = reader.createValue()
- val valueCloneFunc = cloneWritables[V](getConf)
+ val valueCloneFunc = cloneWritables[V](jobConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -176,8 +181,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
finished = true
}
if (cloneKeyValues) {
- (keyCloneFunc(key.asInstanceOf[Writable]),
- valueCloneFunc(value.asInstanceOf[Writable]))
+ (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
}
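[Editor's note] The cloneWritables helper that keyCloneFunc and valueCloneFunc come from is not shown in this diff. A minimal sketch of such a helper, assuming it simply delegates to Hadoop's WritableUtils.clone (a serialize/deserialize round trip):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Writable, WritableUtils}

object WritableCloningSketch {
  // Hypothetical stand-in for Spark's cloneWritables: given a Configuration,
  // return a function that copies a Writable so the copy stays valid after the
  // RecordReader overwrites its reused buffer.
  def cloneFunc(conf: Configuration): Writable => Writable =
    (w: Writable) => WritableUtils.clone(w, conf)
}

The per-record cost is one extra serialization round trip, which is exactly the overhead the cloneKeyValues flag lets applications opt out of.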
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a347864..2f2d011 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -36,9 +36,24 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
val serializableHadoopSplit = new SerializableWritable(rawSplit)
- override def hashCode(): Int = (41 * (41 + rddId) + index)
+ override def hashCode(): Int = 41 * (41 + rddId) + index
}
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param conf The Hadoop configuration.
+ * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
+ */
class NewHadoopRDD[K: ClassTag, V: ClassTag](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -113,8 +128,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
val key = reader.getCurrentKey
val value = reader.getCurrentValue
if (cloneKeyValues) {
- (keyCloneFunc(key.asInstanceOf[Writable]),
- valueCloneFunc(value.asInstanceOf[Writable]))
+ (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
}
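[Editor's note] For the new-API path documented above, a usage sketch (hypothetical local master and input path, not from this commit):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext

object NewApiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "new-api-sketch")

    // newAPIHadoopFile builds a NewHadoopRDD; cloneKeyValues defaults to true,
    // so records handed downstream are independent copies.
    val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
      "hdfs:///tmp/input.txt")
    println(rdd.map(_._2.toString).count())

    sc.stop()
  }
}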
[2/3] git commit: Renamed cloneKeyValues to cloneRecords; updated docs.
Posted by rx...@apache.org.
Renamed cloneKeyValues to cloneRecords; updated docs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/362cda18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/362cda18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/362cda18
Branch: refs/heads/master
Commit: 362cda18bcb08f12ec680f10e190dd93418c998e
Parents: b0fbfcc
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Jan 11 18:01:29 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Jan 11 18:01:29 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 57 ++++++++++----------
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 18 +++----
.../org/apache/spark/rdd/NewHadoopRDD.scala | 14 ++---
3 files changed, 45 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/362cda18/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1656461..9a3d36b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -354,11 +354,11 @@ class SparkContext(
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
- * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
- * Most RecordReader implementations reuse wrapper objects across multiple
- * records, and can cause problems in RDD collect or aggregation operations.
- * By default the records are cloned in Spark. However, application
- * programmers can explicitly disable the cloning for better performance.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
def hadoopRDD[K: ClassTag, V: ClassTag](
conf: JobConf,
@@ -366,11 +366,11 @@ class SparkContext(
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits,
- cloneKeyValues: Boolean = true
+ cloneRecords: Boolean = true
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
@@ -380,7 +380,7 @@ class SparkContext(
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits,
- cloneKeyValues: Boolean = true
+ cloneRecords: Boolean = true
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -393,7 +393,7 @@ class SparkContext(
keyClass,
valueClass,
minSplits,
- cloneKeyValues)
+ cloneRecords)
}
/**
@@ -405,14 +405,14 @@ class SparkContext(
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
- (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+ (path: String, minSplits: Int, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
minSplits,
- cloneKeyValues = cloneKeyValues)
+ cloneRecords)
}
/**
@@ -423,20 +423,20 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
+ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
+ hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
- (path: String, cloneKeyValues: Boolean = true)
+ (path: String, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
- cloneKeyValues = cloneKeyValues)
+ cloneRecords = cloneRecords)
}
/**
@@ -449,11 +449,11 @@ class SparkContext(
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration,
- cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+ cloneRecords: Boolean = true): RDD[(K, V)] = {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
- new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
+ new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
}
/**
@@ -465,8 +465,8 @@ class SparkContext(
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
- cloneKeyValues: Boolean = true): RDD[(K, V)] = {
- new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
+ cloneRecords: Boolean = true): RDD[(K, V)] = {
+ new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
@@ -474,16 +474,16 @@ class SparkContext(
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int,
- cloneKeyValues: Boolean = true
+ cloneRecords: Boolean = true
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
- cloneKeyValues: Boolean = true): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
+ cloneRecords: Boolean = true): RDD[(K, V)] =
+ sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -501,17 +501,18 @@ class SparkContext(
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
- def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
- cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
- kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
+ def sequenceFile[K, V]
+ (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+ (implicit km: ClassTag[K], vm: ClassTag[V],
+ kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
val vc = vcf()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
- writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+ writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
/**
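[Editor's note] After this rename, callers pass cloneRecords instead. A brief sketch of disabling it for performance (the path and types are placeholders, not from this commit):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkContext

object CloneRecordsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "clone-records-sketch")

    // Disabling cloneRecords skips the per-record copy; that is only safe when
    // each record is consumed before the next one is read, as the map below does.
    val pairs = sc.sequenceFile("hdfs:///tmp/pairs.seq",
      classOf[IntWritable], classOf[Text], 4, cloneRecords = false)
    println(pairs.map { case (k, v) => (k.get, v.toString) }.count())

    sc.stop()
  }
}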
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/362cda18/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 9a220d1..902083c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -64,11 +64,11 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
- * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
- * Most RecordReader implementations reuse wrapper objects across multiple
- * records, and can cause problems in RDD collect or aggregation operations.
- * By default the records are cloned in Spark. However, application
- * programmers can explicitly disable the cloning for better performance.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
class HadoopRDD[K: ClassTag, V: ClassTag](
sc: SparkContext,
@@ -78,7 +78,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int,
- cloneKeyValues: Boolean)
+ cloneRecords: Boolean)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@@ -88,7 +88,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int,
- cloneKeyValues: Boolean) = {
+ cloneRecords: Boolean) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -98,7 +98,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
keyClass,
valueClass,
minSplits,
- cloneKeyValues)
+ cloneRecords)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -180,7 +180,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
case eof: EOFException =>
finished = true
}
- if (cloneKeyValues) {
+ if (cloneRecords) {
(keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/362cda18/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2f2d011..992bd4a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -48,11 +48,11 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param conf The Hadoop configuration.
- * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
- * Most RecordReader implementations reuse wrapper objects across multiple
- * records, and can cause problems in RDD collect or aggregation operations.
- * By default the records are cloned in Spark. However, application
- * programmers can explicitly disable the cloning for better performance.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
class NewHadoopRDD[K: ClassTag, V: ClassTag](
sc : SparkContext,
@@ -60,7 +60,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
keyClass: Class[K],
valueClass: Class[V],
@transient conf: Configuration,
- cloneKeyValues: Boolean)
+ cloneRecords: Boolean)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
@@ -127,7 +127,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
havePair = false
val key = reader.getCurrentKey
val value = reader.getCurrentValue
- if (cloneKeyValues) {
+ if (cloneRecords) {
(keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
[3/3] git commit: Merge pull request #389 from rxin/clone-writables
Posted by rx...@apache.org.
Merge pull request #389 from rxin/clone-writables
Minor update for clone writables and more documentation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/288a8789
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/288a8789
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/288a8789
Branch: refs/heads/master
Commit: 288a878999848adb130041d1e40c14bfc879cec6
Parents: dbc11df 362cda1
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Jan 11 21:53:19 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Jan 11 21:53:19 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 64 ++++++++++++--------
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 24 +++++---
.../org/apache/spark/rdd/NewHadoopRDD.scala | 24 ++++++--
3 files changed, 71 insertions(+), 41 deletions(-)
----------------------------------------------------------------------