Posted to reviews@spark.apache.org by staple <gi...@git.apache.org> on 2014/07/10 22:37:16 UTC

[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

GitHub user staple opened a pull request:

    https://github.com/apache/spark/pull/1362

    [SPARK-695] In DAGScheduler's getPreferredLocs, track set of visited partitions.

    What do you think of a method like this for avoiding exponential path exploration in DAGScheduler's getPreferredLocs, per SPARK-695?
    
    Some minor cleanups are also included.  The comment I removed in Dependency.scala seemed incorrect to me, per my commit comment:
    
        Remove apparently incorrect comment describing NarrowDependency.
      
        Each of a CartesianRDD's dependencies, for example, is a NarrowDependency, but child partitions of
        these dependencies may depend on shared parent partitions.  For example, the cartesian product of
        RDDs a and b containing partitions [ a_0 ] and [ b_0, b_1 ] respectively will be partitioned as
        [ ( a_0, b_0 ), ( a_0, b_1 ) ].  Each child partition depends on parent partition a_0.
    
    I'm new to Spark, Scala (and even Java), so a careful review may be in order.
    
    Thanks,
    Aaron
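
    The shared-parent case in the commit comment can be sketched without Spark. This is a toy model, not Spark's code: `CartesianSketch`, `parents` and `childrenOfA` are hypothetical names, and the index arithmetic assumes child partitions are numbered row-major over the two parents.

```scala
// Toy model of the partition layout of a cartesian product; no Spark required.
// All names here are hypothetical and chosen only for illustration.
object CartesianSketch {
  /** Parent partitions (index in a, index in b) read by child partition `child`,
    * assuming children are numbered row-major over a x b. */
  def parents(child: Int, numB: Int): (Int, Int) = (child / numB, child % numB)

  /** For each partition of `a`, the child partitions that depend on it. */
  def childrenOfA(numA: Int, numB: Int): Map[Int, Seq[Int]] =
    (0 until numA * numB).groupBy(child => parents(child, numB)._1)
}
```

    With a = [ a_0 ] and b = [ b_0, b_1 ], `childrenOfA(1, 2)` maps partition 0 of a to both child partitions, so one parent partition is used by more than one child even though each dependency is narrow.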

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/staple/spark SPARK-695

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1362
    
----
commit 16d08271b30be6d33866f4b0411623e53e1149aa
Author: Aaron Staple <aa...@gmail.com>
Date:   2014-07-10T06:07:37Z

    [SPARK-695] In DAGScheduler's getPreferredLocs, track set of visited partitions.

commit 2f30ba15994b02713a1d52ada954dd596cf6732a
Author: Aaron Staple <aa...@gmail.com>
Date:   2014-07-10T06:07:37Z

    Remove apparently incorrect comment describing NarrowDependency.
    
    Each of a CartesianRDD's dependencies, for example, is a NarrowDependency, but child partitions of
    these dependencies may depend on shared parent partitions.  For example, the cartesian product of
    RDDs a and b containing partitions [ a_0 ] and [ b_0, b_1 ] respectively will be partitioned as
    [ ( a_0, b_0 ), ( a_0, b_1 ) ].  Each child partition depends on parent partition a_0.

commit 60f348fe51344dbff6f7d26d8b8bb6db8bf29596
Author: Aaron Staple <aa...@gmail.com>
Date:   2014-07-10T06:07:37Z

    Clarify comment.

commit 6da1786e29a250ee2e98e2d34e0306693a75b8bc
Author: Aaron Staple <aa...@gmail.com>
Date:   2014-07-10T06:07:37Z

    Remove unused variable.

commit ada3026b100824218f7c3ff332738411ef6387ff
Author: Aaron Staple <aa...@gmail.com>
Date:   2014-07-10T06:07:37Z

    Fix indentation.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15040684
  
    --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
    @@ -32,8 +32,6 @@ abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
     
     /**
      * :: DeveloperApi ::
    - * Base class for dependencies where each partition of the parent RDD is used by at most one
    - * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
    --- End diff --
    
    Yeah I think a substitute comment is preferable to removal, thanks for providing the proper definition.  Per your suggestion I’ll change it to:
    
    Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD.  Narrow dependencies allow for pipelined execution.



[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-49457211
  
    I added some patches to address the above comments and introduce a timed test.
    
    My test uses an RDD with no preferred locations in the entire dependency graph.  The reason for this is that some RDD types employ mitigations for the exponential blowup problem by overriding getPreferredLocations to return the preferred locations of their parents.  The easiest way to circumvent these mitigations and implement a failing test was to create a dependency graph with no preferred locations.
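
    To illustrate why a graph with no preferred locations forces the full traversal, here is a minimal sketch of the blowup. It assumes a toy `Node` type (hypothetical, not a Spark class) in which each zip step lists the same parent twice. Nothing short-circuits the recursion, so the number of visits roughly doubles with each level.

```scala
// Toy dependency graph: zipping a node with itself yields a child whose two
// parent slots point at the same node. `Node`, `selfZip` and `naiveVisits`
// are hypothetical names used only for illustration.
object BlowupSketch {
  final class Node(val parents: List[Node])

  /** Build a chain of `depth` self-zips on top of a single root node. */
  def selfZip(depth: Int): Node =
    (1 to depth).foldLeft(new Node(Nil))((n, _) => new Node(List(n, n)))

  /** Count node visits when every path through the graph is followed. */
  def naiveVisits(node: Node): Long =
    1L + node.parents.map(naiveVisits).sum
}
```

    For 10 self-zips the naive walk makes 2^11 - 1 = 2047 visits over only 11 distinct nodes; at 30 self-zips the same formula gives over two billion.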


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15018046
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    I'm curious, why did you remove this? It seems unrelated to the preferred locs fix, and it was necessary to prevent exponential explosion here.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1362


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15018253
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1128,6 +1126,23 @@ class DAGScheduler(
        */
       private[spark]
       def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
    +    getPreferredLocsInternal(rdd, partition, new HashMap)
    +  }
    +
    +  /** Recursive implementation for getPreferredLocs. */
    +  private
    +  def getPreferredLocsInternal(
    +      rdd: RDD[_],
    +      partition: Int,
    +      visited: HashMap[RDD[_], HashSet[Int]])
    --- End diff --
    
    It might be clearer to make this a `HashSet[(RDD[_], Int)]`. Otherwise it's not clear that this is just a way to remember which pairs of (RDD, partitionID) we've checked.
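
    A rough sketch of that representation, on a toy graph rather than real RDDs. `Node`, `locs` and the `visits` counter are hypothetical stand-ins, and the sketch assumes every parent is a one-to-one dependency on the same partition index; it is meant to show the shape of the traversal, not Spark's actual code.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for RDD and TaskLocation, for illustration only.
object VisitedSketch {
  final class Node(val parents: List[Node], val locs: Seq[String] = Nil)

  var visits = 0 // instrumentation only: counts calls that do real work

  /** Build a chain of `depth` self-zips on top of a single root node. */
  def selfZip(depth: Int, rootLocs: Seq[String] = Nil): Node =
    (1 to depth).foldLeft(new Node(Nil, rootLocs))((n, _) => new Node(List(n, n)))

  def preferredLocs(node: Node, partition: Int): Seq[String] =
    preferredLocsInternal(node, partition, new mutable.HashSet[(Node, Int)])

  private def preferredLocsInternal(
      node: Node,
      partition: Int,
      visited: mutable.HashSet[(Node, Int)]): Seq[String] =
    // `add` returns false when the pair was already present, so repeats stop here.
    if (!visited.add((node, partition))) Nil
    else {
      visits += 1
      if (node.locs.nonEmpty) node.locs
      else node.parents.iterator
        .map(parent => preferredLocsInternal(parent, partition, visited))
        .find(_.nonEmpty) // the iterator is lazy, so this stops at the first hit
        .getOrElse(Nil)
    }
}
```

    On a chain of 30 self-zips with no locations, this does real work on 31 (node, partition) pairs, one per node, instead of following every path.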


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50915593
  
    Jenkins, test this please


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15040842
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    `visitedStages` is built up and used repeatedly in the recursive calls to `visit`: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1101


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15018547
  
    --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
    @@ -32,8 +32,6 @@ abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
     
     /**
      * :: DeveloperApi ::
    - * Base class for dependencies where each partition of the parent RDD is used by at most one
    - * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
    --- End diff --
    
    It's true that this doesn't cover CartesianRDD, but at the same time I think we shouldn't remove this comment. Maybe change it to "where each partition of the child RDD depends on a small number of partitions of the parent RDD". That will also cover Cartesian. And leave the part about pipelining.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15040670
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    Maybe this is an opportunity for me to get some scala education.  I can see that, in this stageDependsOn function, the visitedRdds val is used to prevent re-visiting an RDD, but I only see two uses of _visitedStages_.  visitedStages is constructed, and then stages are added to it.  I’m not seeing how visitedStages is being used.  Is there some sort of callback when a stage is added to this HashSet?  I’m an admitted scala novice, so please let me know if there’s some magic going on or I’m being dense here :)
    
    Because I didn’t see how visitedStages was being used, I removed this variable in my “Remove unused variable.” commit.  I’ll go ahead and withdraw that commit for now.  If the variable is in fact in use, I’d love to understand that better.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50897748
  
    No problem, I made the changes you suggested. Should be ready for jenkins now.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15076386
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    @mateiz It looks to me that this is what was always intended, but has actually been missing the `visitedStages` check for the past couple of years:
    ```scala
    if (!mapStage.isAvailable && !visitedStages(mapStage)) {
      visitedStages += mapStage
      visit(mapStage.rdd)
    }  // Otherwise there's no need to follow the dependency back
    ```
    Making that change works for me in the sense that all of the test suites continue to pass, but I haven't yet got any relative performance numbers.
    
    Probably needs to go in a separate JIRA & PR.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50981523
  
    Great, thanks!


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15684804
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -291,6 +293,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         assertDataStructuresEmpty
       }
     
    +  test("avoid exponential blowup when getting preferred locs list") {
    +    // Build up a complex dependency graph with repeated zip operations, without preferred locations.
    +    var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
    +    (1 to 30).foreach(_ => rdd = rdd.zip(rdd))
    +    // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
    +    failAfter(5 seconds) {
    --- End diff --
    
    Actually another small thing you might fix is increasing this to 10 seconds, since you sometimes have garbage collection in tests that takes longer. I'm assuming it would take minutes with the old code.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-49257384
  
    Thanks for the review!  I'll address your comments and look into writing a performance unit test, per your suggestion.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50856749
  
    Jenkins, test this please


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50921841
  
    QA results for PR 1362:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17685/consoleFull


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15041028
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    Thanks Mark.  I can see that stages will be repeatedly added to visitedStages.  What I'm not seeing is the consequence of that: how does adding stages to visitedStages affect the functionality here if we never, for example, check whether a stage belongs to visitedStages?


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15701803
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1131,6 +1131,23 @@ class DAGScheduler(
        */
       private[spark]
       def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
    +    getPreferredLocsInternal(rdd, partition, new HashSet)
    +  }
    +
    +  /** Recursive implementation for getPreferredLocs. */
    +  private
    +  def getPreferredLocsInternal(
    --- End diff --
    
    Sure, fixed


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15701858
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -291,6 +293,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         assertDataStructuresEmpty
       }
     
    +  test("avoid exponential blowup when getting preferred locs list") {
    +    // Build up a complex dependency graph with repeated zip operations, without preferred locations.
    +    var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
    +    (1 to 30).foreach(_ => rdd = rdd.zip(rdd))
    +    // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
    +    failAfter(5 seconds) {
    --- End diff --
    
    Sure, changed to 10 seconds. The old DAGScheduler took about 4 minutes on my system.
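
    For readers without the test framework at hand, the idea of a time limit around the call can be approximated with plain Scala futures. This is only a sketch: `TimeLimitSketch` and `withinLimit` are hypothetical names, not the `failAfter` used in the test, and unlike a real test helper this version does not interrupt the body; it only stops waiting for it.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a test-framework time limit: run `body` on another
// thread and give up waiting once `limit` has elapsed.
object TimeLimitSketch {
  def withinLimit[T](limit: FiniteDuration)(body: => T): T =
    // Await.result throws java.util.concurrent.TimeoutException on overrun.
    Await.result(Future(body), limit)
}
```

    A generous limit such as 10 seconds leaves room for a garbage-collection pause while still failing clearly when the traversal takes minutes.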


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50831557
  
    I fixed the recent merge conflicts.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-48661136
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50856865
  
    Sorry for taking a bit of time to get to this, but it looks good. I'll merge it if the tests pass.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15684786
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1131,6 +1131,23 @@ class DAGScheduler(
        */
       private[spark]
       def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
    +    getPreferredLocsInternal(rdd, partition, new HashSet)
    +  }
    +
    +  /** Recursive implementation for getPreferredLocs. */
    +  private
    +  def getPreferredLocsInternal(
    --- End diff --
    
    Small code style issue: "private" and "def" should be on the same line


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50915843
  
    QA tests have started for PR 1362. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17685/consoleFull


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15122197
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    Hi, I filed SPARK-2581 for follow-up work here.
    https://issues.apache.org/jira/browse/SPARK-2581


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15040672
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1128,6 +1126,23 @@ class DAGScheduler(
        */
       private[spark]
       def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
    +    getPreferredLocsInternal(rdd, partition, new HashMap)
    +  }
    +
    +  /** Recursive implementation for getPreferredLocs. */
    +  private
    +  def getPreferredLocsInternal(
    +      rdd: RDD[_],
    +      partition: Int,
    +      visited: HashMap[RDD[_], HashSet[Int]])
    --- End diff --
    
    Sure, will do.



[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15076891
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    Hmm, yeah, it seems that should be there. On the other hand, will tracking the visited RDDs be enough? I guess we need to look into it, but it doesn't seem urgent.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-50922808
  
    Thanks Aaron. I've merged this in.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15076921
  
    --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
    @@ -32,8 +32,6 @@ abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
     
     /**
      * :: DeveloperApi ::
    - * Base class for dependencies where each partition of the parent RDD is used by at most one
    - * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
    --- End diff --
    
    Sounds good.


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1362#discussion_r15041791
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1107,7 +1106,6 @@ class DAGScheduler(
                 case shufDep: ShuffleDependency[_, _, _] =>
                   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                   if (!mapStage.isAvailable) {
    -                visitedStages += mapStage
    --- End diff --
    
    Yup, it's odd -- and it looks to me like it has been that way since this section of code was introduced.  Looks like there should be an `if (!visitedStages(mapStage))` guarding `visit(mapStage.rdd)`... testing...


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1362#issuecomment-49208638
  
    This looks good to me modulo a few comments! One other thing though, you should add a unit test for this functionality. Create a test that would result in a very long running time with the old code (e.g. something where you zip RDDs together 20-30 times and the original one has some preferred locations), and add something around it to time that it runs within, say, 5 seconds.

