Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/12 21:00:48 UTC

git commit: Fix #SPARK-1149 Bad partitioners can cause Spark to hang

Repository: spark
Updated Branches:
  refs/heads/master b5162f442 -> 5d1ec64e7


Fix #SPARK-1149 Bad partitioners can cause Spark to hang

Author: liguoqiang <li...@rd.tuan800.com>

Closes #44 from witgo/SPARK-1149 and squashes the following commits:

3dcdcaf [liguoqiang] Merge branch 'master' into SPARK-1149
8425395 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
3dad595 [liguoqiang] review comment
e3e56aa [liguoqiang] Merge branch 'master' into SPARK-1149
b0d5c07 [liguoqiang] review comment
d0a6005 [liguoqiang] review comment
3395ee7 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
ac006a3 [liguoqiang] code Formatting
3feb3a8 [liguoqiang] Merge branch 'master' into SPARK-1149
adc443e [liguoqiang] partitions check bugfix
928e1e3 [liguoqiang] Added a unit test for PairRDDFunctions.lookup with bad partitioner
db6ecc5 [liguoqiang] Merge branch 'master' into SPARK-1149
1e3331e [liguoqiang] Merge branch 'master' into SPARK-1149
3348619 [liguoqiang] Optimize performance for partitions check
61e5a87 [liguoqiang] Merge branch 'master' into SPARK-1149
e68210a [liguoqiang] add partition index check to submitJob
3a65903 [liguoqiang] make the code more readable
6bb725e [liguoqiang] fix #SPARK-1149 Bad partitioners can cause Spark to hang
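
For context, a minimal reproduction sketch of the failure mode (not part of this
commit; assumes a live SparkContext sc). Scala's % operator follows Java
semantics and can return a negative remainder for keys with a negative
hashCode, so a naive partitioner may hand back an index outside
[0, numPartitions). Before this fix, lookup() on such a key submitted a job for
a nonexistent partition and hung; after it, the new require check fails fast:

    import org.apache.spark.Partitioner

    // A "bad" partitioner: key.hashCode() % 2 is -1 for negative hash codes,
    // i.e. an index outside the valid range [0, numPartitions).
    val badPartitioner = new Partitioner {
      def numPartitions: Int = 2
      def getPartition(key: Any): Int = key.hashCode() % 2
    }

    val pairs = sc.parallelize(Seq((1, 2), (-1, 3))).partitionBy(badPartitioner)
    pairs.lookup(-1)  // pre-fix: the job hangs; post-fix: IllegalArgumentException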


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d1ec64e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d1ec64e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d1ec64e

Branch: refs/heads/master
Commit: 5d1ec64e7934ad7f922cdab516fa5de690644780
Parents: b5162f4
Author: liguoqiang <li...@rd.tuan800.com>
Authored: Wed Mar 12 12:59:51 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Mar 12 13:00:04 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkContext.scala  |  6 ++++++
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala    | 16 ++++++++++++++++
 2 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d1ec64e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 745e3fa..852ed8f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -852,6 +852,9 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -955,6 +958,9 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(
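
The guard added above makes an out-of-range partition index fail fast in the
caller's thread instead of being handed to the DAGScheduler, where it
previously caused the hang. A hedged sketch of the resulting behavior (assumes
a live SparkContext sc and uses the existing runJob overload taking
(rdd, func, partitions, allowLocal)):

    val rdd = sc.parallelize(1 to 10, 2)  // valid partition indices: 0 and 1

    // Valid indices run as before.
    sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(0, 1), allowLocal = false)

    // An invalid index now trips the require check and throws immediately.
    try {
      sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(5), allowLocal = false)
    } catch {
      case e: IllegalArgumentException =>
        println(e.getMessage)  // "requirement failed: Invalid partition requested: 5"
    }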

http://git-wip-us.apache.org/repos/asf/spark/blob/5d1ec64e/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 85e8eb5..f9e994b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -373,6 +373,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(shuffled.lookup(5) === Seq(6,7))
     assert(shuffled.lookup(-1) === Seq())
   }
+
+  test("lookup with bad partitioner") {
+    val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
+
+    val p = new Partitioner {
+      def numPartitions: Int = 2
+
+      def getPartition(key: Any): Int = key.hashCode() % 2
+    }
+    val shuffled = pairs.partitionBy(p)
+
+    assert(shuffled.partitioner === Some(p))
+    assert(shuffled.lookup(1) === Seq(2))
+    intercept[IllegalArgumentException] {shuffled.lookup(-1)}
+  }
+
 }
 
 /*
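
A closing note on the test above: its partitioner is deliberately broken so
that lookup(-1) trips the new require check. A sketch of the usual correct
form, which maps negative hash codes back into range (the same idea Spark's
HashPartitioner applies via Utils.nonNegativeMod):

    import org.apache.spark.Partitioner

    // A well-behaved variant of the test's partitioner: clamp the Java-style
    // remainder into [0, numPartitions) so getPartition is never negative.
    val goodPartitioner = new Partitioner {
      def numPartitions: Int = 2
      def getPartition(key: Any): Int = {
        val mod = key.hashCode() % numPartitions
        if (mod < 0) mod + numPartitions else mod
      }
    }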