You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2015/08/17 04:33:45 UTC

[jira] [Created] (SPARK-10042) Use consistent behavior for Internal Accumulators across stage retries

Imran Rashid created SPARK-10042:
------------------------------------

             Summary: Use consistent behavior for Internal Accumulators across stage retries
                 Key: SPARK-10042
                 URL: https://issues.apache.org/jira/browse/SPARK-10042
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, Web UI
    Affects Versions: 1.5.0
            Reporter: Imran Rashid


[~andrewor14]

The internal accumulators introduced in SPARK-8735 aren't counted in a consistent manner during stage retries.  Whether the accumulators are counted once or multiple times is very random.

First a little interlude on how stage failures & retry works.  When there is a fetch failure, spark looks at the block manager that it failed to fetch data from, and it assumes none of the data from that BM is available.  It fails the stage with the fetch failure, then it goes back to the ShuffleMapStage that produced the data.  It looks at which partitions were stored in the failed BM, and it reruns just those partitions.  Meanwhile, all currently running tasks for current stage keep running, potentially producing more fetch failures.  In fact, some of those tasks can even keep running until the dependent stage has been re-run, and this stage has been restarted.  (Yes, this can and does happen under real workloads, and is the cause of a SPARK-8029, a serious failure in real workloads.)

If Spark has lost multiple BMs (which might mean its lost all the shuffle map output of an earlier stage), there are a few different ways that shuffle map output will get regenerated.  Perhaps there will be enough tasks running to trigger fetch failures on all the lost BMs before the earlier stage is restarted, so by the time the stage is re-scheduled, the scheduler knows to rerun all the tasks.  Or maybe it only gets a failure on one block manager, so it re-generates the map output for that one block manager, and then on trying the downstream stage, it realizes another block manager is down, and repeats the process, one BM at a time, till everything has been regenerated.  Or perhaps as its regenerating the map output from the first failure, the "zombie" tasks from the failed stage that are still running trigger fetch failures from all the other block managers.  And then as soon as shuffle map stage is done regenerating data for one BM, it'll immediately regenerate the data for the other lost BMs before trying the downstream stage.  (And then there are assorted combinations as well.)

This means that it is totally unpredictable how many partitions will get rerun for the ShuffleMapStage that was previously successful.  Eg., run your example program:

{noformat}
import org.apache.spark._
import org.apache.spark.shuffle.FetchFailedException

val data = sc.parallelize(1 to 1e3.toInt, 500).map(identity).groupBy(identity)
val shuffleHandle = data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle

// Simulate fetch failures
val mappedData = data.map { case (i, _) =>
  val taskContext = TaskContext.get
  if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 50) {
    // Cause the post-shuffle stage to fail on its first attempt with a single task failure
    val env = SparkEnv.get
    val bmAddress = env.blockManager.blockManagerId
    val shuffleId = shuffleHandle.shuffleId
    val mapId = 0
    val reduceId = taskContext.partitionId()
    val message = "Simulated fetch failure"
    throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
  } else {
    (i, i)
  }
}

mappedData.reduceByKey ({ _ + _ }, 500).count()

{noformat}


with the current condition on resetting the accumulators, that is {{(stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute)}}.  In {{local}} mode all partitions will get re-run.  Then try running it with {{local-cluster[2,1,1024]}} (which will create two block managers).  Here's some example debug output from when I ran it:

{noformat}
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 2, 3, 4
  - internal accum values:
=== STAGE 0 IS CREATING NEW ACCUMULATORS ===
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 2, 3, 4
  - internal accum values:
=== STAGE 1 IS CREATING NEW ACCUMULATORS ===
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 6, 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...)
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 1, 2, 4
  - internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 2, 3, 4
  - internal accum values: 3936
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.1 (TID 11, 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 2
  - internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 4
  - internal accum values: 7872
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.2 (TID 15, 192.168.1.106): FetchFailed(BlockManagerId(1, 192.168.1.106, 61640), shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 3, 4
  - internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 2, 3, 4
  - internal accum values: 9840
=== STAGE 1 IS CREATING NEW ACCUMULATORS ===
...
{noformat}

As you can see, {{partitionsToCompute != allPartitions}} in most cases.  For example, in the second submission of stage 1, we would have **double counted** the accumulators for partitions 0,2,3,4.  By the third submission of stage 1, we would have **triple counted** partitions 0 & 4.  Or then again, we just might reset the values and count singly, as we do in the final iteration you see here.

I had earlier suggested that we should never reset the value, just initialize it once, and have the value keep increasing.  But maybe that isn't what you want -- maybe you want to *always* reset the value?  Then the metrics would clearly apply to that one stage *attempt* alone.  In any case, we are stuck with the fact that skipped stages (which come from a shared narrow dependency) do not share the same {{Stage}} object, even though they are conceptually the same stage to a user.  So retries from skipped stages also suggests that our goal should be for each attempt to have a cleared value for the accumulators, since that is the behavior we're stuck with on retries via a skipped stage in any case.  We could either always reset internal accumulators, or have them be a property of the stage **attempt** which just gets intiailized w/ the attempt and never reset.

Another option would be for the UI to just display the *update* from each task, rather than the accumulator value at the end of the task

https://github.com/apache/spark/blob/cf016075a006034c24c5b758edb279f3e151d25d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L372

That would make the global value of the accumulator entirely irrelevant.  In fact, I'm not certain how to interpret the sum of the memory used in each task.  If I have 10K tasks, running in 20 slots, the sum across all 10K tasks is probably over-estimating the memory used by 500x.  Its even stranger to report the quartiles of that partial sum as tasks complete.  I highly doubt most users will understand what that summary metric means, and even if they did understand, it seems to have very little value.

(Only using the *update* from each task would also mean that we wouldn't be using the accumulators to "accumulate" anything, it just becomes the place we happen to cram our per-task metrics.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org