Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2017/01/18 18:25:26 UTC

[jira] [Updated] (SPARK-19276) FetchFailures can be hidden by user (or sql) exception handling

     [ https://issues.apache.org/jira/browse/SPARK-19276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-19276:
---------------------------------
    Description: 
The scheduler handles node failures by looking for a special {{FetchFailedException}} thrown by the shuffle block fetcher.  This is caught in {{Executor}} and then passed back to the driver as a special message: https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403
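
For reference, the relevant clause in {{Executor.TaskRunner}} looks roughly like this (a simplified excerpt, with surrounding details elided):

{code:scala}
// Simplified excerpt: the driver learns that shuffle data is missing only
// when the exception escaping the task is of exactly this type.
case ffe: FetchFailedException =>
  val reason = ffe.toTaskFailedReason
  execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
{code}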

However, user code runs in between the shuffle block fetcher and that catch block -- it can intercept the exception, wrap it in something else, and throw a different exception.  If that happens, Spark treats it as an ordinary task failure and retries the task, rather than regenerating the missing shuffle data.  The task is retried up to 4 times, each attempt is doomed to fail the same way, and the job is ultimately aborted.
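
To make the failure mode concrete, here is a sketch of user code that triggers it ({{shuffled}} and {{expensiveTransform}} are made-up names for illustration).  Because the try/catch wraps consumption of the partition iterator, a {{FetchFailedException}} raised while fetching shuffle blocks is caught and re-wrapped before {{Executor}} ever sees it:

{code:scala}
// Sketch: shuffled is the reduce side of some shuffle (e.g. a groupByKey),
// so pulling from iter is what performs the remote block fetches.
shuffled.mapPartitions { iter =>
  try {
    // Forcing the iterator inside the try means fetch failures surface here.
    iter.map(expensiveTransform).toVector.iterator
  } catch {
    case t: Throwable =>
      // The FetchFailedException becomes merely the *cause* of a generic
      // RuntimeException, so Spark treats this as an ordinary task failure.
      throw new RuntimeException("transform failed", t)
  }
}
{code}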

You might think that no user code should do that -- but even Spark SQL does it:
https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214
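
The pattern there is roughly the following (paraphrased): a catch-all around the write loop wraps whatever was thrown, including a fetch failure from the upstream shuffle read, in a generic {{SparkException}}:

{code:scala}
// Paraphrased: the write loop pulls rows from the task's input iterator,
// so a FetchFailedException can surface inside the try block and come out
// re-wrapped as a SparkException.
try {
  while (iter.hasNext) {
    writeTask.execute(iter.next())
  }
} catch {
  case t: Throwable =>
    throw new SparkException("Task failed while writing rows", t)
}
{code}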

Here's an example stack trace.  This is from Spark 1.6, so the SQL code path is not the same, but the problem is still there:

{noformat}
17/01/13 19:18:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1983.0 (TID 304851, xxx): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to xxx/yyy:zzz
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
...
17/01/13 19:19:29 ERROR scheduler.TaskSetManager: Task 0 in stage 1983.0 failed 4 times; aborting job
{noformat}

I think the right fix here is to also record a fetch failure status in the {{TaskContextImpl}}, so the executor can check that status rather than relying on just the one exception type.
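
One shape that fix could take (a sketch only; the names here are illustrative, not a committed API):

{code:scala}
// In TaskContextImpl (sketch): remember any fetch failure this task hits,
// independently of what exception ultimately escapes the task body.
@volatile private var _fetchFailed: Option[FetchFailedException] = None
private[spark] def setFetchFailed(ffe: FetchFailedException): Unit = {
  _fetchFailed = Some(ffe)
}
private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailed

// In Executor.TaskRunner (sketch): when any Throwable escapes the task,
// prefer the recorded fetch failure over whatever exception surfaced.
case t: Throwable if task.context.fetchFailed.isDefined =>
  val reason = task.context.fetchFailed.get.toTaskFailedReason
  execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
{code}

The shuffle block fetcher would call {{setFetchFailed}} just before throwing, so wrapping by user code could no longer hide the signal.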

  was:
The scheduler handles node failures by looking for a special {{FetchFailedException}} thrown by the shuffle block fetcher.  This is caught in {{Executor}} and then passed back to the driver as a special message: https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403

However, user code runs in between the shuffle block fetcher and that catch block -- it can intercept the exception, wrap it in something else, and throw a different exception.  If that happens, Spark treats it as an ordinary task failure and retries the task, rather than regenerating the missing shuffle data.  The task is retried up to 4 times, each attempt is doomed to fail the same way, and the job is ultimately aborted.

You might think that no user code should do that -- but even Spark SQL does it:
https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214

I think the right fix here is to also record a fetch failure status in the {{TaskContextImpl}}, so the executor can check that status rather than relying on just the one exception type.


> FetchFailures can be hidden by user (or sql) exception handling
> ---------------------------------------------------------------
>
>                 Key: SPARK-19276
>                 URL: https://issues.apache.org/jira/browse/SPARK-19276
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core, SQL
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>            Priority: Critical
>


