Posted to reviews@spark.apache.org by lirui-intel <gi...@git.apache.org> on 2014/07/08 13:51:53 UTC
[GitHub] spark pull request: SPARK-2387: remove stage barrier
GitHub user lirui-intel opened a pull request:
https://github.com/apache/spark/pull/1328
SPARK-2387: remove stage barrier
This PR is a PoC implementation of [SPARK-2387](https://issues.apache.org/jira/browse/SPARK-2387).
When a ShuffleMapTask finishes, DAGScheduler checks resource usage. If there are free slots, DAGScheduler chooses a stage from the waiting list whose parent stages have all started, and pre-starts this waiting stage. All the in-progress parent stages then register their map outputs progressively with MapOutputTrackerMaster. A flag is added to MapOutputTracker to indicate whether the map statuses for a shuffle are partial, so that a partial registration can be distinguished from a failed shuffle map stage.
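The pre-start decision can be sketched as a small pure function. This is a minimal, hypothetical model rather than the PR's DAGScheduler code: `SimpleStage` and `pickStageToPreStart` are illustrative names, "started" simply means the stage ID is in a set, and ties are broken in favour of the larger stage ID, following the commit message "stage with a bigger ID should take precedence".

```scala
// Hypothetical, simplified model of the pre-start decision; not Spark's DAGScheduler API.
final case class SimpleStage(id: Int, parentIds: Set[Int])

object PreStart {
  /** Picks a waiting stage to pre-start, or None if the barrier should stay in place.
    *
    * @param waiting   stages that have not been submitted yet
    * @param started   IDs of stages that are already running (or finished)
    * @param freeSlots idle task slots (e.g. free CPU cores) observed after a ShuffleMapTask finished
    */
  def pickStageToPreStart(
      waiting: Seq[SimpleStage],
      started: Set[Int],
      freeSlots: Int): Option[SimpleStage] = {
    if (freeSlots <= 0) None // no spare capacity: wait for the parents as usual
    else {
      // A stage is a candidate only if every one of its parent stages has started.
      val eligible = waiting.filter(_.parentIds.subsetOf(started))
      // Assumed tie-break: prefer the stage with the larger ID.
      if (eligible.isEmpty) None else Some(eligible.maxBy(_.id))
    }
  }
}
```

With `started = Set(0, 1)` and some free slots, a stage with parents `{0, 1}` and one with parent `{0}` are both eligible and the higher ID wins; a stage whose parent has not started yet keeps waiting.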
When a downstream task tries to fetch shuffle blocks, it gets an array of map outputs that contains “holes” (unfinished map tasks). We created PartialBlockFetcherIterator to handle this map output array. PartialBlockFetcherIterator keeps an array of conventional iterators (BasicBlockFetcherIterator or NettyBlockFetcherIterator). When new map outputs become available, PartialBlockFetcherIterator delegates them to a new conventional iterator and relies on these conventional iterators for its “hasNext” and “next” methods. When all the delegated map outputs have been consumed, PartialBlockFetcherIterator asks the local MapOutputTrackerWorker for updated map outputs. MapOutputTrackerWorker uses an "updater" thread to communicate with MapOutputTrackerMaster, and notifies the waiting downstream tasks to continue once the map statuses have been updated.
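The delegation idea can be illustrated with a small iterator. This is a hypothetical sketch, not the PR's PartialBlockFetcherIterator: a map output is modelled as `Option[String]` (None is a hole), `fetch` stands in for a conventional block fetcher, and `refresh` stands in for asking the local MapOutputTrackerWorker, assumed to block until at least one more output is available.

```scala
// Hypothetical sketch of the PartialBlockFetcherIterator idea; not Spark code.
// A map output is modelled as Option[String]: Some(location) or None for a hole.
final class PartialFetchIterator(
    initial: Vector[Option[String]],
    refresh: () => Vector[Option[String]], // assumed to block until more outputs exist
    fetch: String => Iterator[String]      // stand-in for a conventional block fetcher
) extends Iterator[String] {

  private var outputs = initial
  private var delegated = Set.empty[Int] // map indices already handed to a fetcher
  private var current: Iterator[String] = Iterator.empty

  // Hand every newly available (non-hole) output to a new conventional iterator.
  private def delegateNew(): Unit = {
    val fresh = outputs.indices.filter(i => outputs(i).isDefined && !delegated(i))
    val locations = fresh.map(i => outputs(i).get) // capture now; `outputs` may change later
    delegated ++= fresh
    current = current ++ locations.iterator.flatMap(fetch)
  }

  delegateNew()

  override def hasNext: Boolean = {
    // Delegates exhausted but holes remain: ask for updated map outputs.
    while (!current.hasNext && delegated.size < outputs.size) {
      outputs = refresh()
      require(outputs.size == initial.size, "the number of map outputs must not change")
      delegateNew()
    }
    current.hasNext
  }

  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException("no more shuffle blocks")
    current.next()
  }
}
```

Records from the outputs that were already available are consumed first; only when those are exhausted does the iterator refresh and pick up the map outputs that filled the holes.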
This PoC feature is mainly intended for, and was tested against, standalone clusters. I used a 7-node cluster for performance testing; each node runs an executor with 32 CPUs and 90GB of memory. I used graphx.SynthBenchmark for the test, with the following test case:
graphx.SynthBenchmark -partStrategy=EdgePartition2D -numEPart=112 -nverts=10000000 -niter=3
The feature speeds up the whole job by roughly 10%, reducing the creation time from 128s to 116s and the run time from 126s to 115s.
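The reported timings can be checked directly. The per-phase reductions work out to about 9.4% (creation) and 8.7% (run), and about 9.1% for both phases combined, which the text rounds to roughly 10%. `SpeedupCheck` is just an illustrative helper.

```scala
object SpeedupCheck {
  /** Relative reduction in percent when a duration drops from `before` to `after`. */
  def reductionPct(before: Double, after: Double): Double = (before - after) / before * 100.0

  val creation = reductionPct(128, 116)             // graph creation time, seconds from the text
  val run      = reductionPct(126, 115)             // run time, seconds from the text
  val total    = reductionPct(128 + 126, 116 + 115) // both phases together
}
```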
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/lirui-intel/spark removeStageBarrier
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1328.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1328
----
commit 163302d26af6ab4b780e7047c417d6199f6c3020
Author: lirui <ru...@intel.com>
Date: 2014-05-05T05:21:05Z
minor fix
Signed-off-by: lirui <ru...@intel.com>
commit f81476d0460afabdb5f6a83c6542f080be81a58e
Author: lirui <ru...@intel.com>
Date: 2014-05-07T03:24:00Z
Merge branch 'master' of https://github.com/lirui-intel/spark
commit 3124380ccfd19a38878e05f0af29f80c279a897b
Author: lirui <ru...@intel.com>
Date: 2014-05-08T07:06:57Z
try to locate the point to remove the barrier
commit 8e625c0c87a4f5f3d35433f2eb1aca5d1cf09549
Author: lirui <ru...@intel.com>
Date: 2014-05-08T07:57:52Z
apply upstream hot fix
commit 1d5d0f0be263a98387ba763b2298a8eccf3e2c65
Author: lirui <ru...@intel.com>
Date: 2014-05-09T13:33:36Z
RemoveStageBarrier: support partial map outputs
commit c4f405446739280a3e02e70854d88f08498d8447
Author: lirui <ru...@intel.com>
Date: 2014-05-11T06:50:40Z
RemoveStageBarrier: build fix
commit 444d2d96de39edc5399276fa2efec57638f10462
Author: lirui <ru...@intel.com>
Date: 2014-05-11T13:54:54Z
RemoveStageBarrier: register map outputs progressively
commit 2df1d4e5cab7ba36443425a6f7b54a6ed06f519f
Author: lirui <ru...@intel.com>
Date: 2014-05-12T02:30:39Z
RemoveStageBarrier: increment epoch for progressive registration
commit 9f18dc74a25d34dfda923e3f1064cc339110e9a9
Author: lirui <ru...@intel.com>
Date: 2014-05-12T08:39:10Z
RemoveStageBarrier: fix check free CPUs
commit 7af23c0fa134ac329d2ee4fa8813e1deb13b1ddd
Author: lirui <ru...@intel.com>
Date: 2014-05-13T08:30:41Z
RemoveStageBarrier: make reducers refresh map outputs less often
commit 9a32a17d0f2620ef807aa3d1ed26df39038bf3af
Author: lirui <ru...@intel.com>
Date: 2014-05-13T13:00:20Z
RemoveStageBarrier: start reducers earlier
commit 9ffb208e0c32990f7919589c091124d41de51f7e
Author: lirui <ru...@intel.com>
Date: 2014-05-14T03:21:59Z
RemoveStageBarrier: add log info
commit ef3b04323bcdeadab58d02b9076a14b08e130436
Author: lirui <ru...@intel.com>
Date: 2014-05-14T08:45:03Z
RemoveStageBarrier: adjust sleep interval
commit 4213d63be930a7f39fca5667855b2862e52d3487
Author: lirui <ru...@intel.com>
Date: 2014-05-15T11:24:25Z
RemoveStageBarrier: add a new iterator to manage partial map outputs
commit 376230a20822e797558530e67ee9df3619476d78
Author: lirui <ru...@intel.com>
Date: 2014-05-16T05:13:08Z
RemoveStageBarrier: minor fixes
commit efd31efb006ddb3275c42375a069efcfa8d72244
Author: lirui <ru...@intel.com>
Date: 2014-05-16T06:20:16Z
RemoveStageBarrier: fix: reducers may fail due to very slow mappers
commit 3cb944c9cdbc48ba9a152a9f425d3b09ab810390
Author: lirui <ru...@intel.com>
Date: 2014-05-16T08:29:09Z
RemoveStageBarrier: add some log info
commit 641715e4a27e1b303a0cb13dc16e5544a8710cc7
Author: lirui <ru...@intel.com>
Date: 2014-05-19T01:28:32Z
RemoveStageBarrier: stage with a bigger ID should take precedence
commit b0c2df24909d5ecb92f6d926f4705dedaf96660f
Author: lirui <ru...@intel.com>
Date: 2014-05-23T02:35:58Z
RemoveStageBarrier: track whether map output for a shuffle is partial in MapOutputTracker
commit 75d27449e3b65382e5968842f5fa73c5f939c663
Author: lirui <ru...@intel.com>
Date: 2014-05-23T07:27:02Z
RemoveStageBarrier: refine how we get the stage to pre-start
commit b7f1f844522d2c502c2e344f87ea2c729d53b4bc
Author: lirui <ru...@intel.com>
Date: 2014-05-23T07:47:03Z
RemoveStageBarrier: indicate the output is partial for progressive registration
commit be474084f7a422873114cdd291c8a07da3f5ad2d
Author: lirui <ru...@intel.com>
Date: 2014-05-26T05:03:34Z
add some debug info
commit c88014b0ebcf261554bcbac537e60130b9276da5
Author: lirui <ru...@intel.com>
Date: 2014-05-26T06:37:04Z
add a new locality level for tasks with no preferred locations
commit 133a356d04e00a87a84e576b464b803d24a270ed
Author: lirui <ru...@intel.com>
Date: 2014-05-26T12:34:16Z
re-compute pending list when new executor is added
commit 7d92f9a3d1a925701170c8ac9e459903073c09b8
Author: lirui <ru...@intel.com>
Date: 2014-05-26T14:10:33Z
pendingTasksWithNoPrefs should only contain tasks that really have no preferred locations
commit c1de426c159c9a0670bfb30bae81d772195290d8
Author: lirui <ru...@intel.com>
Date: 2014-05-27T02:16:45Z
make the delay schedule configurable
commit e57e081e847495b11289b48aeb894cd634ecca71
Author: lirui <ru...@intel.com>
Date: 2014-05-27T03:39:29Z
clean up
commit fda0281490b52adf7cf20da20ffb36f5c34f7e7c
Author: lirui <ru...@intel.com>
Date: 2014-05-27T05:54:13Z
do some refactor
commit 781861dc432ab6fad134691050be4d71f8bad1eb
Author: lirui <ru...@intel.com>
Date: 2014-05-28T08:31:33Z
RemoveStageBarrier: fix problem with consolidated shuffle file
commit 679813b180a68074e4dbadd16fc9a36fdaac9f37
Author: lirui <ru...@intel.com>
Date: 2014-05-29T03:39:34Z
RemoveStageBarrier: should fail the pre-started stages if the parent stage gets re-submitted
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/1328#issuecomment-48326594
Can one of the admins verify this patch?
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on the pull request:
https://github.com/apache/spark/pull/1328#issuecomment-48688983
I think several issues need to be addressed here to make this a more robust and sound solution:
1. Avoid letting a pre-started stage occupy so many CPU resources that it starves the parent stage and prevents it from finishing in time. We will need to control how many pre-started tasks can be launched, and adjust that according to the parent stage's status. We also need some configuration to let users fine-tune the behavior; after all, the best time to pre-start may depend on the actual case.
2. We might need a more general interface; the patch seems to assume a lot of specific class implementations.
3. We might need to take more care of memory management, e.g. how to prevent a pre-started stage from occupying too much memory (say, through caching), which would complicate cache eviction and GC overall.
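One way to act on point 1 is a tunable throttle. The sketch below is hypothetical: `PreStartPolicy` and its two knobs (`minParentProgress`, `maxSlotFraction`) are illustrative and do not correspond to any existing Spark configuration. It only allows pre-started tasks once the parent stage has made enough progress, and only on slots left over after every pending parent task has been given one.

```scala
// Hypothetical throttling policy for pre-started (downstream) tasks; not part of Spark.
final case class PreStartPolicy(
    minParentProgress: Double, // fraction of parent map tasks that must be done before pre-starting
    maxSlotFraction: Double    // share of the leftover free slots that pre-started tasks may use
) {
  require(minParentProgress >= 0.0 && minParentProgress <= 1.0, "minParentProgress must be in [0, 1]")
  require(maxSlotFraction >= 0.0 && maxSlotFraction <= 1.0, "maxSlotFraction must be in [0, 1]")

  /** Number of pre-started tasks that may be launched right now.
    * Pending parent tasks keep priority: only slots beyond their needs are considered.
    */
  def allowedPreStartTasks(
      freeSlots: Int,
      pendingParentTasks: Int,
      finishedParentTasks: Int,
      totalParentTasks: Int): Int = {
    val progress =
      if (totalParentTasks == 0) 1.0 else finishedParentTasks.toDouble / totalParentTasks
    val leftover = math.max(0, freeSlots - pendingParentTasks)
    if (progress < minParentProgress) 0
    else math.floor(leftover * maxSlotFraction).toInt
  }
}
```

For example, with both knobs at 0.5, ten free slots, two pending parent tasks, and six of ten parent tasks finished, four pre-started tasks would be allowed; at three of ten finished, none would be.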
Just my suggestion.
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/1328#discussion_r15268635
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+ with mutable.SynchronizedMap[Int, Array[MapStatus]]
--- End diff --
> Maybe it's better to make it ConcurrentHashMap in the base class.
Because MapOutputTrackerMaster uses TimeStampedHashMap, which is not a ConcurrentHashMap, the field in MapOutputTracker still has to be declared as a Map. Nevertheless, I can add a comment on MapOutputTracker.mapStatuses to mark that it should be a thread-safe map.
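The compromise described above (keep the base-class field typed as a general `Map`, document that it must be thread-safe, and let each subclass choose a concurrent implementation) can be sketched as follows. `TrackerBase` and `WorkerTracker` are hypothetical stand-ins for the tracker classes, and `scala.collection.concurrent.TrieMap` is swapped in as one possible thread-safe `mutable.Map`; the thread itself discussed `SynchronizedMap` and `ConcurrentHashMap`.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the tracker classes; a MapStatus is modelled as a String.
abstract class TrackerBase {
  /** Must be a thread-safe map: task threads read it while an updater thread writes to it. */
  protected val mapStatuses: mutable.Map[Int, Array[String]]

  def statusesFor(shuffleId: Int): Option[Array[String]] = mapStatuses.get(shuffleId)
}

final class WorkerTracker extends TrackerBase {
  // TrieMap is a concurrent map that is also a mutable.Map, so the base type can stay generic.
  override protected val mapStatuses: mutable.Map[Int, Array[String]] =
    TrieMap.empty[Int, Array[String]]

  def setStatuses(shuffleId: Int, statuses: Array[String]): Unit =
    mapStatuses(shuffleId) = statuses
}
```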
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:
https://github.com/apache/spark/pull/1328#issuecomment-48366398
SPARK-2099 is adding a general executor->driver heartbeat. It might be worth piggybacking the communication between the MapOutputTrackerWorker and MapOutputTrackerMaster on this.
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by lirui-intel <gi...@git.apache.org>.
Github user lirui-intel commented on the pull request:
https://github.com/apache/spark/pull/1328#issuecomment-48421136
Thanks @sryza for the idea. I think it's OK to piggyback the communication on a heartbeat, but we should also allow the worker to explicitly ask the master for map statuses when a task demands more outputs. I'll look into it once you have that feature merged.
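Both paths can be served by the same driver-side handler, sketched here under assumptions: the driver bumps an epoch on every progressive registration (in the spirit of the "increment epoch for progressive registration" commit), answers a heartbeat only when the executor's known epoch is stale, and always answers an explicit request. `Heartbeat`, `GetMapOutputs`, `MapOutputsReply`, and `DriverSide` are hypothetical and do not model the actual SPARK-2099 heartbeat messages or Spark's RPC layer.

```scala
// Hypothetical message types; Spark's real heartbeat (SPARK-2099) is not modelled here.
sealed trait ToDriver
final case class Heartbeat(executorId: String, knownEpoch: Long) extends ToDriver
final case class GetMapOutputs(shuffleId: Int) extends ToDriver

final case class MapOutputsReply(epoch: Long, statuses: Map[Int, Vector[Option[String]]])

final class DriverSide {
  private var epoch = 0L
  private var statuses = Map.empty[Int, Vector[Option[String]]]

  def register(shuffleId: Int, outputs: Vector[Option[String]]): Unit = synchronized {
    statuses += shuffleId -> outputs
    epoch += 1 // every (progressive) registration makes older executor views stale
  }

  def handle(msg: ToDriver): Option[MapOutputsReply] = synchronized {
    msg match {
      // Piggybacked path: reply only if the executor's view is out of date.
      case Heartbeat(_, known) if known < epoch => Some(MapOutputsReply(epoch, statuses))
      case Heartbeat(_, _)                      => None
      // Explicit path: a task ran out of outputs and asks for this shuffle right away.
      case GetMapOutputs(id) => Some(MapOutputsReply(epoch, statuses.filter(_._1 == id)))
    }
  }
}
```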
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:
https://github.com/apache/spark/pull/1328#issuecomment-56289215
I'd like to close this issue for now pending more of a design discussion on the JIRA. These Proof of Concept patches are useful to have, but I'd rather not have them lingering for a long time in the PR queue.
I will post a link on the JIRA to this diff so we have it as a reference:
https://github.com/lirui-intel/spark/compare/removeStageBarrier
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/1328#discussion_r15220164
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+ with mutable.SynchronizedMap[Int, Array[MapStatus]]
--- End diff --
I think `ConcurrentHashMap` is better in most cases.
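To illustrate why: with `HashMap with SynchronizedMap`, each call is locked on the whole map, so a compound sequence built from separate calls (a lookup followed by a fetch and a `put`) is not atomic, and two task threads can both miss and both fetch. `java.util.concurrent.ConcurrentHashMap` offers atomic per-key compound operations such as `computeIfAbsent` (Java 8 or later); `SynchronizedMap` has also been deprecated since Scala 2.11. `StatusCache` and `fetchFromMaster` below are hypothetical stand-ins, not Spark's MapOutputTrackerWorker.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a worker-side status cache; a MapStatus is modelled as a String.
object StatusCache {
  private val mapStatuses = new ConcurrentHashMap[Int, Array[String]]()
  val fetchCount = new AtomicInteger(0) // how many (simulated) requests went to the master

  // Hypothetical stand-in for asking MapOutputTrackerMaster for a shuffle's statuses.
  private def fetchFromMaster(shuffleId: Int): Array[String] = {
    fetchCount.incrementAndGet()
    Array(s"shuffle-$shuffleId-map-0")
  }

  private val loader: java.util.function.Function[Int, Array[String]] =
    (id: Int) => fetchFromMaster(id)

  /** Lookup, fetch, and insert happen as one atomic step per key. */
  def getStatuses(shuffleId: Int): Array[String] =
    mapStatuses.computeIfAbsent(shuffleId, loader)
}
```

Repeated or concurrent calls for the same shuffle ID trigger at most one fetch, while calls for different shuffle IDs do not block each other on a single map-wide lock.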
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/1328#discussion_r15266906
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+ with mutable.SynchronizedMap[Int, Array[MapStatus]]
--- End diff --
Will this PR be merged soon? If not, I hope this line can be merged soon, because it fixes a critical concurrency issue with `mapStatuses`.
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by lirui-intel <gi...@git.apache.org>.
Github user lirui-intel commented on a diff in the pull request:
https://github.com/apache/spark/pull/1328#discussion_r15267187
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+ with mutable.SynchronizedMap[Int, Array[MapStatus]]
--- End diff --
@zsxwing thanks for the comments. Maybe it's better to make it ConcurrentHashMap in the base class.
I don't think this PR will be merged soon, so maybe you can open another JIRA to fix this.
---
[GitHub] spark pull request: SPARK-2387: remove stage barrier
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/1328
---