You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/07/17 10:01:19 UTC

git commit: [SPARK-2412] CoalescedRDD throws exception with certain pref locs

Repository: spark
Updated Branches:
  refs/heads/master 9c249743e -> 7c23c0dc3


[SPARK-2412] CoalescedRDD throws exception with certain pref locs

If the first pass of CoalescedRDD does not find the target number of locations AND the second pass finds new locations, an exception is thrown, because "groupHash.get(nxt_replica)" has no entry for the new location and calling ".get" on the result fails.

The fix is to add an ArrayBuffer to groupHash for that replica if one does not already exist (via getOrElseUpdate).

Author: Aaron Davidson <aa...@databricks.com>

Closes #1337 from aarondav/2412 and squashes the following commits:

f587b5d [Aaron Davidson] getOrElseUpdate
3ad8a3c [Aaron Davidson] [SPARK-2412] CoalescedRDD throws exception with certain pref locs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c23c0dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c23c0dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c23c0dc

Branch: refs/heads/master
Commit: 7c23c0dc3ed721c95690fc49f435d9de6952523c
Parents: 9c24974
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Jul 17 01:01:14 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jul 17 01:01:14 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala     |  4 ++--
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala    | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c23c0dc/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c45b759..e7221e3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
         val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
         addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
         numCreated += 1
       }
     }
@@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       var (nxt_replica, nxt_part) = rotIt.next()
       val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
-      groupHash.get(nxt_replica).get += pgroup
+      groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       var tries = 0
       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
         nxt_part = rotIt.next()._2

http://git-wip-us.apache.org/repos/asf/spark/blob/7c23c0dc/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6ea0451..2924de1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+  test("coalesced RDDs with locality, fail first pass") {
+    val initialPartitions = 1000
+    val targetLen = 50
+    val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+    val blocks = (1 to initialPartitions).map { i =>
+      (i, List(if (i > couponCount) "m2" else "m1"))
+    }
+    val data = sc.makeRDD(blocks)
+    val coalesced = data.coalesce(targetLen)
+    assert(coalesced.partitions.length == targetLen)
+  }
+
   test("zipped RDDs") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val zipped = nums.zip(nums.map(_ + 1.0))