You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/02/25 01:59:03 UTC

[1/3] git commit: SPARK-1124: Fix infinite retries of reduce stage when a map stage failed

Repository: incubator-spark
Updated Branches:
  refs/heads/master c0ef3afa8 -> d8d190efd


SPARK-1124: Fix infinite retries of reduce stage when a map stage failed

In the previous code, if you had a failing map stage and then tried to
run reduce stages on it repeatedly, the first reduce stage would fail
correctly, but the later ones would mistakenly believe that all map
outputs were available and start failing infinitely with fetch failures
from "null".


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/cd32d5e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/cd32d5e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/cd32d5e4

Branch: refs/heads/master
Commit: cd32d5e4dee1291e4509e5965322b7ffe620b1f3
Parents: c0ef3af
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sun Feb 23 23:45:48 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sun Feb 23 23:48:32 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 31 +++++++++++---------
 .../scala/org/apache/spark/FailureSuite.scala   | 13 ++++++++
 2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cd32d5e4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 729f518..789d5e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -272,8 +272,10 @@ class DAGScheduler(
     if (mapOutputTracker.has(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
-      for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
-      stage.numAvailableOutputs = locs.size
+      for (i <- 0 until locs.size) {
+        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
+      }
+      stage.numAvailableOutputs = locs.count(_ != null)
     } else {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
@@ -373,25 +375,26 @@ class DAGScheduler(
           } else {
             def removeStage(stageId: Int) {
               // data structures based on Stage
-              stageIdToStage.get(stageId).foreach { s =>
-                if (running.contains(s)) {
+              for (stage <- stageIdToStage.get(stageId)) {
+                if (running.contains(stage)) {
                   logDebug("Removing running stage %d".format(stageId))
-                  running -= s
+                  running -= stage
+                }
+                stageToInfos -= stage
+                for (shuffleDep <- stage.shuffleDep) {
+                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                 }
-                stageToInfos -= s
-                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
-                  shuffleToMapStage.remove(shuffleId))
-                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
+                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                   logDebug("Removing pending status for stage %d".format(stageId))
                 }
-                pendingTasks -= s
-                if (waiting.contains(s)) {
+                pendingTasks -= stage
+                if (waiting.contains(stage)) {
                   logDebug("Removing stage %d from waiting set.".format(stageId))
-                  waiting -= s
+                  waiting -= stage
                 }
-                if (failed.contains(s)) {
+                if (failed.contains(stage)) {
                   logDebug("Removing stage %d from failed set.".format(stageId))
-                  failed -= s
+                  failed -= stage
                 }
               }
               // data structures based on StageId

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cd32d5e4/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index ac3c867..f3fb64d 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -81,6 +81,19 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  // Run a map-reduce job in which the map stage always fails.
+  test("failure in a map stage") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new Exception; (x, x) }).groupByKey(3)
+    intercept[SparkException] {
+      data.collect()
+    }
+    // Make sure that running new jobs with the same map stage also fails
+    intercept[SparkException] {
+      data.collect()
+    }
+  }
+
   test("failure because task results are not serializable") {
     sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)


[2/3] git commit: Fix removal from shuffleToMapStage to search for a key-value pair with our stage instead of using our shuffleID.

Posted by ma...@apache.org.
Fix removal from shuffleToMapStage to search for a key-value pair with
our stage instead of using our shuffleID.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0187cef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0187cef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0187cef0

Branch: refs/heads/master
Commit: 0187cef0f284e6cb22cb3986c327c43304daf57d
Parents: cd32d5e
Author: Matei Zaharia <ma...@databricks.com>
Authored: Mon Feb 24 13:14:56 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Feb 24 13:14:56 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0187cef0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 789d5e6..dc5b25d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -381,8 +381,8 @@ class DAGScheduler(
                   running -= stage
                 }
                 stageToInfos -= stage
-                for (shuffleDep <- stage.shuffleDep) {
-                  shuffleToMapStage.remove(shuffleDep.shuffleId)
+                for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
+                  shuffleToMapStage.remove(k)
                 }
                 if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                   logDebug("Removing pending status for stage %d".format(stageId))


[3/3] git commit: Merge pull request #641 from mateiz/spark-1124-master

Posted by ma...@apache.org.
Merge pull request #641 from mateiz/spark-1124-master

SPARK-1124: Fix infinite retries of reduce stage when a map stage failed

In the previous code, if you had a failing map stage and then tried to run reduce stages on it repeatedly, the first reduce stage would fail correctly, but the later ones would mistakenly believe that all map outputs were available and start failing infinitely with fetch failures from "null". See https://spark-project.atlassian.net/browse/SPARK-1124 for an example.

This PR also cleans up the code style slightly: it renames a variable named "s" to "stage" and simplifies the awkward map manipulation used to remove a stage from shuffleToMapStage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d8d190ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d8d190ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d8d190ef

Branch: refs/heads/master
Commit: d8d190efd2d08c3894b20f6814b10f9ca2157309
Parents: c0ef3af 0187cef
Author: Matei Zaharia <ma...@databricks.com>
Authored: Mon Feb 24 16:58:57 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Feb 24 16:58:57 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 31 +++++++++++---------
 .../scala/org/apache/spark/FailureSuite.scala   | 13 ++++++++
 2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------