Posted to issues@spark.apache.org by "Kay Ousterhout (JIRA)" <ji...@apache.org> on 2016/04/25 19:41:13 UTC

[jira] [Comment Edited] (SPARK-13902) Make DAGScheduler.getAncestorShuffleDependencies() return in topological order to ensure building ancestor stages first.

    [ https://issues.apache.org/jira/browse/SPARK-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256669#comment-15256669 ] 

Kay Ousterhout edited comment on SPARK-13902 at 4/25/16 5:41 PM:
-----------------------------------------------------------------

After looking at this a bit more, I think I understand the issue better.  Is this a correct description of the issue?

Suppose you have the following DAG: 

{noformat}
                            
[A] <---- [B] <---- [C]
  \                 /
   <---------------
{noformat}

Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependencies on both B and A.  The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example easier to understand, let's call the shuffle dependency for A's shuffled data s_A and the one for B's shuffled data s_B.

The getAncestorShuffleDependencies method in DAGScheduler (incorrectly) does not check for duplicates when it adds ShuffleDependencies to the parents data structure.  So for this DAG, when getAncestorShuffleDependencies is called on C (the final RDD), it returns s_B, s_A, s_A: s_A gets added twice, once when the method visits RDD C and once when it visits RDD B.  This is problematic because this line of code: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289 then generates a new shuffle stage for each dependency returned by getAncestorShuffleDependencies, resulting in duplicate map stages that compute the map output from RDD A.
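
To make the duplicate concrete, here is a small Python model of the traversal as I've described it above. It is only a sketch, not the DAGScheduler code: the RDD class, the ancestor_shuffle_deps function, and the dedupe flag are all hypothetical names, and shuffle dependencies are represented by the labels s_A and s_B rather than numeric IDs.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class RDD:
    """Toy stand-in for an RDD: a name plus its shuffle dependencies."""
    name: str
    # Each entry is (shuffle dependency label, parent RDD).
    shuffle_deps: list = field(default_factory=list)


def ancestor_shuffle_deps(final_rdd, dedupe):
    """Walk the lineage from final_rdd and collect shuffle dependency labels.

    With dedupe=False a dependency is appended every time it is seen,
    which models the missing duplicate check described above.
    """
    parents = []
    seen_deps = set()
    visited = set()
    waiting = [final_rdd]  # used as a stack
    while waiting:
        rdd = waiting.pop()
        if rdd.name in visited:
            continue
        visited.add(rdd.name)
        for dep_id, parent in rdd.shuffle_deps:
            if not dedupe or dep_id not in seen_deps:
                seen_deps.add(dep_id)
                parents.append(dep_id)
            waiting.append(parent)
    return parents


# The DAG from the example: B depends on A; C depends on both B and A.
a = RDD("A")
b = RDD("B", [("s_A", a)])
c = RDD("C", [("s_B", b), ("s_A", a)])

without_check = ancestor_shuffle_deps(c, dedupe=False)
with_check = ancestor_shuffle_deps(c, dedupe=True)
print(without_check)  # ['s_B', 's_A', 's_A']
print(with_check)     # ['s_B', 's_A']
```

In this model, s_A shows up twice without the check, so a caller that creates one stage per returned dependency would create two map stages for A's output.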



> Make DAGScheduler.getAncestorShuffleDependencies() return in topological order to ensure building ancestor stages first.
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13902
>                 URL: https://issues.apache.org/jira/browse/SPARK-13902
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>            Reporter: Takuya Ueshin
>
> {{DAGScheduler}} sometimes generates an incorrect stage graph.
> Some stages are generated twice or more for the same shuffleId, and they are referenced by the child stages, because the building order of the graph is not correct.
> Here, we submit an RDD\[F\] having a lineage of RDDs as follows (please see this in {{monospaced}} font):
> {noformat}
>                               <--------------------
>                             /                       \
> [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F]
>                \                       /
>                  <--------------------
> {noformat}
> {{DAGScheduler}} generates the following stages and their parents for each shuffle id:
> | shuffle id | stage | parents |
> | 0 | ShuffleMapStage 2 | List() |
> | 1 | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
> | 2 | ShuffleMapStage 3 | List(ShuffleMapStage 1) |
> | 3 | ShuffleMapStage 4 | List(ShuffleMapStage 2, ShuffleMapStage 3) |
> | 4 | ShuffleMapStage 5 | List(ShuffleMapStage 1, ShuffleMapStage 4) |
> | \- | ResultStage 6 | List(ShuffleMapStage 5) |
> The stage for shuffle id {{0}} should be {{ShuffleMapStage 0}}, but the stage for shuffle id {{0}} is generated twice: {{ShuffleMapStage 0}} is overwritten by {{ShuffleMapStage 2}}, while {{ShuffleMapStage 1}} keeps referring to the _old_ stage {{ShuffleMapStage 0}}.
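
The overwrite described in the issue can be modelled with a dictionary keyed by shuffle id. This is a hedged sketch only: Stage, Registry, create, and get_or_create are hypothetical names, the dictionary merely stands in for whatever mapping the scheduler keeps from shuffle id to stage, and get_or_create is one possible guard for illustration, not the change proposed in this issue.

```python
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class Stage:
    """Toy stand-in for a ShuffleMapStage."""
    stage_id: int
    shuffle_id: int
    parents: tuple = ()


class Registry:
    """Maps shuffle id -> stage and hands out increasing stage ids."""

    def __init__(self):
        self.by_shuffle_id = {}
        self._ids = itertools.count()

    def create(self, shuffle_id, parents=()):
        # Always builds a new stage; an existing entry is silently replaced.
        stage = Stage(next(self._ids), shuffle_id, tuple(parents))
        self.by_shuffle_id[shuffle_id] = stage
        return stage

    def get_or_create(self, shuffle_id, parents=()):
        # Reuses the stage already registered for this shuffle id, if any.
        existing = self.by_shuffle_id.get(shuffle_id)
        return existing if existing is not None else self.create(shuffle_id, parents)


# Unguarded: shuffle id 0 gets a stage, a child refers to it, then
# shuffle id 0 is registered a second time.
buggy = Registry()
stage0 = buggy.create(0)
stage1 = buggy.create(1, [stage0])
stage2 = buggy.create(0)
# The child still points at the old stage, not the registered one.
stale = stage1.parents[0] is not buggy.by_shuffle_id[0]

# Guarded: the second request for shuffle id 0 returns the same stage.
fixed = Registry()
f0 = fixed.get_or_create(0)
f1 = fixed.get_or_create(1, [f0])
f0_again = fixed.get_or_create(0)
```

In the unguarded run, the entry for shuffle id 0 ends up as stage 2 while stage 1's parent is still stage 0, which mirrors the first two rows of the table above.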


