Posted to commits@spark.apache.org by pw...@apache.org on 2013/09/10 21:40:29 UTC

[15/50] git commit: Add better docs for coalesce.

Add better docs for coalesce.

Include the useful tip that if shuffle=true, coalesce can actually
increase the number of partitions.

This makes coalesce more like a generic `RDD.repartition` operation.

(Ideally, such an `RDD.repartition` could automatically choose between a plain
coalesce and a shuffle, depending on whether numPartitions is less than or
greater than the current number of partitions.)
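
As a rough sketch of the behaviour described above (illustrative only: the
object name, master URL, and partition counts are made up, assuming the
Spark 0.8 `org.apache.spark` API):

    import org.apache.spark.SparkContext

    object CoalesceSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[4]", "coalesce-sketch")

        // Hypothetical data set: 1000 elements spread over 100 partitions.
        val data = sc.parallelize(1 to 1000, 100)

        // Plain coalesce shrinks the partition count via a narrow
        // dependency, without a shuffle.
        val fewer = data.coalesce(10)
        assert(fewer.partitions.size == 10)

        // With shuffle = true, coalesce can also increase the partition
        // count, which is the repartition-like behaviour the commit
        // message describes.
        val more = data.coalesce(500, shuffle = true)
        assert(more.partitions.size == 500)

        sc.stop()
      }
    }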


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/df5fd352
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/df5fd352
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/df5fd352

Branch: refs/heads/branch-0.8
Commit: df5fd352735005ce0322d287ae27d72d12a7fc8e
Parents: 04cfb3a
Author: Stephen Haberman <st...@exigencecorp.com>
Authored: Sun Sep 8 15:39:03 2013 -0500
Committer: Stephen Haberman <st...@exigencecorp.com>
Committed: Sun Sep 8 15:39:04 2013 -0500

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 17 +++++++++++++++++
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 14 ++++++++++----
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/df5fd352/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e143ecd..41a90f1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -267,6 +267,23 @@ abstract class RDD[T: ClassManifest](
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
+   *
+   * This results in a narrow dependency, e.g. if you go from 1000 partitions
+   * to 100 partitions, there will not be a shuffle; instead, each of the 100
+   * new partitions will claim 10 of the current partitions.
+   *
+   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+   * this may result in your computation taking place on fewer nodes than
+   * you'd like (e.g. one node in the case of numPartitions = 1). To avoid this,
+   * you can pass shuffle = true. This will add a shuffle step, but means the
+   * current upstream partitions will be executed in parallel (per whatever
+   * the current partitioning is).
+   *
+   * Note: With shuffle = true, you can actually coalesce to a larger number
+   * of partitions. This is useful if you have a small number of partitions,
+   * say 100, potentially with a few partitions being abnormally large. Calling
+   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
+   * data evenly distributed into each partition.
    */
   def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
     if (shuffle) {
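
The scaladoc added above can be checked interactively; here is a hedged
sketch of what the two code paths look like in an RDD's lineage, assuming a
Spark 0.8 shell where `sc` is predefined (output elided, counts illustrative):

    // Small hypothetical RDD: 10 elements over 10 partitions.
    val rdd = sc.parallelize(1 to 10, 10)

    // Default (shuffle = false): the coalesced RDD depends narrowly on its
    // parent, so no ShuffledRDD shows up in the lineage.
    val narrow = rdd.coalesce(2)
    println(narrow.toDebugString)

    // shuffle = true: an extra shuffle step appears, and the upstream
    // partitions can still be computed in parallel before the data is
    // redistributed.
    val shuffled = rdd.coalesce(2, shuffle = true)
    println(shuffled.toDebugString)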

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/df5fd352/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index adc9710..6096149 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -140,7 +140,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.union(emptyKv).collect().size === 2)
   }
 
-  test("cogrouped RDDs") {
+  test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
 
     val coalesced1 = data.coalesce(2)
@@ -175,8 +175,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val coalesced5 = data.coalesce(1, shuffle = true)
     assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
       null)
+
+    // when shuffling, we can increase the number of partitions
+    val coalesced6 = data.coalesce(20, shuffle = true)
+    assert(coalesced6.partitions.size === 20)
+    assert(coalesced6.collect().toList === (1 to 10).toList)
   }
-  test("cogrouped RDDs with locality") {
+
+  test("coalesced RDDs with locality") {
     val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
     val coal3 = data3.coalesce(3)
     val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
@@ -197,11 +203,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val coalesced4 = data.coalesce(20)
     val listOfLists = coalesced4.glom().collect().map(_.toList).toList
     val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
-    assert( sortedList === (1 to 9).
+    assert(sortedList === (1 to 9).
       map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
   }
 
-  test("cogrouped RDDs with locality, large scale (10K partitions)") {
+  test("coalesced RDDs with locality, large scale (10K partitions)") {
     // large scale experiment
     import collection.mutable
     val rnd = scala.util.Random