Posted to reviews@spark.apache.org by javadba <gi...@git.apache.org> on 2014/07/23 06:38:34 UTC

[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

GitHub user javadba opened a pull request:

    https://github.com/apache/spark/pull/1542

    SPARK-2638 MapOutputTracker concurrency improvement

    Spark-2638 Improve concurrency of fetching Map outputs
    
    This issue was noticed while perusing the MapOutputTracker source code. Note that the synchronization is on the containing "fetching" collection, which makes ALL fetches wait whenever any single fetch is in progress.
    The fix is to synchronize instead on the shuffleId (interned as a String to ensure JVM-wide visibility).
    
    For further details please refer to the JIRA.
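    To make this concrete, here is a minimal sketch of the per-shuffleId locking idea described above (the helper name is illustrative, not part of the patch):

    ```scala
    object PerShuffleLock {
      // `intern` returns one canonical String object per distinct id, JVM-wide,
      // so two fetches of the same shuffle contend on one monitor while fetches
      // of different shuffles proceed independently.
      def withShuffleLock[T](shuffleId: Int)(body: => T): T = {
        val monitor = shuffleId.toString.intern()
        monitor.synchronized { body }
      }
    }
    ```

    For example, `PerShuffleLock.withShuffleLock(3) { fetch(3) }` would block a concurrent fetch of shuffle 3 but not of shuffle 4.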

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/javadba/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1542
    
----
commit 5d1cb0a449bbf1ea95272a45f2d030d5cad0195c
Author: Stephen Boesch <ja...@gmail.com>
Date:   2014-07-23T04:33:25Z

    SPARK-2638 MapOutputTracker concurrency improvement

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1542#discussion_r15392537
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -162,9 +164,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
               logInfo("Got the output locations")
               mapStatuses.put(shuffleId, fetchedStatuses)
             } finally {
    -          fetching.synchronized {
    -            fetching -= shuffleId
    -            fetching.notifyAll()
    +          monitor.synchronized {
    +            fetching.remove(shuffleId)
    +            monitor.notifyAll()
    --- End diff --
    
    The read and write operations on `fetching` should use the same monitor, per the Java memory model.
    
    > In addition, using the monitor of an object from outside to `synchronized` makes the code hard to analyze, because the monitor of this object may be used (now or in the future) elsewhere, and the developer may not notice that somebody has already used it here.
    
    This problem also looks hard to solve.
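    As a simplified illustration of that point (a sketch, not the Spark code), all reads and writes of the shared set below go through the single `fetching` monitor, which is what establishes the happens-before edge between threads:

    ```scala
    import scala.collection.mutable

    object FetchRegistry {
      private val fetching = mutable.HashSet[Int]()

      // Returns true if the caller should perform the fetch. Because every
      // access to `fetching` synchronizes on the same object, an insert by
      // one thread is guaranteed visible to a later check by another.
      def tryClaim(shuffleId: Int): Boolean = fetching.synchronized {
        if (fetching.contains(shuffleId)) false
        else { fetching += shuffleId; true }
      }

      def release(shuffleId: Int): Unit = fetching.synchronized {
        fetching -= shuffleId
        fetching.notifyAll() // wake any threads wait()ing on this monitor
      }
    }
    ```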



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by javadba <gi...@git.apache.org>.
Github user javadba commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1542#discussion_r15389491
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -130,7 +130,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
         if (statuses == null) {
           logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
           var fetchedStatuses: Array[MapStatus] = null
    -      fetching.synchronized {
    +      shuffleId.toString.intern.synchronized {
    --- End diff --
    
    Yes, I see your point and agree there is a problem in the proposed fix. My fix is incorrect in that we need to match the outer shuffleId.toString.intern.synchronized to the inner shuffleId.toString.intern.wait().
    
    Then the code should look like this:
    
        val monitor = shuffleId.toString.intern
        monitor.synchronized {
          ..
          monitor.wait()
        }
    
    Do you agree?



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-51020347
  
    Since this same locking pattern occurs at several places in the code, I think it might make sense to abstract it behind a function or macro, which would give us a centralized place to experiment with different synchronization / locking strategies.
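    One possible shape for such a helper (a hypothetical API, not something that exists in Spark) that centralizes the wait-or-fetch pattern so the locking strategy can be changed in one place:

    ```scala
    import scala.collection.mutable

    class FetchCoordinator[K, V] {
      private val inFlight = mutable.HashSet[K]()

      /** Runs `fetch` for `key` at most once concurrently; other callers block
        * until the in-flight fetch finishes, then re-check `cached`. */
      def getOrFetch(key: K)(cached: => Option[V])(fetch: => V): V = {
        inFlight.synchronized {
          // Standard wait-loop: re-check the condition after every wakeup.
          while (inFlight.contains(key)) inFlight.wait()
          cached match {
            case Some(v) => return v      // another caller already fetched it
            case None    => inFlight += key
          }
        }
        try fetch                          // perform the fetch without the lock
        finally inFlight.synchronized { inFlight -= key; inFlight.notifyAll() }
      }
    }
    ```

    Callers would then write `coordinator.getOrFetch(shuffleId)(lookupCache(shuffleId))(askTracker(shuffleId))`, and swapping the synchronization strategy would touch only this class.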



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50497514
  
    I think asfgit got confused somehow; the linked commit looks unrelated.



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50196891
  
    As I [commented on the JIRA](https://issues.apache.org/jira/browse/SPARK-2638?focusedCommentId=14074710&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14074710), unless I've missed something, I'm not sure that the original code exhibits significant concurrency issues: no time-consuming work is performed in the synchronized blocks, and the actual MapOutputTracker requests proceed in parallel without holding any locks. The only potential issue I see is many concurrent duplicate requests causing a big queue of waiters on the lock, which results in many spurious wakeups as statuses are fetched; this seems unlikely to happen in practice, though.
    
    Do you have an actual use-case that demonstrates that the current synchronization strategy is a performance issue?



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1542#discussion_r15326142
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -130,7 +130,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
         if (statuses == null) {
           logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
           var fetchedStatuses: Array[MapStatus] = null
    -      fetching.synchronized {
    +      shuffleId.toString.intern.synchronized {
    --- End diff --
    
    In addition, using the monitor of an object from outside to `synchronized` makes the code hard to analyze, because the monitor of this object may be used (now or in the future) elsewhere, and the developer may not notice that somebody has already used it here.



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by javadba <gi...@git.apache.org>.
Github user javadba commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50814023
  
    Hi folks,
       Yes, I was on master for this PR. I had realized this was not the correct procedure, so I had already started creating separate branches on my fork for the other PRs. Apologies for the scare here with the "Merge" label.
    
    As for this particular PR: I did work through the original (pre-PR) logic with a coworker. Yes, it is actually correct. It is also complicated. I am not attempting to resurrect my PR at this point, but I will still maintain that it is better code:
    
    a) It is shorter, more concise, and yes, better performing - though, as Josh correctly points out, no long-running code is actually locked in the original, so the benefits in this case are insubstantial.
    
    b) As for the - correct - objection to interning numbers, which could be reused in other parts of the code: I agree, but a small fix takes care of it. Simply prepend the interned shuffleId with the className:objectName and it becomes unique, e.g. s"o.a.s.MapOutputTracker.shuffleID$shuffleId". That would not collide with other potential uses of this tactic.
    
    I will look at the half dozen other cases that use the present Set-checking tactic at some point. But we are all a bit tired of this topic right now - and 1.1.0 is in any case "a wrap" - so this particular PR can safely remain closed.
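    That namespacing fix in (b) could look like the following sketch (the key format is illustrative, not the actual patch):

    ```scala
    object ShuffleLocks {
      // Prefixing the interned key with the owning class name makes collisions
      // with other code that happens to intern the same number implausible.
      def lockFor(shuffleId: Int): AnyRef =
        s"org.apache.spark.MapOutputTracker.shuffleId=$shuffleId".intern()
    }
    ```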



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50488319
  
    @pwendell @mateiz was this PR really merged into spark ?



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by javadba <gi...@git.apache.org>.
Github user javadba commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50194492
  
    Hi again. Upon closer inspection of the existing code/functionality, we have an opportunity here to:
      (a) reduce the code size/complexity, and
      (b) at the same time improve concurrency by a factor of the number of concurrent MapOutput fetch requests.
    
    Here is an initial version of the rewrite of getServerStatuses. Notice it has a single synchronized block and reduces the SLOC from 53 to 32. I am going to write a thorough unit test to demonstrate (a) correctness and (b) improved concurrency of this new version versus the existing one.
    
      def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
        val monitor = shuffleId.toString.intern
        monitor.synchronized {
          // Single synchronized block, keyed per-shuffleId via the interned String
          val statuses = mapStatuses.get(shuffleId).orNull
          if (statuses == null) {
            logInfo(s"Fetching for shuffle $shuffleId ; tracker actor = $trackerActor")
            val fetchedBytes =
              askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
            val fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
            if (fetchedStatuses != null) {
              logInfo(s"Got the output locations for shuffle $shuffleId ; tracker actor = $trackerActor")
              mapStatuses.put(shuffleId, fetchedStatuses)
              MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
            } else {
              throw new MetadataFetchFailedException(
                shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
            }
          } else {
            MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
          }
        }
      }
    
    By the way, the synchronization on statuses was moved into the private method: this reduces code size and also avoids the risk of someone forgetting to include the synchronization:
    
      private def convertMapStatuses(
      ..
        statuses.synchronized {
        ..
        }




[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-49832419
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by javadba <gi...@git.apache.org>.
Github user javadba commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-51022369
  
    Thanks for commenting Josh. I will see about putting together something on this including solid testcases.  ETA later in the coming week. 





[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50236563
  
    @javadba, sorry, maybe my previous comment was not clear.
    
    I'm opposed to using `synchronized` on `val monitor = shuffleId.toString.intern`. I see that you have to use `intern` to get the same monitor for the same `shuffleId`. However, such usage is a trap and will surprise developers. For example, if we allow `synchronized` on a `String.intern`, the following irrelevant code may use the same monitor as `shuffleId.toString.intern`:
    
    ```scala
    def foo(bar: Int) {
        bar.toString.intern.synchronized {
             ...
        }
    }
    ```
    This will definitely be hard to debug and track.
    
    What's more, I agree with @JoshRosen and also want to see a real test on this code to demonstrate the improvement from your modification.



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1542



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50511022
  
    I dunno, merging a PR with no changed files doesn't sound too scary to me.
    
    Something is definitely messed up in this PR, with both `Commits` and `Files changed` showing invalid results. 



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by javadba <gi...@git.apache.org>.
Github user javadba commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50170143
  
    Hi, thanks for taking the time to review this PR again. A couple of things I should perhaps have pointed out:
    
    a. The strategy here is not to use the containing collection "fetching" to enforce the proper concurrency. Instead, the monitor is on the contained object, the shuffleId.
    b. To ensure that changes to "fetching" itself do not cause deadlocks/race conditions, fetching itself was changed to a ConcurrentSkipListSet.
    
    >> In addition, using the monitor of an object from outside to synchronized makes it hard to analyse the codes because the monitor of this object may be used (or used in the future) in other place and the developer may not notice somebody has already used it here.
    
    This has been addressed in my latest fix: every access now goes through the interned monitor's synchronized block.
    
    >> The read/write operation of fetching should use the same monitor according to the java memory model. Here for different shuffleId, you use different monitor. That's wrong.
    
    This is no longer true after the latest fix, as described in (a) and (b) above.
    
    Thanks again for your help in reviewing this proposed improvement.



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1542#discussion_r15283935
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -130,7 +130,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
         if (statuses == null) {
           logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
           var fetchedStatuses: Array[MapStatus] = null
    -      fetching.synchronized {
    +      shuffleId.toString.intern.synchronized {
    --- End diff --
    
    This is not a correct fix, for the following reasons:
    * We should always use the same lock to protect access to `fetching`, in order to set up the happens-before relation.
    * `fetching.wait()` should always be called while holding the `fetching` lock. See the javadoc: http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#wait()



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50509704
  
    That was super scary! Thanks for clarifying, @aarondav 



[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50513840
  
    It looks like this pull request was opened from @javadba's master branch.  My hunch is that he force-pushed or otherwise reset that branch to bring it in sync with the ASF master repository, and when the git mirroring script caught up it noticed that a commit in this PR had been merged into the ASF master but was missing from GitHub, so it closed this PR.
    
    ("... and that's why you always create a feature branch").

