You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@spark.apache.org by an...@apache.org on 2015/12/17 04:01:09 UTC
spark git commit: [SPARK-10248][CORE] track exceptions in dagscheduler event loop in tests
Repository: spark
Updated Branches:
refs/heads/master ce5fd4008 -> 38d9795a4
[SPARK-10248][CORE] track exceptions in dagscheduler event loop in tests
`DAGSchedulerEventLoop` normally only logs errors, so that it can continue processing further events from other jobs. However, this is not desirable in tests: the tests should be able to detect any exception easily, and they should not silently succeed when an exception is thrown.
This was suggested by mateiz on https://github.com/apache/spark/pull/7699. It may already have turned up an issue in the "zero split job" test.
Author: Imran Rashid <ir...@cloudera.com>
Closes #8466 from squito/SPARK-10248.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38d9795a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38d9795a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38d9795a
Branch: refs/heads/master
Commit: 38d9795a4fa07086d65ff705ce86648345618736
Parents: ce5fd40
Author: Imran Rashid <ir...@cloudera.com>
Authored: Wed Dec 16 19:01:05 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Dec 16 19:01:05 2015 -0800
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 5 ++--
.../spark/scheduler/DAGSchedulerSuite.scala | 28 ++++++++++++++++++--
2 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/38d9795a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8d0e0c8..b128ed5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -805,7 +805,8 @@ class DAGScheduler(
private[scheduler] def cleanUpAfterSchedulerStop() {
for (job <- activeJobs) {
- val error = new SparkException("Job cancelled because SparkContext was shut down")
+ val error =
+ new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
// Tell the listeners that all of the running stages have ended. Don't bother
// cancelling the stages because if the DAG scheduler is stopped, the entire application
@@ -1295,7 +1296,7 @@ class DAGScheduler(
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
- case other =>
+ case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
http://git-wip-us.apache.org/repos/asf/spark/blob/38d9795a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 653d41f..2869f0f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -45,6 +45,13 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
case NonFatal(e) => onError(e)
}
}
+
+ override def onError(e: Throwable): Unit = {
+ logError("Error in DAGSchedulerEventLoop: ", e)
+ dagScheduler.stop()
+ throw e
+ }
+
}
/**
@@ -300,13 +307,18 @@ class DAGSchedulerSuite
test("zero split job") {
var numResults = 0
+ var failureReason: Option[Exception] = None
val fakeListener = new JobListener() {
- override def taskSucceeded(partition: Int, value: Any) = numResults += 1
- override def jobFailed(exception: Exception) = throw exception
+ override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1
+ override def jobFailed(exception: Exception): Unit = {
+ failureReason = Some(exception)
+ }
}
val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
assert(numResults === 0)
cancel(jobId)
+ assert(failureReason.isDefined)
+ assert(failureReason.get.getMessage() === "Job 0 cancelled ")
}
test("run trivial job") {
@@ -1675,6 +1687,18 @@ class DAGSchedulerSuite
assert(stackTraceString.contains("org.scalatest.FunSuite"))
}
+ test("catch errors in event loop") {
+ // this is a test of our testing framework -- make sure errors in event loop don't get ignored
+
+ // just run some bad event that will throw an exception -- we'll give a null TaskEndReason
+ val rdd1 = new MyRDD(sc, 1, Nil)
+ submit(rdd1, Array(0))
+ intercept[Exception] {
+ complete(taskSets(0), Seq(
+ (null, makeMapStatus("hostA", 1))))
+ }
+ }
+
test("simple map stage submission") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org