You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Kay Ousterhout (JIRA)" <ji...@apache.org> on 2016/04/25 19:41:13 UTC
[jira] [Comment Edited] (SPARK-13902) Make
DAGScheduler.getAncestorShuffleDependencies() return in topological order
to ensure building ancestor stages first.
[ https://issues.apache.org/jira/browse/SPARK-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256669#comment-15256669 ]
Kay Ousterhout edited comment on SPARK-13902 at 4/25/16 5:41 PM:
-----------------------------------------------------------------
After looking at this a bit more, I think I understand the issue better. Is this a correct description of the issue?
Suppose you have the following DAG:
{noformat}
[A] <---- [B] <---- [C]
\ /
<-----------
{noformat}
Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the shuffled data from B shuffle dependency ID s_B.
The getAncestorShuffleDependencies method in DAGScheduler (incorrectly) does not check for duplicates when it's adding ShuffleDependencies to the parents data structure, so for this DAG, when getAncestorShuffleDependencies gets called on C (the final RDD), getAncestorShuffleDependencies will return s_B, s_A, s_A (s_A gets added twice: once when the method "visit"s RDD C, and once when the method "visit"s RDD B). This is problematic because this line of code: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289 then generates a new shuffle stage for each dependency returned by getAncestorShuffleDependencies, resulting in duplicate map stages that compute the map output from RDD A.
was (Author: kayousterhout):
After looking at this a bit more, I think I understand the issue better. Is this a correct description of the issue?
Suppose you have the following DAG:
{noformat}
[A] <---- [B] <---- [C]
\ /
<-----------------
{noformat}
Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the shuffled data from B shuffle dependency ID s_B.
The getAncestorShuffleDependencies method in DAGScheduler (incorrectly) does not check for duplicates when it's adding ShuffleDependencies to the parents data structure, so for this DAG, when getAncestorShuffleDependencies gets called on C (the final RDD), getAncestorShuffleDependencies will return s_B, s_A, s_A (s_A gets added twice: once when the method "visit"s RDD C, and once when the method "visit"s RDD B). This is problematic because this line of code: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289 then generates a new shuffle stage for each dependency returned by getAncestorShuffleDependencies, resulting in duplicate map stages that compute the map output from RDD A.
> Make DAGScheduler.getAncestorShuffleDependencies() return in topological order to ensure building ancestor stages first.
> ------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-13902
> URL: https://issues.apache.org/jira/browse/SPARK-13902
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Reporter: Takuya Ueshin
>
> {{DAGScheduler}} sometimes generate incorrect stage graph.
> Some stages are generated for the same shuffleId twice or more and they are referenced by the child stages because the building order of the graph is not correct.
> Here, we submit an RDD\[F\] having a linage of RDDs as follows (please see this in {{monospaced}} font):
> {noformat}
> <--------------------
> / \
> [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F]
> \ /
> <--------------------
> {noformat}
> {{DAGScheduler}} generates the following stages and their parents for each shuffle id:
> | shuffle id | stage | parents |
> | 0 | ShuffleMapStage 2 | List() |
> | 1 | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
> | 2 | ShuffleMapStage 3 | List(ShuffleMapStage 1) |
> | 3 | ShuffleMapStage 4 | List(ShuffleMapStage 2, ShuffleMapStage 3) |
> | 4 | ShuffleMapStage 5 | List(ShuffleMapStage 1, ShuffleMapStage 4) |
> | \- | ResultStage 6 | List(ShuffleMapStage 5) |
> The stage for shuffle id {{0}} should be {{ShuffleMapStage 0}}, but the stage for shuffle id {{0}} is generated twice as {{ShuffleMapStage 2}} and {{ShuffleMapStage 0}} is overwritten by {{ShuffleMapStage 2}}, and the stage {{ShuffleMap Stage1}} keeps referring the _old_ stage {{ShuffleMapStage 0}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org