Posted to issues@spark.apache.org by "Patrick Wendell (JIRA)" <ji...@apache.org> on 2014/12/27 08:31:25 UTC
[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs
[ https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Patrick Wendell updated SPARK-2638:
-----------------------------------
Fix Version/s: 1.3.0 (was: 1.2.0)
> Improve concurrency of fetching Map outputs
> -------------------------------------------
>
> Key: SPARK-2638
> URL: https://issues.apache.org/jira/browse/SPARK-2638
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.0.0
> Environment: All
> Reporter: Stephen Boesch
> Assignee: Josh Rosen
> Priority: Minor
> Labels: MapOutput, concurrency
> Fix For: 1.3.0
>
> Original Estimate: 0h
> Remaining Estimate: 0h
>
> This issue was noticed while reading the MapOutputTracker source code. The synchronization is on the enclosing "fetching" collection, which makes ALL fetches wait while any single fetch is in progress.
> The fix is to synchronize on the shuffleId instead (interned as a string so that every thread in the JVM locks on the same object).
> def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
>   val statuses = mapStatuses.get(shuffleId).orNull
>   if (statuses == null) {
>     logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
>     var fetchedStatuses: Array[MapStatus] = null
>     fetching.synchronized { // This is existing code
>     // shuffleId.toString.intern.synchronized { // New Code
>       if (fetching.contains(shuffleId)) {
>         // Someone else is fetching it; wait for them to be done
>         while (fetching.contains(shuffleId)) {
>           try {
>             fetching.wait()
>           } catch {
>             case e: InterruptedException =>
>           }
>         }
> This is only a small code change, but the test cases needed to prove (a) correct functionality and (b) an actual performance improvement are not so trivial.
> For (b) it is not worthwhile to add a test case to the codebase. Instead I have added a git project that demonstrates the concurrency/performance improvement using the fine-grained approach. The GitHub project is at https://github.com/javadba/scalatesting.git . Simply run "sbt test".
> Note: it is unclear how/where to include this ancillary testing/verification information, which will not be included in the git PR. I am open to any suggestions, even as far as simply removing references to it.
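For illustration only, here is a minimal sketch of per-shuffle locking, written independently of the Spark codebase. It is not the patch proposed in the issue: it swaps the interned-string monitor for a dedicated lock object per shuffleId, and it holds that lock for the duration of the fetch rather than using wait/notify. The names ShuffleLocks, PerShuffleFetcher and fetchRemote are hypothetical, and Array[String] stands in for the real status type. One reason to avoid wait/notify in a sketch like this: on the JVM, wait() must be called on the monitor the thread currently holds, so a version that synchronizes on a per-shuffle object would also have to wait on that same object.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper (not part of Spark): hands out one lock object per shuffle id.
object ShuffleLocks {
  private val locks = new ConcurrentHashMap[Int, AnyRef]()

  def lockFor(shuffleId: Int): AnyRef = {
    val fresh = new Object
    // putIfAbsent returns the previously stored lock, or null if `fresh` was stored.
    val existing = locks.putIfAbsent(shuffleId, fresh)
    if (existing == null) fresh else existing
  }
}

// Hypothetical tracker-like class. `fetchRemote` stands in for the remote
// lookup of map output statuses; Array[String] stands in for the status type.
class PerShuffleFetcher(fetchRemote: Int => Array[String]) {
  private val cache = new ConcurrentHashMap[Int, Array[String]]()

  def get(shuffleId: Int): Array[String] = {
    val cached = cache.get(shuffleId)
    if (cached != null) {
      cached
    } else {
      // Only callers asking for the SAME shuffleId contend on this lock;
      // fetches for other shuffle ids proceed independently.
      ShuffleLocks.lockFor(shuffleId).synchronized {
        // Re-check under the lock: another thread may have finished the fetch.
        var statuses = cache.get(shuffleId)
        if (statuses == null) {
          statuses = fetchRemote(shuffleId)
          cache.put(shuffleId, statuses)
        }
        statuses
      }
    }
  }
}
```

In this sketch a second caller for the same shuffleId blocks until the first fetch completes and then reads the cached result, while callers for other shuffle ids are not blocked. The lock map grows by one entry per shuffle id and is never cleaned up here; a real implementation would need to decide when entries can be dropped.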
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)