Posted to reviews@spark.apache.org by lirui-intel <gi...@git.apache.org> on 2014/07/08 13:51:53 UTC

[GitHub] spark pull request: SPARK-2387: remove stage barrier

GitHub user lirui-intel opened a pull request:

    https://github.com/apache/spark/pull/1328

    SPARK-2387: remove stage barrier

    This PR is a PoC implementation of [SPARK-2387](https://issues.apache.org/jira/browse/SPARK-2387).
    
    When a ShuffleMapTask finishes, DAGScheduler checks resource usage. If there are free slots, DAGScheduler chooses a stage from the waiting list whose parent stages have all started, and pre-starts that waiting stage. All in-progress parent stages then register their map outputs progressively with MapOutputTrackerMaster. A flag is added to MapOutputTracker to indicate whether the map statuses for a shuffle are partial, so that partial registration can be distinguished from a failed shuffle map stage.
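A minimal sketch of the pre-start decision described above, assuming a simplified model of scheduler state. `WaitingStage`, `PreStartPolicy` and `pickStageToPreStart` are hypothetical names, not the PR's code; the tie-break that prefers the larger stage ID is taken from the commit "stage with a bigger ID should take precedence".

```scala
// Hypothetical, simplified view of a stage; not Spark's Stage class.
final case class WaitingStage(id: Int, parentIds: Set[Int])

object PreStartPolicy {
  /** Choose a waiting stage to pre-start, or None if nothing qualifies.
    *
    * @param freeSlots  idle task slots reported by the scheduler (assumed input)
    * @param waiting    stages still on the waiting list
    * @param startedIds IDs of stages that have already been submitted
    */
  def pickStageToPreStart(
      freeSlots: Int,
      waiting: Seq[WaitingStage],
      startedIds: Set[Int]): Option[WaitingStage] =
    if (freeSlots <= 0) None
    else {
      // Eligible only if every parent stage has started (not necessarily finished).
      val eligible = waiting.filter(_.parentIds.subsetOf(startedIds))
      // Tie-break: the stage with the bigger ID takes precedence.
      if (eligible.isEmpty) None else Some(eligible.maxBy(_.id))
    }
}
```

For example, with stages 1 and 2 started, a waiting stage whose parents are {1} is eligible, while one that still depends on an unstarted stage is not.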
    When a downstream task tries to fetch shuffle blocks, it gets an array of map outputs that has “holes” (unfinished map tasks) in it. We created PartialBlockFetcherIterator to handle this map output array. PartialBlockFetcherIterator keeps an array of conventional iterators (BasicBlockFetcherIterator or NettyBlockFetcherIterator). When new map outputs become available, PartialBlockFetcherIterator delegates them to a new conventional iterator and relies on these conventional iterators for the “hasNext” and “next” methods. When all the delegated map statuses are exhausted, PartialBlockFetcherIterator asks the local MapOutputTrackerWorker for updated map outputs. MapOutputTrackerWorker uses an "updater" thread to communicate with MapOutputTrackerMaster to update the map statuses, and notifies the waiting downstream tasks to continue once the map statuses are updated.
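The delegation scheme can be sketched roughly as follows. This is not the PR's PartialBlockFetcherIterator: `PartialOutputIterator` is a hypothetical name, blocks are plain values rather than fetched shuffle blocks, and a caller-supplied `refresh` function (an assumption) stands in for asking the local MapOutputTrackerWorker, so the sketch polls instead of blocking on the updater thread. A `None` entry marks a hole.

```scala
import scala.collection.mutable.ArrayBuffer

/** Hypothetical iterator over partially available map outputs.
  * `refresh` returns the latest snapshot; index i holds map task i's blocks,
  * or None while that map task is still running.
  */
class PartialOutputIterator[T](
    numMaps: Int,
    refresh: () => IndexedSeq[Option[Seq[T]]],
    maxRefreshes: Int = 100) extends Iterator[T] {

  private val consumed = Array.fill(numMaps)(false)
  private var current: Iterator[T] = Iterator.empty
  private var refreshes = 0

  private def allConsumed: Boolean = consumed.forall(identity)

  // When the current delegate runs dry, pull newly finished outputs into a new one.
  private def advance(): Unit = {
    while (!current.hasNext && !allConsumed) {
      if (refreshes >= maxRefreshes)
        throw new IllegalStateException("map outputs did not become available")
      refreshes += 1
      val snapshot = refresh()
      val fresh = ArrayBuffer.empty[T]
      for (i <- 0 until numMaps if !consumed(i); blocks <- snapshot(i)) {
        consumed(i) = true
        fresh ++= blocks
      }
      current = fresh.iterator // the "conventional" delegate for this batch
    }
  }

  override def hasNext: Boolean = { advance(); current.hasNext }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more blocks")
    current.next()
  }
}
```

The `maxRefreshes` bound only keeps the sketch from spinning forever; an implementation like the one described would instead wait for the updater thread's notification.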
    
    This PoC feature is mainly intended for, and tested on, standalone clusters. I used a 7-node cluster for the performance test; each node runs an executor with 32 CPUs and 90GB of memory. I used graphx.SynthBenchmark with the following test case:
    graphx.SynthBenchmark -partStrategy=EdgePartition2D -numEPart=112 -nverts=10000000 -niter=3
    The feature speeds up the whole job by roughly 10% (creation time drops from 128s to 116s and run time from 126s to 115s).
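For reference, the reported timings correspond to the following relative reductions (plain arithmetic on the numbers above):

```latex
\frac{128-116}{128} \approx 9.4\%, \qquad
\frac{126-115}{126} \approx 8.7\%, \qquad
\frac{(128+126)-(116+115)}{128+126} = \frac{23}{254} \approx 9.1\%
```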

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lirui-intel/spark removeStageBarrier

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1328.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1328
    
----
commit 163302d26af6ab4b780e7047c417d6199f6c3020
Author: lirui <ru...@intel.com>
Date:   2014-05-05T05:21:05Z

    minor fix
    
    Signed-off-by: lirui <ru...@intel.com>

commit f81476d0460afabdb5f6a83c6542f080be81a58e
Author: lirui <ru...@intel.com>
Date:   2014-05-07T03:24:00Z

    Merge branch 'master' of https://github.com/lirui-intel/spark

commit 3124380ccfd19a38878e05f0af29f80c279a897b
Author: lirui <ru...@intel.com>
Date:   2014-05-08T07:06:57Z

    try to locate the point to remove the barrier

commit 8e625c0c87a4f5f3d35433f2eb1aca5d1cf09549
Author: lirui <ru...@intel.com>
Date:   2014-05-08T07:57:52Z

    apply upstream hot fix

commit 1d5d0f0be263a98387ba763b2298a8eccf3e2c65
Author: lirui <ru...@intel.com>
Date:   2014-05-09T13:33:36Z

    RemoveStageBarrier: support partial map outputs

commit c4f405446739280a3e02e70854d88f08498d8447
Author: lirui <ru...@intel.com>
Date:   2014-05-11T06:50:40Z

    RemoveStageBarrier: build fix

commit 444d2d96de39edc5399276fa2efec57638f10462
Author: lirui <ru...@intel.com>
Date:   2014-05-11T13:54:54Z

    RemoveStageBarrier: register map outputs progressively

commit 2df1d4e5cab7ba36443425a6f7b54a6ed06f519f
Author: lirui <ru...@intel.com>
Date:   2014-05-12T02:30:39Z

    RemoveStageBarrier: increment epoch for progressive registration

commit 9f18dc74a25d34dfda923e3f1064cc339110e9a9
Author: lirui <ru...@intel.com>
Date:   2014-05-12T08:39:10Z

    RemoveStageBarrier: fix check free CPUs

commit 7af23c0fa134ac329d2ee4fa8813e1deb13b1ddd
Author: lirui <ru...@intel.com>
Date:   2014-05-13T08:30:41Z

    RemoveStageBarrier: make reducers refresh map outputs less often

commit 9a32a17d0f2620ef807aa3d1ed26df39038bf3af
Author: lirui <ru...@intel.com>
Date:   2014-05-13T13:00:20Z

    RemoveStageBarrier: start reducers earlier

commit 9ffb208e0c32990f7919589c091124d41de51f7e
Author: lirui <ru...@intel.com>
Date:   2014-05-14T03:21:59Z

    RemoveStageBarrier: add log info

commit ef3b04323bcdeadab58d02b9076a14b08e130436
Author: lirui <ru...@intel.com>
Date:   2014-05-14T08:45:03Z

    RemoveStageBarrier: adjust sleep interval

commit 4213d63be930a7f39fca5667855b2862e52d3487
Author: lirui <ru...@intel.com>
Date:   2014-05-15T11:24:25Z

    RemoveStageBarrier: add a new iterator to manage partial map outputs

commit 376230a20822e797558530e67ee9df3619476d78
Author: lirui <ru...@intel.com>
Date:   2014-05-16T05:13:08Z

    RemoveStageBarrier: minor fixes

commit efd31efb006ddb3275c42375a069efcfa8d72244
Author: lirui <ru...@intel.com>
Date:   2014-05-16T06:20:16Z

    RemoveStageBarrier: fix: reducers may fail due to very slow mappers

commit 3cb944c9cdbc48ba9a152a9f425d3b09ab810390
Author: lirui <ru...@intel.com>
Date:   2014-05-16T08:29:09Z

    RemoveStageBarrier: add some log info

commit 641715e4a27e1b303a0cb13dc16e5544a8710cc7
Author: lirui <ru...@intel.com>
Date:   2014-05-19T01:28:32Z

    RemoveStageBarrier: stage with a bigger ID should take precedence

commit b0c2df24909d5ecb92f6d926f4705dedaf96660f
Author: lirui <ru...@intel.com>
Date:   2014-05-23T02:35:58Z

    RemoveStageBarrier: track whether map output for a shuffle is partial in MapOutputTracker

commit 75d27449e3b65382e5968842f5fa73c5f939c663
Author: lirui <ru...@intel.com>
Date:   2014-05-23T07:27:02Z

    RemoveStageBarrier: refine how we get the stage to pre-start

commit b7f1f844522d2c502c2e344f87ea2c729d53b4bc
Author: lirui <ru...@intel.com>
Date:   2014-05-23T07:47:03Z

    RemoveStageBarrier: indicate the output is partial for progressive registration

commit be474084f7a422873114cdd291c8a07da3f5ad2d
Author: lirui <ru...@intel.com>
Date:   2014-05-26T05:03:34Z

    add some debug info

commit c88014b0ebcf261554bcbac537e60130b9276da5
Author: lirui <ru...@intel.com>
Date:   2014-05-26T06:37:04Z

    add a new locality level for tasks with no preferred locations

commit 133a356d04e00a87a84e576b464b803d24a270ed
Author: lirui <ru...@intel.com>
Date:   2014-05-26T12:34:16Z

    re-compute pending list when new executor is added

commit 7d92f9a3d1a925701170c8ac9e459903073c09b8
Author: lirui <ru...@intel.com>
Date:   2014-05-26T14:10:33Z

    pendingTasksWithNoPrefs should only contain tasks that really have no preferred locations

commit c1de426c159c9a0670bfb30bae81d772195290d8
Author: lirui <ru...@intel.com>
Date:   2014-05-27T02:16:45Z

    make the delay schedule configurable

commit e57e081e847495b11289b48aeb894cd634ecca71
Author: lirui <ru...@intel.com>
Date:   2014-05-27T03:39:29Z

    clean up

commit fda0281490b52adf7cf20da20ffb36f5c34f7e7c
Author: lirui <ru...@intel.com>
Date:   2014-05-27T05:54:13Z

    do some refactor

commit 781861dc432ab6fad134691050be4d71f8bad1eb
Author: lirui <ru...@intel.com>
Date:   2014-05-28T08:31:33Z

    RemoveStageBarrier: fix problem with consolidated shuffle file

commit 679813b180a68074e4dbadd16fc9a36fdaac9f37
Author: lirui <ru...@intel.com>
Date:   2014-05-29T03:39:34Z

    RemoveStageBarrier: should fail the pre-started stages if the parent stage gets re-submitted

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1328#issuecomment-48326594
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on the pull request:

    https://github.com/apache/spark/pull/1328#issuecomment-48688983
  
    I think several issues need to be addressed here to make this a more robust and sound solution:
    
    1. We need a way to keep the pre-started stage from occupying so much CPU that it starves the parent stage and prevents it from finishing in time. This means controlling how many pre-started tasks can launch and adjusting that number according to the parent stage's status. We also need some configuration so users can fine-tune the behavior; after all, the best time to pre-start may depend on the actual case.
    
    2. We might need a more general interface; a lot of specific class implementations seem to be assumed.
    
    3. We might need to take more care of memory management, e.g. preventing the pre-started stage from occupying too much memory (say, through caching), which complicates overall cache eviction, GC, and so on.
    
    Just my suggestion.
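One way to read point 1 is as a throttle on pre-started tasks. The sketch below is an assumption, not something from the PR: `PreStartThrottle`, `maxFraction` (a cap on the share of cluster slots that pre-started tasks may hold) and `minParentProgress` (how far the parent stage must be before anything is pre-started) are hypothetical tunables of the kind the comment asks for.

```scala
/** Hypothetical throttle for pre-started tasks (not from the PR). */
final case class PreStartThrottle(maxFraction: Double, minParentProgress: Double) {
  require(maxFraction >= 0.0 && maxFraction <= 1.0, "maxFraction must be in [0, 1]")
  require(minParentProgress >= 0.0 && minParentProgress <= 1.0,
    "minParentProgress must be in [0, 1]")

  /** How many more pre-started tasks may launch right now.
    *
    * @param totalSlots        all task slots in the cluster
    * @param freeSlots         currently idle slots
    * @param parentPending     parent map tasks that have not launched yet
    * @param parentProgress    fraction of parent map tasks finished, in [0, 1]
    * @param alreadyPreStarted pre-started tasks currently running
    */
  def allowed(
      totalSlots: Int,
      freeSlots: Int,
      parentPending: Int,
      parentProgress: Double,
      alreadyPreStarted: Int): Int =
    if (parentProgress < minParentProgress) 0
    else {
      // Never take slots that pending parent tasks could still use.
      val spare = math.max(0, freeSlots - parentPending)
      // Cap the share of the cluster held by pre-started tasks.
      val cap = math.max(0, (totalSlots * maxFraction).toInt - alreadyPreStarted)
      math.min(spare, cap)
    }
}
```

Giving parent tasks first claim on idle slots addresses the starvation concern directly, while the fraction cap bounds how much of the cluster (and, indirectly, memory) pre-started work can hold.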


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1328#discussion_r15268635
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
      */
     private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
       protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
    +    with mutable.SynchronizedMap[Int, Array[MapStatus]]
    --- End diff --
    
    > Maybe it's better to make it ConcurrentHashMap in the base class.
    
    Because MapOutputTrackerMaster uses TimeStampedHashMap, which is not a ConcurrentHashMap, MapOutputTracker still needs to declare mapStatuses as a Map. Nevertheless, I can add a comment on MapOutputTracker.mapStatuses noting that it must be a thread-safe map.
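A sketch of the design described here, using hypothetical stand-in names (`TrackerBase`, `WorkerTracker`, `MasterTracker`, `MapStatusStub`): the base class declares `mapStatuses` against the general `mutable.Map` interface and documents the thread-safety requirement, while each subclass picks its own thread-safe implementation. `TrieMap` stands in for Spark's `TimeStampedHashMap`, which is not reproduced here.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for Spark's MapStatus.
final case class MapStatusStub(host: String, blockSizes: Array[Long])

abstract class TrackerBase {
  /** Must be a thread-safe map: task threads read it while updates arrive concurrently. */
  protected val mapStatuses: mutable.Map[Int, Array[MapStatusStub]]

  def statusesFor(shuffleId: Int): Option[Array[MapStatusStub]] = mapStatuses.get(shuffleId)

  def register(shuffleId: Int, statuses: Array[MapStatusStub]): Unit =
    mapStatuses(shuffleId) = statuses
}

// Worker side: a ConcurrentHashMap exposed through the Scala Map interface.
class WorkerTracker extends TrackerBase {
  protected val mapStatuses: mutable.Map[Int, Array[MapStatusStub]] =
    new ConcurrentHashMap[Int, Array[MapStatusStub]]().asScala
}

// Master side: a different thread-safe Map (TrieMap stands in for TimeStampedHashMap).
class MasterTracker extends TrackerBase {
  protected val mapStatuses: mutable.Map[Int, Array[MapStatusStub]] =
    TrieMap.empty[Int, Array[MapStatusStub]]
}
```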


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/1328#issuecomment-48366398
  
    SPARK-2099 is adding a general executor->driver heartbeat.  It might be worth piggybacking the communication between the MapOutputTrackerWorker and MapOutputTrackerMaster on this.


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by lirui-intel <gi...@git.apache.org>.
Github user lirui-intel commented on the pull request:

    https://github.com/apache/spark/pull/1328#issuecomment-48421136
  
    Thanks @sryza for the idea. I think it's OK to piggyback the communication on a heartbeat, but we should also allow the worker to explicitly ask the master for map statuses when a task demands more outputs. I'll look into it once that feature is merged.
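A minimal sketch of the two refresh paths discussed here, assuming the heartbeat reply carries a map-output epoch. `HeartbeatResponse`, `EpochAwareCache` and `fetchFromMaster` are hypothetical, not Spark's RPC types, and map statuses are reduced to optional host strings.

```scala
import scala.collection.mutable

// Hypothetical heartbeat reply; not Spark's actual message type.
final case class HeartbeatResponse(mapOutputEpoch: Long)

/** Worker-side cache refreshed two ways: passively, when a heartbeat reports a
  * newer epoch, and explicitly, when a task needs outputs that are still missing. */
class EpochAwareCache(fetchFromMaster: Int => IndexedSeq[Option[String]]) {
  private var epoch = 0L
  private val cache = mutable.HashMap.empty[Int, IndexedSeq[Option[String]]]

  /** Piggybacked path: a newer epoch invalidates every cached shuffle. */
  def onHeartbeat(resp: HeartbeatResponse): Unit = synchronized {
    if (resp.mapOutputEpoch > epoch) {
      epoch = resp.mapOutputEpoch
      cache.clear()
    }
  }

  /** Explicit path: a task that ran out of known outputs can force a fetch. */
  def statuses(shuffleId: Int, forceRefresh: Boolean = false): IndexedSeq[Option[String]] =
    synchronized {
      if (forceRefresh) cache.remove(shuffleId)
      cache.getOrElseUpdate(shuffleId, fetchFromMaster(shuffleId))
    }
}
```

Keeping both paths means the heartbeat handles routine staleness cheaply, while a reducer blocked on holes does not have to wait for the next heartbeat interval.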


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/1328#issuecomment-56289215
  
    I'd like to close this issue for now pending more of a design discussion on the JIRA. These Proof of Concept patches are useful to have, but I'd rather not have them lingering for a long time in the PR queue.
    
    I will post a link on the JIRA to this diff so we have it as a reference:
    https://github.com/lirui-intel/spark/compare/removeStageBarrier


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1328#discussion_r15220164
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
      */
     private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
       protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
    +    with mutable.SynchronizedMap[Int, Array[MapStatus]]
    --- End diff --
    
    I think `ConcurrentHashMap` is better in most cases.
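One reason to prefer `ConcurrentHashMap` over the `SynchronizedMap` mixin: each call on a `SynchronizedMap` is locked individually, so a check-then-act sequence such as get-then-put is not atomic, whereas `ConcurrentHashMap.compute` performs the read-modify-write for a key atomically, and reads do not take a map-wide lock. `SynchronizedMap` was also deprecated in Scala 2.11 in favor of `java.util.concurrent.ConcurrentHashMap`. The registry below is a hypothetical illustration, not code from the PR.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical per-shuffle registry of finished map indices, updated by many threads.
object ConcurrentRegistrationDemo {
  val finished = new ConcurrentHashMap[Int, Set[Int]]()

  /** Atomically add one finished map index: compute() applies the update while
    * holding that key's lock, so concurrent callers cannot overwrite each other. */
  def register(shuffleId: Int, mapIndex: Int): Unit =
    finished.compute(shuffleId, (_, old) => if (old == null) Set(mapIndex) else old + mapIndex)

  /** Register numMaps indices for shuffle 0 from a thread pool; return how many landed. */
  def run(numMaps: Int, threads: Int): Int = {
    finished.clear()
    val pool = Executors.newFixedThreadPool(threads)
    for (i <- 0 until numMaps) pool.execute(() => register(0, i))
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    finished.get(0).size
  }
}
```

With a plain `HashMap` and a separate get-then-put, two threads could read the same old set and one registration would be lost; `compute` rules that out per key.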


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1328#discussion_r15266906
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
      */
     private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
       protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
    +    with mutable.SynchronizedMap[Int, Array[MapStatus]]
    --- End diff --
    
    Will this PR be merged soon? If not, I hope this line can be merged soon, because it fixes a critical concurrency issue with `mapStatuses`.


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by lirui-intel <gi...@git.apache.org>.
Github user lirui-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1328#discussion_r15267187
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
      */
     private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
       protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
    +    with mutable.SynchronizedMap[Int, Array[MapStatus]]
    --- End diff --
    
    @zsxwing thanks for the comments. Maybe it's better to make it ConcurrentHashMap in the base class.
    I don't think this PR will be merged soon, so maybe you can open another JIRA to fix this.


[GitHub] spark pull request: SPARK-2387: remove stage barrier

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1328

