You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/26 00:15:55 UTC

git commit: [SPARK-1726] [SPARK-2567] Eliminate zombie stages in UI.

Repository: spark
Updated Branches:
  refs/heads/master 47b6b38ca -> 37ad3b724


[SPARK-1726] [SPARK-2567] Eliminate zombie stages in UI.

Because of when we update runningStages (in DAGScheduler.scala) and how we
decide to send a SparkListenerStageCompleted message to SparkListeners, stages
can sometimes be shown as "running" in the UI forever (even after they have
failed).  This issue can manifest when stages are resubmitted with 0 tasks, or
when the DAGScheduler catches non-serializable tasks. The problem also caused
a (small) memory leak in the DAGScheduler, where stages could stay in
runningStages forever. This commit fixes that problem and adds a unit test.

Thanks tsudukim for helping to look into this issue!

cc markhamstra rxin

Author: Kay Ousterhout <ka...@gmail.com>

Closes #1566 from kayousterhout/dag_fix and squashes the following commits:

217d74b [Kay Ousterhout] [SPARK-1726] [SPARK-2567] Eliminate zombie stages in UI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37ad3b72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37ad3b72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37ad3b72

Branch: refs/heads/master
Commit: 37ad3b724590dcf42bcdbfaf91b7a11914501945
Parents: 47b6b38
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Jul 25 15:14:13 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Jul 25 15:14:13 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  12 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 129 ++++++++++---------
 2 files changed, 76 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37ad3b72/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index acb4c49..00b8af2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -710,7 +710,6 @@ class DAGScheduler(
         if (missing == Nil) {
           logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
           submitMissingTasks(stage, jobId.get)
-          runningStages += stage
         } else {
           for (parent <- missing) {
             submitStage(parent)
@@ -753,11 +752,14 @@ class DAGScheduler(
       null
     }
 
-    // must be run listener before possible NotSerializableException
-    // should be "StageSubmitted" first and then "JobEnded"
-    listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
-
     if (tasks.size > 0) {
+      runningStages += stage
+      // SparkListenerStageSubmitted should be posted before testing whether tasks are
+      // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
+      // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
+      // event.
+      listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
+
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
       // down the road, where we have several different implementations for local scheduler and

http://git-wip-us.apache.org/repos/asf/spark/blob/37ad3b72/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9f498d5..44dd1e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -37,6 +37,29 @@ class BuggyDAGEventProcessActor extends Actor {
   }
 }
 
+/**
+ * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
+ * preferredLocations (if any) that are passed to them. They are deliberately not executable
+ * so we can test that DAGScheduler does not try to execute RDDs locally.
+ */
+class MyRDD(
+    sc: SparkContext,
+    numPartitions: Int,
+    dependencies: List[Dependency[_]],
+    locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
+  override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+    throw new RuntimeException("should not be reached")
+  override def getPartitions = (0 until numPartitions).map(i => new Partition {
+    override def index = i
+  }).toArray
+  override def getPreferredLocations(split: Partition): Seq[String] =
+    if (locations.isDefinedAt(split.index))
+      locations(split.index)
+    else
+      Nil
+  override def toString: String = "DAGSchedulerSuiteRDD " + id
+}
+
 class DAGSchedulerSuiteDummyException extends Exception
 
 class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
@@ -148,34 +171,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    * Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
    * This is a pair RDD type so it can always be used in ShuffleDependencies.
    */
-  type MyRDD = RDD[(Int, Int)]
-
-  /**
-   * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
-   * preferredLocations (if any) that are passed to them. They are deliberately not executable
-   * so we can test that DAGScheduler does not try to execute RDDs locally.
-   */
-  private def makeRdd(
-        numPartitions: Int,
-        dependencies: List[Dependency[_]],
-        locations: Seq[Seq[String]] = Nil
-      ): MyRDD = {
-    val maxPartition = numPartitions - 1
-    val newRDD = new MyRDD(sc, dependencies) {
-      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
-        throw new RuntimeException("should not be reached")
-      override def getPartitions = (0 to maxPartition).map(i => new Partition {
-        override def index = i
-      }).toArray
-      override def getPreferredLocations(split: Partition): Seq[String] =
-        if (locations.isDefinedAt(split.index))
-          locations(split.index)
-        else
-          Nil
-      override def toString: String = "DAGSchedulerSuiteRDD " + id
-    }
-    newRDD
-  }
+  type PairOfIntsRDD = RDD[(Int, Int)]
 
   /**
    * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
@@ -234,19 +230,19 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def taskSucceeded(partition: Int, value: Any) = numResults += 1
       override def jobFailed(exception: Exception) = throw exception
     }
-    submit(makeRdd(0, Nil), Array(), listener = fakeListener)
+    submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
     assert(numResults === 0)
   }
 
   test("run trivial job") {
-    submit(makeRdd(1, Nil), Array(0))
+    submit(new MyRDD(sc, 1, Nil), Array(0))
     complete(taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }
 
   test("local job") {
-    val rdd = new MyRDD(sc, Nil) {
+    val rdd = new PairOfIntsRDD(sc, Nil) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         Array(42 -> 0).iterator
       override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -260,7 +256,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("local job oom") {
-    val rdd = new MyRDD(sc, Nil) {
+    val rdd = new PairOfIntsRDD(sc, Nil) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         throw new java.lang.OutOfMemoryError("test local job oom")
       override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -274,8 +270,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial job w/ dependency") {
-    val baseRdd = makeRdd(1, Nil)
-    val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0))
     complete(taskSets(0), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
@@ -283,8 +279,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("cache location preferences w/ dependency") {
-    val baseRdd = makeRdd(1, Nil)
-    val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
     cacheLocations(baseRdd.id -> 0) =
       Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
     submit(finalRdd, Array(0))
@@ -295,8 +291,22 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("unserializable task") {
+    val unserializableRdd = new MyRDD(sc, 1, Nil) {
+      class UnserializableClass
+      val unserializable = new UnserializableClass
+    }
+    submit(unserializableRdd, Array(0))
+    assert(failure.getMessage.startsWith(
+      "Job aborted due to stage failure: Task not serializable:"))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(0))
+    assert(sparkListener.failedStages.size === 1)
+    assertDataStructuresEmpty
+  }
+
   test("trivial job failure") {
-    submit(makeRdd(1, Nil), Array(0))
+    submit(new MyRDD(sc, 1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
@@ -306,7 +316,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("trivial job cancellation") {
-    val rdd = makeRdd(1, Nil)
+    val rdd = new MyRDD(sc, 1, Nil)
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
@@ -347,8 +357,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     }
     dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
       Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
-    val rdd = makeRdd(1, Nil)
-    val jobId = submit(rdd, Array(0))
+    val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
     cancel(jobId)
     // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
     assert(failure === null)
@@ -364,10 +373,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial shuffle") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(1, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
     submit(reduceRdd, Array(0))
     complete(taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
@@ -380,10 +389,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial shuffle with fetch failure") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
@@ -406,10 +415,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("ignore late map task completions") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     // pretend we were told hostA went away
     val oldEpoch = mapOutputTracker.getEpoch
@@ -435,9 +444,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run shuffle with map stage failure") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
 
     // Fail the map stage.  This should cause the entire job to fail.
@@ -472,13 +481,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    * without shuffleMapRdd1.
    */
   test("failure of stage used by two jobs") {
-    val shuffleMapRdd1 = makeRdd(2, Nil)
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
     val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, null)
-    val shuffleMapRdd2 = makeRdd(2, Nil)
+    val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
     val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, null)
 
-    val reduceRdd1 = makeRdd(2, List(shuffleDep1))
-    val reduceRdd2 = makeRdd(2, List(shuffleDep1, shuffleDep2))
+    val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1))
+    val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2))
 
     // We need to make our own listeners for this test, since by default submit uses the same
     // listener for all jobs, and here we want to capture the failure for each job separately.
@@ -511,10 +520,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial shuffle with out-of-band failure and retry") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(1, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
     submit(reduceRdd, Array(0))
     // blockManagerMaster.removeExecutor("exec-hostA")
     // pretend we were told hostA went away
@@ -534,11 +543,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("recursive shuffle failures") {
-    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
     val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
-    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
     val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
-    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
     submit(finalRdd, Array(0))
     // have the first stage complete normally
     complete(taskSets(0), Seq(
@@ -563,11 +572,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("cached post-shuffle") {
-    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
     val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
-    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
     val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
-    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
     submit(finalRdd, Array(0))
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))