You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "jin xing (JIRA)" <ji...@apache.org> on 2017/02/07 10:59:41 UTC

[jira] [Updated] (SPARK-19263) DAGScheduler should avoid sending conflicting task set.

     [ https://issues.apache.org/jira/browse/SPARK-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jin xing updated SPARK-19263:
-----------------------------
    Description: 
In current *DAGScheduler handleTaskCompletion* code, when *event.reason* is *Success*, it will first do *stage.pendingPartitions -= task.partitionId*, which maybe a bug when *FetchFailed* happens. Think about below:

1. Stage 0 runs and generates shuffle output data.
2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch;
4. The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task’s epoch is less than the failure epoch for the executor (because of the earlier failure on executor A)
6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage.
7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there’s an existing active task set

To reproduce the bug:
1. We need to do some modification in *ShuffleBlockFetcherIterator*: check whether the task's index in *TaskSetManager* and stage attempt equal to 0 at the same time, if so, throw FetchFailedException;
2. Rebuild spark then submit following job:
{code}
    val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
    rdd.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2
      }
    }.map {
      keyAndValue => {
        (keyAndValue._1 % 2, keyAndValue._2)
      }
    }.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2

      }
    }.collect
{code}

  was:
In current *DAGScheduler handleTaskCompletion* code, when *event.reason* is *Success*, it will first do *stage.pendingPartitions -= task.partitionId*, which maybe a bug when *FetchFailed* happens. Think about below:
1. There are 2 executors A and B, executorA got assigned with ShuffleMapTask1 and ShuffleMapTask2;
2. ShuffleMapTask1 want's to fetch blocks from local but failed;
3. Driver receives the *FetchFailed* caused by ShuffleMapTask1 on executorA and marks executorA as lost and updates *failedEpoch*;
4. Driver resubmits stages, containing ShuffleMapTask1x and ShuffleMapTask2x;
5. ShuffleMapTask2 is successfully finished on executorA and sends *Success* back to driver;
6. Driver receives *Success* and do *stage.pendingPartitions -= task.partitionId*, but then driver finds task's epoch is not big enough *<= failedEpoch(execId)* and just takes it as bogus, does not add the *MapStatus* to stage;
7. ShuffleMapTask1x is successfully finished on executorB;
8. Driver receives *Success* from ShuffleMapTask1x on executorB and does *stage.pendingPartitions -= task.partitionId*, thus no pending partitions, but then finds not all partitions are available because of step 6;
9. Driver resubmits stage; but at this moment ShuffleMapTask2x is still running; in *TaskSchedulerImpl submitTasks*, it finds *conflictingTaskSet*, then throw *IllegalStateException*
10. Failed.

To reproduce the bug:
1. We need to do some modification in *ShuffleBlockFetcherIterator*: check whether the task's index in *TaskSetManager* and stage attempt equal to 0 at the same time, if so, throw FetchFailedException;
2. Rebuild spark then submit following job:
{code}
    val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
    rdd.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2
      }
    }.map {
      keyAndValue => {
        (keyAndValue._1 % 2, keyAndValue._2)
      }
    }.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2

      }
    }.collect
{code}


> DAGScheduler should avoid sending conflicting task set.
> -------------------------------------------------------
>
>                 Key: SPARK-19263
>                 URL: https://issues.apache.org/jira/browse/SPARK-19263
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>
> In current *DAGScheduler handleTaskCompletion* code, when *event.reason* is *Success*, it will first do *stage.pendingPartitions -= task.partitionId*, which maybe a bug when *FetchFailed* happens. Think about below:
> 1. Stage 0 runs and generates shuffle output data.
> 2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
> 3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch;
> 4. The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
> 5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task’s epoch is less than the failure epoch for the executor (because of the earlier failure on executor A)
> 6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage.
> 7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there’s an existing active task set
> To reproduce the bug:
> 1. We need to do some modification in *ShuffleBlockFetcherIterator*: check whether the task's index in *TaskSetManager* and stage attempt equal to 0 at the same time, if so, throw FetchFailedException;
> 2. Rebuild spark then submit following job:
> {code}
>     val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
>     rdd.reduceByKey {
>       (v1, v2) => {
>         Thread.sleep(10000)
>         v1 + v2
>       }
>     }.map {
>       keyAndValue => {
>         (keyAndValue._1 % 2, keyAndValue._2)
>       }
>     }.reduceByKey {
>       (v1, v2) => {
>         Thread.sleep(10000)
>         v1 + v2
>       }
>     }.collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org