You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Patrick Wendell (JIRA)" <ji...@apache.org> on 2014/12/27 08:31:25 UTC

[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs

     [ https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-2638:
-----------------------------------
    Fix Version/s:     (was: 1.2.0)
                   1.3.0

> Improve concurrency of fetching Map outputs
> -------------------------------------------
>
>                 Key: SPARK-2638
>                 URL: https://issues.apache.org/jira/browse/SPARK-2638
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: All
>            Reporter: Stephen Boesch
>            Assignee: Josh Rosen
>            Priority: Minor
>              Labels: MapOutput, concurrency
>             Fix For: 1.3.0
>
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> This issue was noticed while perusing the MapOutputTracker source code. Notice that the synchronization is on the containing "fetching" collection - which makes ALL fetches wait if any fetch were occurring.  
> The fix is to synchronize instead on the shuffleId (interned as a string to ensure JVM wide visibility).
>   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
>     val statuses = mapStatuses.get(shuffleId).orNull
>     if (statuses == null) {
>       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
>       var fetchedStatuses: Array[MapStatus] = null
>       fetching.synchronized {   // This is existing code
>      //  shuffleId.toString.intern.synchronized {  // New Code
>         if (fetching.contains(shuffleId)) {
>           // Someone else is fetching it; wait for them to be done
>           while (fetching.contains(shuffleId)) {
>             try {
>               fetching.wait()
>             } catch {
>               case e: InterruptedException =>
>             }
>           }
> This is only a small code change, but the testcases to prove (a) proper functionality and (b) proper performance improvement are not so trivial.  
> For (b) it is not worthwhile to add a testcase to the codebase. Instead I have added a git project that demonstrates the concurrency/performance improvement using the fine-grained approach . The github project is at
> https://github.com/javadba/scalatesting.git  .  Simply run "sbt test". Note: it is unclear how/where to include this ancillary testing/verification information that will not be included in the git PR: i am open for any suggestions - even as far as simply removing references to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org