You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/18 01:36:32 UTC
git commit: [SPARK-2534] Avoid pulling in the entire RDD in various operators (branch-1.0 backport)
Repository: spark
Updated Branches:
refs/heads/branch-1.0 3bb5d2f8a -> 26c428acb
[SPARK-2534] Avoid pulling in the entire RDD in various operators (branch-1.0 backport)
This backports #1450 into branch-1.0.
Author: Reynold Xin <rx...@apache.org>
Closes #1469 from rxin/closure-1.0 and squashes the following commits:
b474a92 [Reynold Xin] [SPARK-2534] Avoid pulling in the entire RDD in various operators
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26c428ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26c428ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26c428ac
Branch: refs/heads/branch-1.0
Commit: 26c428acb7049d683a9879d8380ef4ebf03923b9
Parents: 3bb5d2f
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jul 17 16:33:30 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jul 17 16:33:30 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/rdd/PairRDDFunctions.scala | 45 ++++++++++----------
.../main/scala/org/apache/spark/rdd/RDD.scala | 12 +++---
2 files changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/26c428ac/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0d3793d..8ad9344 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -132,7 +132,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
- def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+ val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
}
@@ -175,22 +175,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("reduceByKeyLocally() does not support array keys")
}
- def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+ val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
}
Iterator(map)
- }
+ } : Iterator[JHashMap[K, V]]
- def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+ val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
m1
- }
+ } : JHashMap[K, V]
self.mapPartitions(reducePartition).reduce(mergeMaps)
}
@@ -273,11 +273,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
+ val createCombiner = (v: V) => ArrayBuffer(v)
+ val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
+ val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
+
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
+ createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.mapValues(_.toIterable)
}
@@ -571,14 +572,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
+ val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key) {
buf += v
}
buf
- }
- val res = self.context.runJob(self, process _, Array(index), false)
+ } : Seq[V]
+ val res = self.context.runJob(self, process, Array(index), false)
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
@@ -695,7 +696,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
jobFormat.checkOutputSpecs(job)
}
- def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+ val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -716,19 +717,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (k, v) = iter.next()
writer.write(k, v)
}
- }
- finally {
+ } finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
- return 1
- }
+ 1
+ } : Int
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
- self.context.runJob(self, writeShard _)
+ self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
}
@@ -766,7 +766,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = new SparkHadoopWriter(conf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+ val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -775,19 +775,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.open()
try {
var count = 0
- while(iter.hasNext) {
+ while (iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
- }
- finally {
+ } finally {
writer.close()
}
writer.commit()
}
- self.context.runJob(self, writeToFile _)
+ self.context.runJob(self, writeToFile)
writer.commitJob()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/26c428ac/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e036c53..da2dc58 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -329,7 +329,7 @@ abstract class RDD[T: ClassTag](
: RDD[T] = {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
- def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+ val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
@@ -337,7 +337,7 @@ abstract class RDD[T: ClassTag](
position = position + 1
(position, t)
}
- }
+ } : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
@@ -886,19 +886,19 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
- def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+ val countPartition = (iter: Iterator[T]) => {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
- }
- def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+ } : Iterator[OpenHashMap[T,Long]]
+ val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
- }
+ } : OpenHashMap[T,Long]
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()