You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/17 21:08:55 UTC

git commit: [SPARK-2534] Avoid pulling in the entire RDD in various operators

Repository: spark
Updated Branches:
  refs/heads/master 9c73822a0 -> d988d345d


[SPARK-2534] Avoid pulling in the entire RDD in various operators

This should go into both master and branch-1.0.

Author: Reynold Xin <rx...@apache.org>

Closes #1450 from rxin/agg-closure and squashes the following commits:

e40f363 [Reynold Xin] Mima check excludes.
9186364 [Reynold Xin] Define the return type more explicitly.
38e348b [Reynold Xin] Fixed the cases in RDD.scala.
ea6b34d [Reynold Xin] Blah
89b9c43 [Reynold Xin] Fix other instances of accidentally pulling in extra stuff in closures.
73b2783 [Reynold Xin] [SPARK-2534] Avoid pulling in the entire RDD in groupByKey.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d988d345
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d988d345
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d988d345

Branch: refs/heads/master
Commit: d988d345d5bec0668324386f3e81787f78e75e67
Parents: 9c73822
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jul 17 10:54:53 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jul 17 10:54:53 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  46 +++---
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  12 +-
 project/MimaExcludes.scala                      | 159 ++++++++++---------
 3 files changed, 115 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d988d345/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 9d62d53..29038b0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -125,7 +125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     zeroBuffer.get(zeroArray)
 
     lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
-    def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
+    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
 
     combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
   }
@@ -171,7 +171,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     // When deserializing, use a lazy val to create just one instance of the serializer per task
     lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
-    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
 
     combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
   }
@@ -214,22 +214,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("reduceByKeyLocally() does not support array keys")
     }
 
-    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+    val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
       iter.foreach { case (k, v) =>
         val old = map.get(k)
         map.put(k, if (old == null) v else func(old, v))
       }
       Iterator(map)
-    }
+    } : Iterator[JHashMap[K, V]]
 
-    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
       m2.foreach { case (k, v) =>
         val old = m1.get(k)
         m1.put(k, if (old == null) v else func(old, v))
       }
       m1
-    }
+    } : JHashMap[K, V]
 
     self.mapPartitions(reducePartition).reduce(mergeMaps)
   }
@@ -361,11 +361,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
+    val createCombiner = (v: V) => ArrayBuffer(v)
+    val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
+    val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
+      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
     bufs.mapValues(_.toIterable)
   }
 
@@ -710,14 +710,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     self.partitioner match {
       case Some(p) =>
         val index = p.getPartition(key)
-        def process(it: Iterator[(K, V)]): Seq[V] = {
+        val process = (it: Iterator[(K, V)]) => {
           val buf = new ArrayBuffer[V]
           for ((k, v) <- it if k == key) {
             buf += v
           }
           buf
-        }
-        val res = self.context.runJob(self, process _, Array(index), false)
+        } : Seq[V]
+        val res = self.context.runJob(self, process, Array(index), false)
         res(0)
       case None =>
         self.filter(_._1 == key).map(_._2).collect()
@@ -840,7 +840,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       jobFormat.checkOutputSpecs(job)
     }
 
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+    val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -861,19 +861,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           val (k, v) = iter.next()
           writer.write(k, v)
         }
-      }
-      finally {
+      } finally {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
-      return 1
-    }
+      1
+    } : Int
 
     val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard _)
+    self.context.runJob(self, writeShard)
     jobCommitter.commitJob(jobTaskContext)
   }
 
@@ -912,7 +911,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()
 
-    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -921,19 +920,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.open()
       try {
         var count = 0
-        while(iter.hasNext) {
+        while (iter.hasNext) {
           val record = iter.next()
           count += 1
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
         }
-      }
-      finally {
+      } finally {
         writer.close()
       }
       writer.commit()
     }
 
-    self.context.runJob(self, writeToFile _)
+    self.context.runJob(self, writeToFile)
     writer.commitJob()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d988d345/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a25f263..88a918a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -328,7 +328,7 @@ abstract class RDD[T: ClassTag](
       : RDD[T] = {
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
-      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+      val distributePartition = (index: Int, items: Iterator[T]) => {
         var position = (new Random(index)).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner 
@@ -336,7 +336,7 @@ abstract class RDD[T: ClassTag](
           position = position + 1
           (position, t)
         }
-      }
+      } : Iterator[(Int, T)]
 
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
@@ -919,19 +919,19 @@ abstract class RDD[T: ClassTag](
       throw new SparkException("countByValue() does not support arrays")
     }
     // TODO: This should perhaps be distributed by default.
-    def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+    val countPartition = (iter: Iterator[T]) => {
       val map = new OpenHashMap[T,Long]
       iter.foreach {
         t => map.changeValue(t, 1L, _ + 1L)
       }
       Iterator(map)
-    }
-    def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+    }: Iterator[OpenHashMap[T,Long]]
+    val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
       m2.foreach { case (key, value) =>
         m1.changeValue(key, value, _ + value)
       }
       m1
-    }
+    }: OpenHashMap[T,Long]
     val myResult = mapPartitions(countPartition).reduce(mergeMaps)
     // Convert to a Scala mutable map
     val mutableResult = scala.collection.mutable.Map[T,Long]()

http://git-wip-us.apache.org/repos/asf/spark/blob/d988d345/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d67c657..3487f7c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,76 +31,91 @@ import com.typesafe.tools.mima.core._
  * MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
  */
 object MimaExcludes {
-    def excludes(version: String) =
-      version match {
-        case v if v.startsWith("1.1") =>
-          Seq(
-            MimaBuild.excludeSparkPackage("deploy"),
-            MimaBuild.excludeSparkPackage("graphx")
-          ) ++
-          Seq(
-            // Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
-            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
-            // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
-            // for countApproxDistinct* functions, which does not work in Java. We later removed
-            // them, and use the following to tell Mima to not care about them.
-            ProblemFilters.exclude[IncompatibleResultTypeProblem](
-              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
-            ProblemFilters.exclude[IncompatibleResultTypeProblem](
-              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.storage.MemoryStore.Entry"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
-                + "createZero$1")
-          ) ++
-          Seq(
-            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
-          ) ++
-          Seq( // Ignore some private methods in ALS.
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
-            ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
-              "org.apache.spark.mllib.recommendation.ALS.this"),
-            ProblemFilters.exclude[MissingMethodProblem](
-              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
-          ) ++
-          MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
-          MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
-          MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
-          MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
-          MimaBuild.excludeSparkClass("storage.Values") ++
-          MimaBuild.excludeSparkClass("storage.Entry") ++
-          MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
-        case v if v.startsWith("1.0") =>
-          Seq(
-            MimaBuild.excludeSparkPackage("api.java"),
-            MimaBuild.excludeSparkPackage("mllib"),
-            MimaBuild.excludeSparkPackage("streaming")
-          ) ++
-          MimaBuild.excludeSparkClass("rdd.ClassTags") ++
-          MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
-          MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
-          MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
-          MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
-          MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
-          MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
-          MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
-          MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
-          MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
-          MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
-          MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
-          MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
-        case _ => Seq()
-      }
+
+  def excludes(version: String) = version match {
+    case v if v.startsWith("1.1") =>
+      Seq(
+        MimaBuild.excludeSparkPackage("deploy"),
+        MimaBuild.excludeSparkPackage("graphx")
+      ) ++
+      closures.map(method => ProblemFilters.exclude[MissingMethodProblem](method)) ++
+      Seq(
+        // Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
+        // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
+        // for countApproxDistinct* functions, which does not work in Java. We later removed
+        // them, and use the following to tell Mima to not care about them.
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.storage.MemoryStore.Entry"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
+            + "createZero$1")
+      ) ++
+      Seq(
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+      ) ++
+      Seq( // Ignore some private methods in ALS.
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
+        ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
+          "org.apache.spark.mllib.recommendation.ALS.this"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
+      ) ++
+      MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
+      MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
+      MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
+      MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
+      MimaBuild.excludeSparkClass("storage.Values") ++
+      MimaBuild.excludeSparkClass("storage.Entry") ++
+      MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
+    case v if v.startsWith("1.0") =>
+      Seq(
+        MimaBuild.excludeSparkPackage("api.java"),
+        MimaBuild.excludeSparkPackage("mllib"),
+        MimaBuild.excludeSparkPackage("streaming")
+      ) ++
+      MimaBuild.excludeSparkClass("rdd.ClassTags") ++
+      MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
+      MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
+      MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
+      MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
+      MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
+      MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
+      MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
+      MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
+      MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
+      MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
+      MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
+      MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
+    case _ => Seq()
+  }
+
+  private val closures = Seq(
+    "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$mergeMaps$1",
+    "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1",
+    "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$distributePartition$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeValue$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$writeToFile$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$reducePartition$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$writeShard$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeCombiners$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$process$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$createCombiner$1",
+    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeMaps$1"
+  )
 }