You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Daniel Darabos (JIRA)" <ji...@apache.org> on 2015/07/02 15:39:04 UTC

[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException

    [ https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611960#comment-14611960 ] 

Daniel Darabos commented on SPARK-5945:
---------------------------------------

At the moment we have a ton of these infinite retries. A stage is retried a few dozen times, then its parent goes missing and Spark starts retrying the parent until it also goes missing... We are still debugging the cause of our fetch failures, but I just wanted to mention that if there were a {{spark.stage.maxFailures}} option, we would be setting it to 1 at this point.

Thanks for all the work on this bug. Even if it's not fixed yet, it's very informative.

> Spark should not retry a stage infinitely on a FetchFailedException
> -------------------------------------------------------------------
>
>                 Key: SPARK-5945
>                 URL: https://issues.apache.org/jira/browse/SPARK-5945
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>            Assignee: Ilya Ganelin
>
> While investigating SPARK-5928, I noticed some very strange behavior in the way spark retries stages after a FetchFailedException.  It seems that on a FetchFailedException, instead of simply killing the task and retrying, Spark aborts the stage and retries.  If it just retried the task, the task might fail 4 times and then trigger the usual job killing mechanism.  But by killing the stage instead, the max retry logic is skipped (it looks to me like there is no limit for retries on a stage).
> After a bit of discussion with Kay Ousterhout, it seems the idea is that if a fetch fails, we assume that the block manager we are fetching from has failed, and that it will succeed if we retry the stage w/out that block manager.  In that case, it wouldn't make any sense to retry the task, since its doomed to fail every time, so we might as well kill the whole stage.  But this raises two questions:
> 1) Is it really safe to assume that a FetchFailedException means that the BlockManager has failed, and ti will work if we just try another one?  SPARK-5928 shows that there are at least some cases where that assumption is wrong.  Even if we fix that case, this logic seems brittle to the next case we find.  I guess the idea is that this behavior is what gives us the "R" in RDD ... but it seems like its not really that robust and maybe should be reconsidered.
> 2) Should stages only be retried a limited number of times?  It would be pretty easy to put in a limited number of retries per stage.  Though again, we encounter issues with keeping things resilient.  Theoretically one stage could have many retries, but due to failures in different stages further downstream, so we might need to track the cause of each retry as well to still have the desired behavior.
> In general it just seems there is some flakiness in the retry logic.  This is the only reproducible example I have at the moment, but I vaguely recall hitting other cases of strange behavior w/ retries when trying to run long pipelines.  Eg., if one executor is stuck in a GC during a fetch, the fetch fails, but the executor eventually comes back and the stage gets retried again, but the same GC issues happen the second time around, etc.
> Copied from SPARK-5928, here's the example program that can regularly produce a loop of stage failures.  Note that it will only fail from a remote fetch, so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}
> {code}
>     val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>       val n = 3e3.toInt
>       val arr = new Array[Byte](n)
>       //need to make sure the array doesn't compress to something small
>       scala.util.Random.nextBytes(arr)
>       arr
>     }
>     rdd.map { x => (1, x)}.groupByKey().count()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org