You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/07/17 02:31:32 UTC
[35/50] [abbrv] git commit: Throw a more meaningful message when runJob is called to launch tasks on non-existent partitions.
Throw a more meaningful message when runJob is called to launch tasks on non-existent partitions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/69316603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/69316603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/69316603
Branch: refs/heads/master
Commit: 69316603d6bf11ecf1ea3dab63df178bad835e2d
Parents: ed8415b
Author: Reynold Xin <re...@gmail.com>
Authored: Mon Jul 15 22:50:11 2013 -0700
Committer: Reynold Xin <re...@gmail.com>
Committed: Mon Jul 15 22:50:11 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/spark/scheduler/DAGScheduler.scala | 9 +++++++++
core/src/test/scala/spark/RDDSuite.scala | 6 ++++++
2 files changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/69316603/core/src/main/scala/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 3d3b9ea..8173ef7 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -251,6 +251,15 @@ class DAGScheduler(
if (partitions.size == 0) {
return
}
+
+ // Check to make sure we are not launching a task on a partition that does not exist.
+ val maxPartitions = finalRdd.partitions.length
+ partitions.find(p => p >= maxPartitions).foreach { p =>
+ throw new IllegalArgumentException(
+ "Attempting to access a non-existent partition: " + p + ". " +
+ "Total number of partitions: " + maxPartitions)
+ }
+
val (toSubmit, waiter) = prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
eventQueue.put(toSubmit)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/69316603/core/src/test/scala/spark/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index aa3ee5f..7f7d4c8 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -302,4 +302,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
}
}
+
+ test("runJob on an invalid partition") {
+ intercept[IllegalArgumentException] {
+ sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
+ }
+ }
}