You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/18 01:36:32 UTC

git commit: [SPARK-2534] Avoid pulling in the entire RDD in various operators (branch-1.0 backport)

Repository: spark
Updated Branches:
  refs/heads/branch-1.0 3bb5d2f8a -> 26c428acb


[SPARK-2534] Avoid pulling in the entire RDD in various operators (branch-1.0 backport)

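This backports #1450 into branch-1.0. As the diff below shows, local helper methods (`def`) that are handed to RDD operators are rewritten as function values (`val`) and passed directly rather than through `method _`, in line with the subject's goal of not pulling the entire RDD into the closures that get shipped to tasks.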

Author: Reynold Xin <rx...@apache.org>

Closes #1469 from rxin/closure-1.0 and squashes the following commits:

b474a92 [Reynold Xin] [SPARK-2534] Avoid pulling in the entire RDD in various operators


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26c428ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26c428ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26c428ac

Branch: refs/heads/branch-1.0
Commit: 26c428acb7049d683a9879d8380ef4ebf03923b9
Parents: 3bb5d2f
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jul 17 16:33:30 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jul 17 16:33:30 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 45 ++++++++++----------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 12 +++---
 2 files changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26c428ac/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0d3793d..8ad9344 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -132,7 +132,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     // When deserializing, use a lazy val to create just one instance of the serializer per task
     lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
-    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
 
     combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
   }
@@ -175,22 +175,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("reduceByKeyLocally() does not support array keys")
     }
 
-    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+    val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
       iter.foreach { case (k, v) =>
         val old = map.get(k)
         map.put(k, if (old == null) v else func(old, v))
       }
       Iterator(map)
-    }
+    } : Iterator[JHashMap[K, V]]
 
-    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
       m2.foreach { case (k, v) =>
         val old = m1.get(k)
         m1.put(k, if (old == null) v else func(old, v))
       }
       m1
-    }
+    } : JHashMap[K, V]
 
     self.mapPartitions(reducePartition).reduce(mergeMaps)
   }
@@ -273,11 +273,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
+    val createCombiner = (v: V) => ArrayBuffer(v)
+    val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
+    val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
+
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
+      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
     bufs.mapValues(_.toIterable)
   }
 
@@ -571,14 +572,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     self.partitioner match {
       case Some(p) =>
         val index = p.getPartition(key)
-        def process(it: Iterator[(K, V)]): Seq[V] = {
+        val process = (it: Iterator[(K, V)]) => {
           val buf = new ArrayBuffer[V]
           for ((k, v) <- it if k == key) {
             buf += v
           }
           buf
-        }
-        val res = self.context.runJob(self, process _, Array(index), false)
+        } : Seq[V]
+        val res = self.context.runJob(self, process, Array(index), false)
         res(0)
       case None =>
         self.filter(_._1 == key).map(_._2).collect()
@@ -695,7 +696,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       jobFormat.checkOutputSpecs(job)
     }
 
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+    val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -716,19 +717,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           val (k, v) = iter.next()
           writer.write(k, v)
         }
-      }
-      finally {
+      } finally {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
-      return 1
-    }
+      1
+    } : Int
 
     val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard _)
+    self.context.runJob(self, writeShard)
     jobCommitter.commitJob(jobTaskContext)
   }
 
@@ -766,7 +766,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
-    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -775,19 +775,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.open()
       try {
         var count = 0
-        while(iter.hasNext) {
+        while (iter.hasNext) {
           val record = iter.next()
           count += 1
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
         }
-      }
-      finally {
+      } finally {
         writer.close()
       }
       writer.commit()
     }
 
-    self.context.runJob(self, writeToFile _)
+    self.context.runJob(self, writeToFile)
     writer.commitJob()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26c428ac/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e036c53..da2dc58 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -329,7 +329,7 @@ abstract class RDD[T: ClassTag](
       : RDD[T] = {
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
-      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+      val distributePartition = (index: Int, items: Iterator[T]) => {
         var position = (new Random(index)).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner 
@@ -337,7 +337,7 @@ abstract class RDD[T: ClassTag](
           position = position + 1
           (position, t)
         }
-      }
+      } : Iterator[(Int, T)]
 
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
@@ -886,19 +886,19 @@ abstract class RDD[T: ClassTag](
       throw new SparkException("countByValue() does not support arrays")
     }
     // TODO: This should perhaps be distributed by default.
-    def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+    val countPartition = (iter: Iterator[T]) => {
       val map = new OpenHashMap[T,Long]
       iter.foreach {
         t => map.changeValue(t, 1L, _ + 1L)
       }
       Iterator(map)
-    }
-    def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+    } : Iterator[OpenHashMap[T,Long]]
+    val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
       m2.foreach { case (key, value) =>
         m1.changeValue(key, value, _ + value)
       }
       m1
-    }
+    } : OpenHashMap[T,Long]
     val myResult = mapPartitions(countPartition).reduce(mergeMaps)
     // Convert to a Scala mutable map
     val mutableResult = scala.collection.mutable.Map[T,Long]()